Bases: BaseReader
天气阅读器。
使用OpenWeatherMap的免费API读取任何位置的天气预报和当前天气。
查看'https://openweathermap.org/appid'以了解如何生成免费的OpenWeatherMap API,它是免费的。
Parameters:
Name |
Type |
Description |
Default |
token |
str
|
|
required
|
Source code in llama_index/readers/weather/base.py
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89 | class WeatherReader(BaseReader):
"""天气阅读器。
使用OpenWeatherMap的免费API读取任何位置的天气预报和当前天气。
查看'https://openweathermap.org/appid'以了解如何生成免费的OpenWeatherMap API,它是免费的。
Args:
token (str): 从OWM API获取的bearer_token。
"""
def __init__(
self,
token: str,
) -> None:
"""使用参数进行初始化。"""
super().__init__()
self.token = token
def load_data(
self,
places: List[str],
) -> List[Document]:
"""为给定位置加载天气数据。
OWM的One Call API为任何地理坐标提供以下天气数据:
- 当前天气
- 48小时的逐小时预报
- 7天的每日预报。
Args:
places(List[str])- 您想获取天气数据的地点。
"""
try:
import pyowm
except ImportError:
raise ImportError("install pyowm using `pip install pyowm`")
owm = pyowm.OWM(api_key=self.token)
mgr = owm.weather_manager()
reg = owm.city_id_registry()
results = []
for place in places:
info_dict = {}
extra_info = {}
list_of_locations = reg.locations_for(city_name=place)
try:
city = list_of_locations[0]
except ValueError:
raise ValueError(
f"Unable to find {place}, try checking the spelling and try again"
)
lat = city.lat
lon = city.lon
res = mgr.one_call(lat=lat, lon=lon)
extra_info["latitude"] = lat
extra_info["longitude"] = lon
extra_info["timezone"] = res.timezone
info_dict["location"] = place
info_dict["current weather"] = res.current.to_dict()
if res.forecast_daily:
info_dict["daily forecast"] = [i.to_dict() for i in res.forecast_daily]
if res.forecast_hourly:
info_dict["hourly forecast"] = [
i.to_dict() for i in res.forecast_hourly
]
if res.forecast_minutely:
info_dict["minutely forecast"] = [
i.to_dict() for i in res.forecast_minutely
]
if res.national_weather_alerts:
info_dict["national weather alerts"] = [
i.to_dict() for i in res.national_weather_alerts
]
results.append(Document(text=str(info_dict), extra_info=extra_info))
return results
|
load_data
load_data(places: List[str]) -> List[Document]
为给定位置加载天气数据。
OWM的One Call API为任何地理坐标提供以下天气数据:
- 当前天气
- 48小时的逐小时预报
- 7天的每日预报。
Source code in llama_index/readers/weather/base.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89 | def load_data(
self,
places: List[str],
) -> List[Document]:
"""为给定位置加载天气数据。
OWM的One Call API为任何地理坐标提供以下天气数据:
- 当前天气
- 48小时的逐小时预报
- 7天的每日预报。
Args:
places(List[str])- 您想获取天气数据的地点。
"""
try:
import pyowm
except ImportError:
raise ImportError("install pyowm using `pip install pyowm`")
owm = pyowm.OWM(api_key=self.token)
mgr = owm.weather_manager()
reg = owm.city_id_registry()
results = []
for place in places:
info_dict = {}
extra_info = {}
list_of_locations = reg.locations_for(city_name=place)
try:
city = list_of_locations[0]
except ValueError:
raise ValueError(
f"Unable to find {place}, try checking the spelling and try again"
)
lat = city.lat
lon = city.lon
res = mgr.one_call(lat=lat, lon=lon)
extra_info["latitude"] = lat
extra_info["longitude"] = lon
extra_info["timezone"] = res.timezone
info_dict["location"] = place
info_dict["current weather"] = res.current.to_dict()
if res.forecast_daily:
info_dict["daily forecast"] = [i.to_dict() for i in res.forecast_daily]
if res.forecast_hourly:
info_dict["hourly forecast"] = [
i.to_dict() for i in res.forecast_hourly
]
if res.forecast_minutely:
info_dict["minutely forecast"] = [
i.to_dict() for i in res.forecast_minutely
]
if res.national_weather_alerts:
info_dict["national weather alerts"] = [
i.to_dict() for i in res.national_weather_alerts
]
results.append(Document(text=str(info_dict), extra_info=extra_info))
return results
|