Source code for langchain_community.utilities.apify

from typing import TYPE_CHECKING, Any, Callable, Dict, Optional

from langchain_core.documents import Document
from langchain_core.pydantic_v1 import BaseModel, root_validator
from langchain_core.utils import get_from_dict_or_env

if TYPE_CHECKING:
    from langchain_community.document_loaders import ApifyDatasetLoader


[docs]class ApifyWrapper(BaseModel): """封装了Apify。 要使用,应安装``apify-client`` python包, 并设置环境变量``APIFY_API_TOKEN``为您的API密钥,或将 `apify_api_token`作为构造函数的一个命名参数传递。""" apify_client: Any apify_client_async: Any @root_validator() def validate_environment(cls, values: Dict) -> Dict: """验证环境。 验证是否设置了Apify API令牌,并且apify-client Python包存在于当前环境中。 """ apify_api_token = get_from_dict_or_env( values, "apify_api_token", "APIFY_API_TOKEN" ) try: from apify_client import ApifyClient, ApifyClientAsync client = ApifyClient(apify_api_token) if httpx_client := getattr(client.http_client, "httpx_client"): httpx_client.headers["user-agent"] += "; Origin/langchain" async_client = ApifyClientAsync(apify_api_token) if httpx_async_client := getattr( async_client.http_client, "httpx_async_client" ): httpx_async_client.headers["user-agent"] += "; Origin/langchain" values["apify_client"] = client values["apify_client_async"] = async_client except ImportError: raise ImportError( "Could not import apify-client Python package. " "Please install it with `pip install apify-client`." ) return values
[docs] def call_actor( self, actor_id: str, run_input: Dict, dataset_mapping_function: Callable[[Dict], Document], *, build: Optional[str] = None, memory_mbytes: Optional[int] = None, timeout_secs: Optional[int] = None, ) -> "ApifyDatasetLoader": """在Apify平台上运行一个Actor,并等待结果准备就绪。 参数: actor_id(str):Apify平台上Actor的ID或名称。 run_input(Dict):您要运行的Actor的输入对象。 dataset_mapping_function(Callable):一个函数,接受一个字典(Apify数据集项)并将其转换为Document类的实例。 build(str,可选):可选指定要运行的actor构建。可以是构建标签或构建编号。 memory_mbytes(int,可选):运行的可选内存限制,以兆字节为单位。 timeout_secs(int,可选):运行的可选超时时间,以秒为单位。 返回: ApifyDatasetLoader:一个加载器,将从Actor运行的默认数据集中获取记录。 """ from langchain_community.document_loaders import ApifyDatasetLoader actor_call = self.apify_client.actor(actor_id).call( run_input=run_input, build=build, memory_mbytes=memory_mbytes, timeout_secs=timeout_secs, ) return ApifyDatasetLoader( dataset_id=actor_call["defaultDatasetId"], dataset_mapping_function=dataset_mapping_function, )
[docs] async def acall_actor( self, actor_id: str, run_input: Dict, dataset_mapping_function: Callable[[Dict], Document], *, build: Optional[str] = None, memory_mbytes: Optional[int] = None, timeout_secs: Optional[int] = None, ) -> "ApifyDatasetLoader": """在Apify平台上运行一个Actor,并等待结果准备就绪。 参数: actor_id (str): Apify平台上Actor的ID或名称。 run_input (Dict): 您要运行的Actor的输入对象。 dataset_mapping_function (Callable): 一个函数,接受一个字典(一个Apify数据集项)并将其转换为Document类的实例。 build (str, optional): 可选参数,指定要运行的actor构建。可以是构建标签或构建编号。 memory_mbytes (int, optional): 运行的可选内存限制,以兆字节为单位。 timeout_secs (int, optional): 运行的可选超时时间,以秒为单位。 返回: ApifyDatasetLoader: 一个加载器,将从Actor运行的默认数据集中获取记录。 """ from langchain_community.document_loaders import ApifyDatasetLoader actor_call = await self.apify_client_async.actor(actor_id).call( run_input=run_input, build=build, memory_mbytes=memory_mbytes, timeout_secs=timeout_secs, ) return ApifyDatasetLoader( dataset_id=actor_call["defaultDatasetId"], dataset_mapping_function=dataset_mapping_function, )
[docs] def call_actor_task( self, task_id: str, task_input: Dict, dataset_mapping_function: Callable[[Dict], Document], *, build: Optional[str] = None, memory_mbytes: Optional[int] = None, timeout_secs: Optional[int] = None, ) -> "ApifyDatasetLoader": """在Apify上运行保存的Actor任务,并等待结果准备就绪。 参数: task_id (str): Apify平台上任务的ID或名称。 task_input (Dict): 您要运行的任务的输入对象。覆盖任务的保存输入。 dataset_mapping_function (Callable): 一个接受单个字典(Apify数据集项)并将其转换为Document类实例的函数。 build (str, optional): 可选指定要运行的actor构建。可以是构建标签或构建编号。 memory_mbytes (int, optional): 运行的可选内存限制,以兆字节为单位。 timeout_secs (int, optional): 运行的可选超时时间,以秒为单位。 返回: ApifyDatasetLoader: 一个加载器,将从任务运行的默认数据集中获取记录。 """ from langchain_community.document_loaders import ApifyDatasetLoader task_call = self.apify_client.task(task_id).call( task_input=task_input, build=build, memory_mbytes=memory_mbytes, timeout_secs=timeout_secs, ) return ApifyDatasetLoader( dataset_id=task_call["defaultDatasetId"], dataset_mapping_function=dataset_mapping_function, )
[docs] async def acall_actor_task( self, task_id: str, task_input: Dict, dataset_mapping_function: Callable[[Dict], Document], *, build: Optional[str] = None, memory_mbytes: Optional[int] = None, timeout_secs: Optional[int] = None, ) -> "ApifyDatasetLoader": """在Apify上运行保存的Actor任务,并等待结果准备就绪。 参数: task_id (str): Apify平台上任务的ID或名称。 task_input (Dict): 您要运行的任务的输入对象。覆盖任务的保存输入。 dataset_mapping_function (Callable): 一个接受单个字典(Apify数据集项)并将其转换为Document类实例的函数。 build (str, optional): 可选指定要运行的actor构建。可以是构建标签或构建编号。 memory_mbytes (int, optional): 运行的可选内存限制,以兆字节为单位。 timeout_secs (int, optional): 运行的可选超时时间,以秒为单位。 返回: ApifyDatasetLoader: 一个加载器,将从任务运行的默认数据集中获取记录。 """ from langchain_community.document_loaders import ApifyDatasetLoader task_call = await self.apify_client_async.task(task_id).call( task_input=task_input, build=build, memory_mbytes=memory_mbytes, timeout_secs=timeout_secs, ) return ApifyDatasetLoader( dataset_id=task_call["defaultDatasetId"], dataset_mapping_function=dataset_mapping_function, )