"""与Zapier NLA 交互的工具。
完整文档在这里:https://nla.zapier.com/start/
注意:此包装器目前仅实现了用于测试和服务器端生产用例的 `api_key` 认证方法(使用开发人员在 Zapier.com 上连接的帐户)
对于 LangChain + Zapier NLA 驱动用户界面应用程序的用例,以及 LangChain 需要访问终端用户在 Zapier.com 上连接的帐户的情况,您将需要使用oauth。请查看上面的完整文档,并联系 nla@zapier.com 获取开发人员支持。
"""
import json
from typing import Any, Dict, List, Optional
import aiohttp
import requests
from langchain_core.pydantic_v1 import BaseModel, Extra, root_validator
from langchain_core.utils import get_from_dict_or_env
from requests import Request, Session
[docs]class ZapierNLAWrapper(BaseModel):
"""Zapier NLA的包装器。
完整文档在这里:https://nla.zapier.com/start/
该包装器支持API密钥和OAuth凭据两种认证方法。API密钥是使用该包装器最快的方法。
使用`zapier_nla_api_key`或`zapier_nla_oauth_access_token`参数调用此包装器,或设置`ZAPIER_NLA_API_KEY`环境变量。如果两个参数都设置了,访问令牌将优先。
对于LangChain + Zapier NLA驱动用户界面应用程序的用例,LangChain需要访问Zapier.com上终端用户连接的帐户,您需要使用OAuth。请查阅上面的完整文档,了解如何创建自己的提供程序并生成凭据。"""
zapier_nla_api_key: str
zapier_nla_oauth_access_token: str
zapier_nla_api_base: str = "https://nla.zapier.com/api/v1/"
class Config:
"""此pydantic对象的配置。"""
extra = Extra.forbid
def _format_headers(self) -> Dict[str, str]:
"""为请求格式化标题。"""
headers = {
"Accept": "application/json",
"Content-Type": "application/json",
}
if self.zapier_nla_oauth_access_token:
headers.update(
{"Authorization": f"Bearer {self.zapier_nla_oauth_access_token}"}
)
else:
headers.update({"X-API-Key": self.zapier_nla_api_key})
return headers
def _get_session(self) -> Session:
session = requests.Session()
session.headers.update(self._format_headers())
return session
async def _arequest(self, method: str, url: str, **kwargs: Any) -> Dict[str, Any]:
"""发起一个异步请求。"""
async with aiohttp.ClientSession(headers=self._format_headers()) as session:
async with session.request(method, url, **kwargs) as response:
response.raise_for_status()
return await response.json()
def _create_action_payload( # type: ignore[no-untyped-def]
self, instructions: str, params: Optional[Dict] = None, preview_only=False
) -> Dict:
"""为一个动作创建有效载荷。"""
data = params if params else {}
data.update(
{
"instructions": instructions,
}
)
if preview_only:
data.update({"preview_only": True})
return data
def _create_action_url(self, action_id: str) -> str:
"""为一个操作创建一个URL。"""
return self.zapier_nla_api_base + f"exposed/{action_id}/execute/"
def _create_action_request( # type: ignore[no-untyped-def]
self,
action_id: str,
instructions: str,
params: Optional[Dict] = None,
preview_only=False,
) -> Request:
data = self._create_action_payload(instructions, params, preview_only)
return Request(
"POST",
self._create_action_url(action_id),
json=data,
)
@root_validator(pre=True)
def validate_environment(cls, values: Dict) -> Dict:
"""验证环境中是否存在API密钥。"""
zapier_nla_api_key_default = None
# If there is a oauth_access_key passed in the values
# we don't need a nla_api_key it can be blank
if "zapier_nla_oauth_access_token" in values:
zapier_nla_api_key_default = ""
else:
values["zapier_nla_oauth_access_token"] = ""
# we require at least one API Key
zapier_nla_api_key = get_from_dict_or_env(
values,
"zapier_nla_api_key",
"ZAPIER_NLA_API_KEY",
zapier_nla_api_key_default,
)
values["zapier_nla_api_key"] = zapier_nla_api_key
return values
[docs] async def alist(self) -> List[Dict]:
"""返回与当前用户关联的所有公开(已启用)操作的列表(与设置的api_key关联)。在这里更改您的公开操作:https://nla.zapier.com/demo/start/
如果没有公开操作,则返回列表可能为空。否则将包含操作对象的列表:
[{
"id": str,
"description": str,
"params": Dict[str, str]
}]
`params`将始终包含一个`instructions`键,这是唯一必需的参数。所有其他参数都是可选的,如果提供将覆盖任何AI猜测(请参阅此处的“了解AI猜测流程”:https://nla.zapier.com/api/v1/docs)
"""
response = await self._arequest("GET", self.zapier_nla_api_base + "exposed/")
return response["results"]
[docs] def list(self) -> List[Dict]:
"""返回与当前用户关联(与设置的api_key关联)的所有公开(已启用)操作的列表。在这里更改您的公开操作:https://nla.zapier.com/demo/start/
如果没有公开操作,则返回列表可能为空。否则,将包含操作对象的列表:
[{
"id": str,
"description": str,
"params": Dict[str, str]
}]
`params`将始终包含一个`instructions`键,这是唯一必需的参数。所有其他参数都是可选的,如果提供将覆盖任何AI猜测(请参见此处的“了解AI猜测流程”:https://nla.zapier.com/docs/using-the-api#ai-guessing)
"""
session = self._get_session()
try:
response = session.get(self.zapier_nla_api_base + "exposed/")
response.raise_for_status()
except requests.HTTPError as http_err:
if response.status_code == 401:
if self.zapier_nla_oauth_access_token:
raise requests.HTTPError(
f"An unauthorized response occurred. Check that your "
f"access token is correct and doesn't need to be "
f"refreshed. Err: {http_err}",
response=response,
)
raise requests.HTTPError(
f"An unauthorized response occurred. Check that your api "
f"key is correct. Err: {http_err}",
response=response,
)
raise http_err
return response.json()["results"]
[docs] def run(
self, action_id: str, instructions: str, params: Optional[Dict] = None
) -> Dict:
"""执行由action_id标识的操作,必须由当前用户(与设置的api_key相关联)公开(启用)。在此更改您的公开操作:https://nla.zapier.com/demo/start/
返回的JSON保证少于~500个字(350个标记),因此可以安全地注入到另一个LLM调用的提示中。
"""
session = self._get_session()
request = self._create_action_request(action_id, instructions, params)
response = session.send(session.prepare_request(request))
response.raise_for_status()
return response.json()["result"]
[docs] async def arun(
self, action_id: str, instructions: str, params: Optional[Dict] = None
) -> Dict:
"""执行由action_id标识的操作,必须由当前用户(与设置的api_key相关联)公开(启用)。在此更改您的公开操作:https://nla.zapier.com/demo/start/
返回的JSON保证少于~500个字(350个标记),因此可以安全地注入到另一个LLM调用的提示中。
"""
response = await self._arequest(
"POST",
self._create_action_url(action_id),
json=self._create_action_payload(instructions, params),
)
return response["result"]
[docs] def preview(
self, action_id: str, instructions: str, params: Optional[Dict] = None
) -> Dict:
"""与运行相同,但是不会实际执行动作,而是会返回AI猜测的参数预览,以便您在执行之前需要明确审查。
"""
session = self._get_session()
params = params if params else {}
params.update({"preview_only": True})
request = self._create_action_request(action_id, instructions, params, True)
response = session.send(session.prepare_request(request))
response.raise_for_status()
return response.json()["input_params"]
[docs] async def apreview(
self, action_id: str, instructions: str, params: Optional[Dict] = None
) -> Dict:
"""与运行相同,但是不会实际执行动作,而是会返回AI猜测的参数预览,以便您在执行之前需要明确审查。
"""
response = await self._arequest(
"POST",
self._create_action_url(action_id),
json=self._create_action_payload(instructions, params, preview_only=True),
)
return response["result"]
[docs] def run_as_str(self, *args, **kwargs) -> str: # type: ignore[no-untyped-def]
"""与运行相同,但返回JSON的字符串版本,以便插入回LLM。
"""
data = self.run(*args, **kwargs)
return json.dumps(data)
[docs] async def arun_as_str(self, *args, **kwargs) -> str: # type: ignore[no-untyped-def]
"""与运行相同,但返回JSON的字符串版本,以便插入回LLM。
"""
data = await self.arun(*args, **kwargs)
return json.dumps(data)
[docs] def preview_as_str(self, *args, **kwargs) -> str: # type: ignore[no-untyped-def]
"""与预览相同,但返回JSON的字符串版本,用于插入回LLM中。
"""
data = self.preview(*args, **kwargs)
return json.dumps(data)
[docs] async def apreview_as_str( # type: ignore[no-untyped-def]
self, *args, **kwargs
) -> str:
"""与预览相同,但返回JSON的字符串版本,用于插入回LLM中。
"""
data = await self.apreview(*args, **kwargs)
return json.dumps(data)
[docs] def list_as_str(self) -> str: # type: ignore[no-untyped-def]
"""与列表相同,但返回JSON的字符串版本,用于插入回LLM。
"""
actions = self.list()
return json.dumps(actions)
[docs] async def alist_as_str(self) -> str: # type: ignore[no-untyped-def]
"""与列表相同,但返回JSON的字符串版本,用于插入回LLM。
"""
actions = await self.alist()
return json.dumps(actions)