Skip to content

Weather

WeatherReader #

Bases: BaseReader

天气阅读器。

使用OpenWeatherMap的免费API读取任何位置的天气预报和当前天气。

查看'https://openweathermap.org/appid'以了解如何生成免费的OpenWeatherMap API,它是免费的。

Parameters:

Name Type Description Default
token str

从OWM API获取的bearer_token。

required
Source code in llama_index/readers/weather/base.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
class WeatherReader(BaseReader):
    """天气阅读器。

    使用OpenWeatherMap的免费API读取任何位置的天气预报和当前天气。

    查看'https://openweathermap.org/appid'以了解如何生成免费的OpenWeatherMap API,它是免费的。

    Args:
        token (str): 从OWM API获取的bearer_token。
"""

    def __init__(
        self,
        token: str,
    ) -> None:
        """使用参数进行初始化。"""
        super().__init__()
        self.token = token

    def load_data(
        self,
        places: List[str],
    ) -> List[Document]:
        """为给定位置加载天气数据。
OWM的One Call API为任何地理坐标提供以下天气数据:
- 当前天气
- 48小时的逐小时预报
- 7天的每日预报。

Args:
    places(List[str])- 您想获取天气数据的地点。
"""
        try:
            import pyowm
        except ImportError:
            raise ImportError("install pyowm using `pip install pyowm`")

        owm = pyowm.OWM(api_key=self.token)
        mgr = owm.weather_manager()

        reg = owm.city_id_registry()

        results = []
        for place in places:
            info_dict = {}
            extra_info = {}
            list_of_locations = reg.locations_for(city_name=place)

            try:
                city = list_of_locations[0]
            except ValueError:
                raise ValueError(
                    f"Unable to find {place}, try checking the spelling and try again"
                )
            lat = city.lat
            lon = city.lon

            res = mgr.one_call(lat=lat, lon=lon)

            extra_info["latitude"] = lat
            extra_info["longitude"] = lon
            extra_info["timezone"] = res.timezone
            info_dict["location"] = place
            info_dict["current weather"] = res.current.to_dict()
            if res.forecast_daily:
                info_dict["daily forecast"] = [i.to_dict() for i in res.forecast_daily]
            if res.forecast_hourly:
                info_dict["hourly forecast"] = [
                    i.to_dict() for i in res.forecast_hourly
                ]
            if res.forecast_minutely:
                info_dict["minutely forecast"] = [
                    i.to_dict() for i in res.forecast_minutely
                ]
            if res.national_weather_alerts:
                info_dict["national weather alerts"] = [
                    i.to_dict() for i in res.national_weather_alerts
                ]

            results.append(Document(text=str(info_dict), extra_info=extra_info))

        return results

load_data #

load_data(places: List[str]) -> List[Document]

为给定位置加载天气数据。 OWM的One Call API为任何地理坐标提供以下天气数据: - 当前天气 - 48小时的逐小时预报 - 7天的每日预报。

Source code in llama_index/readers/weather/base.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
    def load_data(
        self,
        places: List[str],
    ) -> List[Document]:
        """为给定位置加载天气数据。
OWM的One Call API为任何地理坐标提供以下天气数据:
- 当前天气
- 48小时的逐小时预报
- 7天的每日预报。

Args:
    places(List[str])- 您想获取天气数据的地点。
"""
        try:
            import pyowm
        except ImportError:
            raise ImportError("install pyowm using `pip install pyowm`")

        owm = pyowm.OWM(api_key=self.token)
        mgr = owm.weather_manager()

        reg = owm.city_id_registry()

        results = []
        for place in places:
            info_dict = {}
            extra_info = {}
            list_of_locations = reg.locations_for(city_name=place)

            try:
                city = list_of_locations[0]
            except ValueError:
                raise ValueError(
                    f"Unable to find {place}, try checking the spelling and try again"
                )
            lat = city.lat
            lon = city.lon

            res = mgr.one_call(lat=lat, lon=lon)

            extra_info["latitude"] = lat
            extra_info["longitude"] = lon
            extra_info["timezone"] = res.timezone
            info_dict["location"] = place
            info_dict["current weather"] = res.current.to_dict()
            if res.forecast_daily:
                info_dict["daily forecast"] = [i.to_dict() for i in res.forecast_daily]
            if res.forecast_hourly:
                info_dict["hourly forecast"] = [
                    i.to_dict() for i in res.forecast_hourly
                ]
            if res.forecast_minutely:
                info_dict["minutely forecast"] = [
                    i.to_dict() for i in res.forecast_minutely
                ]
            if res.national_weather_alerts:
                info_dict["national weather alerts"] = [
                    i.to_dict() for i in res.national_weather_alerts
                ]

            results.append(Document(text=str(info_dict), extra_info=extra_info))

        return results