from typing import TYPE_CHECKING, Any, Callable, Dict, Optional
from langchain_core.documents import Document
from langchain_core.pydantic_v1 import BaseModel, root_validator
from langchain_core.utils import get_from_dict_or_env
if TYPE_CHECKING:
from langchain_community.document_loaders import ApifyDatasetLoader
[docs]class ApifyWrapper(BaseModel):
"""封装了Apify。
要使用,应安装``apify-client`` python包,
并设置环境变量``APIFY_API_TOKEN``为您的API密钥,或将
`apify_api_token`作为构造函数的一个命名参数传递。"""
apify_client: Any
apify_client_async: Any
@root_validator()
def validate_environment(cls, values: Dict) -> Dict:
"""验证环境。
验证是否设置了Apify API令牌,并且apify-client Python包存在于当前环境中。
"""
apify_api_token = get_from_dict_or_env(
values, "apify_api_token", "APIFY_API_TOKEN"
)
try:
from apify_client import ApifyClient, ApifyClientAsync
client = ApifyClient(apify_api_token)
if httpx_client := getattr(client.http_client, "httpx_client"):
httpx_client.headers["user-agent"] += "; Origin/langchain"
async_client = ApifyClientAsync(apify_api_token)
if httpx_async_client := getattr(
async_client.http_client, "httpx_async_client"
):
httpx_async_client.headers["user-agent"] += "; Origin/langchain"
values["apify_client"] = client
values["apify_client_async"] = async_client
except ImportError:
raise ImportError(
"Could not import apify-client Python package. "
"Please install it with `pip install apify-client`."
)
return values
[docs] def call_actor(
self,
actor_id: str,
run_input: Dict,
dataset_mapping_function: Callable[[Dict], Document],
*,
build: Optional[str] = None,
memory_mbytes: Optional[int] = None,
timeout_secs: Optional[int] = None,
) -> "ApifyDatasetLoader":
"""在Apify平台上运行一个Actor,并等待结果准备就绪。
参数:
actor_id(str):Apify平台上Actor的ID或名称。
run_input(Dict):您要运行的Actor的输入对象。
dataset_mapping_function(Callable):一个函数,接受一个字典(Apify数据集项)并将其转换为Document类的实例。
build(str,可选):可选指定要运行的actor构建。可以是构建标签或构建编号。
memory_mbytes(int,可选):运行的可选内存限制,以兆字节为单位。
timeout_secs(int,可选):运行的可选超时时间,以秒为单位。
返回:
ApifyDatasetLoader:一个加载器,将从Actor运行的默认数据集中获取记录。
"""
from langchain_community.document_loaders import ApifyDatasetLoader
actor_call = self.apify_client.actor(actor_id).call(
run_input=run_input,
build=build,
memory_mbytes=memory_mbytes,
timeout_secs=timeout_secs,
)
return ApifyDatasetLoader(
dataset_id=actor_call["defaultDatasetId"],
dataset_mapping_function=dataset_mapping_function,
)
[docs] async def acall_actor(
self,
actor_id: str,
run_input: Dict,
dataset_mapping_function: Callable[[Dict], Document],
*,
build: Optional[str] = None,
memory_mbytes: Optional[int] = None,
timeout_secs: Optional[int] = None,
) -> "ApifyDatasetLoader":
"""在Apify平台上运行一个Actor,并等待结果准备就绪。
参数:
actor_id (str): Apify平台上Actor的ID或名称。
run_input (Dict): 您要运行的Actor的输入对象。
dataset_mapping_function (Callable): 一个函数,接受一个字典(一个Apify数据集项)并将其转换为Document类的实例。
build (str, optional): 可选参数,指定要运行的actor构建。可以是构建标签或构建编号。
memory_mbytes (int, optional): 运行的可选内存限制,以兆字节为单位。
timeout_secs (int, optional): 运行的可选超时时间,以秒为单位。
返回:
ApifyDatasetLoader: 一个加载器,将从Actor运行的默认数据集中获取记录。
"""
from langchain_community.document_loaders import ApifyDatasetLoader
actor_call = await self.apify_client_async.actor(actor_id).call(
run_input=run_input,
build=build,
memory_mbytes=memory_mbytes,
timeout_secs=timeout_secs,
)
return ApifyDatasetLoader(
dataset_id=actor_call["defaultDatasetId"],
dataset_mapping_function=dataset_mapping_function,
)
[docs] def call_actor_task(
self,
task_id: str,
task_input: Dict,
dataset_mapping_function: Callable[[Dict], Document],
*,
build: Optional[str] = None,
memory_mbytes: Optional[int] = None,
timeout_secs: Optional[int] = None,
) -> "ApifyDatasetLoader":
"""在Apify上运行保存的Actor任务,并等待结果准备就绪。
参数:
task_id (str): Apify平台上任务的ID或名称。
task_input (Dict): 您要运行的任务的输入对象。覆盖任务的保存输入。
dataset_mapping_function (Callable): 一个接受单个字典(Apify数据集项)并将其转换为Document类实例的函数。
build (str, optional): 可选指定要运行的actor构建。可以是构建标签或构建编号。
memory_mbytes (int, optional): 运行的可选内存限制,以兆字节为单位。
timeout_secs (int, optional): 运行的可选超时时间,以秒为单位。
返回:
ApifyDatasetLoader: 一个加载器,将从任务运行的默认数据集中获取记录。
"""
from langchain_community.document_loaders import ApifyDatasetLoader
task_call = self.apify_client.task(task_id).call(
task_input=task_input,
build=build,
memory_mbytes=memory_mbytes,
timeout_secs=timeout_secs,
)
return ApifyDatasetLoader(
dataset_id=task_call["defaultDatasetId"],
dataset_mapping_function=dataset_mapping_function,
)
[docs] async def acall_actor_task(
self,
task_id: str,
task_input: Dict,
dataset_mapping_function: Callable[[Dict], Document],
*,
build: Optional[str] = None,
memory_mbytes: Optional[int] = None,
timeout_secs: Optional[int] = None,
) -> "ApifyDatasetLoader":
"""在Apify上运行保存的Actor任务,并等待结果准备就绪。
参数:
task_id (str): Apify平台上任务的ID或名称。
task_input (Dict): 您要运行的任务的输入对象。覆盖任务的保存输入。
dataset_mapping_function (Callable): 一个接受单个字典(Apify数据集项)并将其转换为Document类实例的函数。
build (str, optional): 可选指定要运行的actor构建。可以是构建标签或构建编号。
memory_mbytes (int, optional): 运行的可选内存限制,以兆字节为单位。
timeout_secs (int, optional): 运行的可选超时时间,以秒为单位。
返回:
ApifyDatasetLoader: 一个加载器,将从任务运行的默认数据集中获取记录。
"""
from langchain_community.document_loaders import ApifyDatasetLoader
task_call = await self.apify_client_async.task(task_id).call(
task_input=task_input,
build=build,
memory_mbytes=memory_mbytes,
timeout_secs=timeout_secs,
)
return ApifyDatasetLoader(
dataset_id=task_call["defaultDatasetId"],
dataset_mapping_function=dataset_mapping_function,
)