"""调用ClickUp的工具。"""
import json
import warnings
from dataclasses import asdict, dataclass, fields
from typing import Any, Dict, List, Mapping, Optional, Tuple, Type, Union
import requests
from langchain_core.pydantic_v1 import BaseModel, Extra, root_validator
from langchain_core.utils import get_from_dict_or_env
# Base URL of the ClickUp REST API (v2); endpoint paths are appended to it.
DEFAULT_URL = "https://api.clickup.com/api/v2"
@dataclass
class Component:
    """Base class for all components.

    Concrete components are dataclasses that override :meth:`from_data` to
    build themselves from a dictionary.
    """

    @classmethod
    def from_data(cls, data: Dict[str, Any]) -> "Component":
        """Build a component from a dictionary.

        Args:
            data: Dictionary holding the component's raw data.

        Returns:
            An instance of the component.

        Raises:
            NotImplementedError: Always; subclasses must override this method.
        """
        raise NotImplementedError()
@dataclass
class Task(Component):
    """Component describing a single task."""

    # NOTE(review): annotated as int, but nothing here enforces it; confirm
    # against the API whether task ids are numeric or opaque strings.
    id: int
    name: str
    text_content: str
    description: str
    status: str
    creator_id: int
    creator_username: str
    creator_email: str
    assignees: List[Dict[str, Any]]
    watchers: List[Dict[str, Any]]
    priority: Optional[str]
    due_date: Optional[str]
    start_date: Optional[str]
    points: int
    team_id: int
    project_id: int

    @classmethod
    def from_data(cls, data: Dict[str, Any]) -> "Task":
        """Build a task from a task dictionary.

        Nested values are flattened: ``status`` is read from
        ``data["status"]["status"]``, the creator fields from
        ``data["creator"]``, the project id from ``data["project"]["id"]``
        and the priority from ``data["priority"]["priority"]``.

        Args:
            data: Raw task dictionary.

        Returns:
            The populated task.

        Raises:
            KeyError: If any expected key is missing from ``data``.
        """
        # ``priority`` may be None for a task; only unwrap it when present.
        raw_priority = data["priority"]
        priority = None if raw_priority is None else raw_priority["priority"]

        creator = data["creator"]
        return cls(
            id=data["id"],
            name=data["name"],
            text_content=data["text_content"],
            description=data["description"],
            status=data["status"]["status"],
            creator_id=creator["id"],
            creator_username=creator["username"],
            creator_email=creator["email"],
            assignees=data["assignees"],
            watchers=data["watchers"],
            priority=priority,
            due_date=data["due_date"],
            start_date=data["start_date"],
            points=data["points"],
            team_id=data["team_id"],
            project_id=data["project"]["id"],
        )
@dataclass
class CUList(Component):
    """Component describing a list."""

    folder_id: float
    name: str
    content: Optional[str] = None
    due_date: Optional[int] = None
    due_date_time: Optional[bool] = None
    priority: Optional[int] = None
    assignee: Optional[int] = None
    status: Optional[str] = None

    @classmethod
    def from_data(cls, data: dict) -> "CUList":
        """Build a list component from a dictionary.

        ``folder_id`` and ``name`` are required; every other field is
        optional and defaults to ``None`` when absent.

        Args:
            data: Raw list dictionary.

        Returns:
            The populated list component.

        Raises:
            KeyError: If ``folder_id`` or ``name`` is missing from ``data``.
        """
        optional_keys = (
            "content",
            "due_date",
            "due_date_time",
            "priority",
            "assignee",
            "status",
        )
        optional_values = {key: data.get(key) for key in optional_keys}
        return cls(
            folder_id=data["folder_id"],
            name=data["name"],
            **optional_values,
        )
@dataclass
class Member(Component):
    """Component describing a team member."""

    id: int
    username: str
    email: str
    initials: str

    @classmethod
    def from_data(cls, data: Dict) -> "Member":
        """Build a member from a dictionary with a nested ``"user"`` entry.

        Args:
            data: Dictionary whose ``"user"`` value holds the ``id``,
                ``username``, ``email`` and ``initials`` keys.

        Returns:
            The populated member.

        Raises:
            KeyError: If ``"user"`` or any of its expected keys is missing.
        """
        user = data["user"]
        return cls(
            id=user["id"],
            username=user["username"],
            email=user["email"],
            initials=user["initials"],
        )
@dataclass
class Team(Component):
    """Component describing a team and its members."""

    id: int
    name: str
    members: List[Member]

    @classmethod
    def from_data(cls, data: Dict) -> "Team":
        """Build a team from a dictionary.

        Each entry of ``data["members"]`` is converted with
        :meth:`Member.from_data`.

        Args:
            data: Dictionary with ``id``, ``name`` and ``members`` keys.

        Returns:
            The populated team.

        Raises:
            KeyError: If an expected key is missing from ``data`` or from
                one of its member entries.
        """
        members = [Member.from_data(member_data) for member_data in data["members"]]
        return cls(id=data["id"], name=data["name"], members=members)
@dataclass
class Space(Component):
    """Component describing a space."""

    id: int
    name: str
    private: bool
    enabled_features: Dict[str, Any]

    @classmethod
    def from_data(cls, data: Dict[str, Any]) -> "Space":
        """Build a space from a dictionary containing a ``"spaces"`` list.

        Only the first entry of ``data["spaces"]`` is used. Of its
        ``features`` mapping, only the entries whose ``"enabled"`` value is
        truthy are kept.

        Args:
            data: Dictionary with a non-empty ``"spaces"`` list.

        Returns:
            The populated space.

        Raises:
            KeyError: If an expected key is missing.
            IndexError: If ``data["spaces"]`` is empty.
        """
        space_data = data["spaces"][0]
        enabled_features = {
            feature: value
            for feature, value in space_data["features"].items()
            if value["enabled"]
        }
        return cls(
            id=space_data["id"],
            name=space_data["name"],
            private=space_data["private"],
            enabled_features=enabled_features,
        )
def parse_dict_through_component(
    data: dict, component: "Type[Component]", fault_tolerant: bool = False
) -> Dict:
    """Parse a dictionary by building a component and converting it back.

    This helps with two things:

    1. Extracting and formatting data from a dictionary according to a schema.
    2. Providing a central, fault-tolerant place to do so.

    Args:
        data: Raw dictionary to parse.
        component: Component class whose ``from_data`` builds the instance.
        fault_tolerant: When True, a parsing failure emits a warning and the
            input ``data`` is returned unchanged instead of raising.

    Returns:
        The ``asdict`` form of the parsed component, or ``data`` itself when
        parsing fails and ``fault_tolerant`` is True.

    Raises:
        Exception: Whatever ``component.from_data`` or ``asdict`` raised,
            when ``fault_tolerant`` is False.
    """
    try:
        return asdict(component.from_data(data))
    except Exception as e:
        if not fault_tolerant:
            # Bare raise keeps the original traceback intact.
            raise
        warnings.warn(
            "Error encountered while trying to parse\n"
            f"{data}: {e}\n Falling back to returning input data."
        )
        return data
def load_query(
    query: str, fault_tolerant: bool = False
) -> Tuple[Optional[Dict], Optional[str]]:
    """Parse a JSON string and return the parsed object.

    Args:
        query: The JSON string to parse.
        fault_tolerant: When True, a decoding failure is reported through the
            returned error message instead of being raised.

    Returns:
        A ``(parsed, error)`` tuple: ``(object, None)`` on success, or
        ``(None, message)`` when decoding fails and ``fault_tolerant`` is
        True.

    Raises:
        json.JSONDecodeError: If ``query`` is not valid JSON and
            ``fault_tolerant`` is False.
    """
    try:
        return json.loads(query), None
    except json.JSONDecodeError as e:
        if not fault_tolerant:
            raise
        # The message previously carried a stray double quote before
        # "Please"; it is removed so the text reads cleanly.
        return (
            None,
            f"Input must be a valid JSON. Got the following error: {e}.\n"
            "Please reformat and try again.",
        )
def fetch_first_id(data: dict, key: str) -> Optional[int]:
    """Return the ``"id"`` of the first entry stored under ``key``.

    Args:
        data: Dictionary that may hold a list of entries under ``key``.
        key: Name of the list to look in.

    Returns:
        The ``"id"`` of the first entry, or None when ``key`` is absent or
        maps to an empty (or None) value. A warning is emitted when more
        than one entry is present.
    """
    entries = data.get(key)
    # Covers a missing key, an empty list and an explicit None alike.
    if not entries:
        return None
    if len(entries) > 1:
        warnings.warn(f"Found multiple {key}: {entries}. Defaulting to first.")
    return entries[0]["id"]
def fetch_data(url: str, access_token: str, query: Optional[dict] = None) -> dict:
    """Fetch JSON data from a URL with a GET request.

    Args:
        url: URL to request.
        access_token: Value sent as the ``Authorization`` header.
        query: Optional query-string parameters.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the response status indicates an error
            (raised by ``raise_for_status``).
    """
    headers = {"Authorization": access_token}
    response = requests.get(url, headers=headers, params=query)
    response.raise_for_status()
    return response.json()
def fetch_team_id(access_token: str) -> Optional[int]:
    """Fetch the ID of the first team returned for the access token.

    Args:
        access_token: Token used to authorize the request.

    Returns:
        The first team's id, or None when the response lists no teams.
    """
    url = f"{DEFAULT_URL}/team"
    data = fetch_data(url, access_token)
    return fetch_first_id(data, "teams")
def fetch_space_id(team_id: int, access_token: str) -> Optional[int]:
    """Fetch the ID of the first non-archived space of a team.

    Args:
        team_id: Team whose spaces are listed.
        access_token: Token used to authorize the request.

    Returns:
        The first space's id, or None when the response lists no spaces.
    """
    url = f"{DEFAULT_URL}/team/{team_id}/space"
    data = fetch_data(url, access_token, query={"archived": "false"})
    return fetch_first_id(data, "spaces")
def fetch_folder_id(space_id: int, access_token: str) -> Optional[int]:
    """Fetch the ID of the first non-archived folder of a space.

    Args:
        space_id: Space whose folders are listed.
        access_token: Token used to authorize the request.

    Returns:
        The first folder's id, or None when the response lists no folders.
    """
    url = f"{DEFAULT_URL}/space/{space_id}/folder"
    data = fetch_data(url, access_token, query={"archived": "false"})
    return fetch_first_id(data, "folders")
def fetch_list_id(
    space_id: int, folder_id: Optional[int], access_token: str
) -> Optional[int]:
    """Fetch the ID of a list, from a folder if given, else from the space.

    Args:
        space_id: Space queried when ``folder_id`` is falsy.
        folder_id: Folder to query; when falsy the space's lists are
            queried instead.
        access_token: Token used to authorize the request.

    Returns:
        The list id, or None when the response contains no list.
    """
    if folder_id:
        url = f"{DEFAULT_URL}/folder/{folder_id}/list"
    else:
        url = f"{DEFAULT_URL}/space/{space_id}/list"
    data = fetch_data(url, access_token, query={"archived": "false"})

    # The response shape may differ between the folder and the folderless
    # case: a folder response carrying a top-level "id" is used directly,
    # otherwise the first entry of "lists" is taken.
    if folder_id and "id" in data:
        return data["id"]
    return fetch_first_id(data, "lists")
class ClickupAPIWrapper(BaseModel):
    """Wrapper for the ClickUp API.

    On validation the access token is resolved and the default team, space,
    folder and list ids are looked up through the API; the request methods
    then use those ids.
    """

    access_token: Optional[str] = None
    team_id: Optional[str] = None
    space_id: Optional[str] = None
    folder_id: Optional[str] = None
    list_id: Optional[str] = None

    class Config:
        """Configuration for this pydantic object."""

        extra = Extra.forbid

    @classmethod
    def get_access_code_url(
        cls, oauth_client_id: str, redirect_uri: str = "https://google.com"
    ) -> str:
        """Get the URL to visit in order to obtain an access code.

        Args:
            oauth_client_id: OAuth client id of the application.
            redirect_uri: URI appended as the ``redirect_uri`` parameter.

        Returns:
            The authorization URL.
        """
        url = f"https://app.clickup.com/api?client_id={oauth_client_id}"
        return f"{url}&redirect_uri={redirect_uri}"

    @classmethod
    def get_access_token(
        cls, oauth_client_id: str, oauth_client_secret: str, code: str
    ) -> Optional[str]:
        """Exchange an access code for an access token.

        Args:
            oauth_client_id: OAuth client id of the application.
            oauth_client_secret: OAuth client secret of the application.
            code: Access code to exchange.

        Returns:
            The access token, or None when the response does not contain
            one (the error is printed in that case).
        """
        url = f"{DEFAULT_URL}/oauth/token"
        params = {
            "client_id": oauth_client_id,
            "client_secret": oauth_client_secret,
            "code": code,
        }
        response = requests.post(url, params=params)
        data = response.json()

        if "access_token" in data:
            return data["access_token"]

        print(f"Error: {data}")  # noqa: T201
        # This error code is reported when the code was already consumed;
        # point the user at the URL that issues a fresh one.
        if "ECODE" in data and data["ECODE"] == "OAUTH_014":
            url = ClickupAPIWrapper.get_access_code_url(oauth_client_id)
            print(  # noqa: T201
                "You already used this code once. Generate a new one.",
                f"Our best guess for the url to get a new code is:\n{url}",
            )
        return None

    @root_validator()
    def validate_environment(cls, values: Dict) -> Dict:
        """Resolve the access token and look up the default ids.

        The token is obtained through ``get_from_dict_or_env`` using the
        ``access_token`` value or the ``CLICKUP_ACCESS_TOKEN`` environment
        key. The team, space, folder and list ids are then fetched in that
        order, each lookup using the ids found before it.
        """
        values["access_token"] = get_from_dict_or_env(
            values, "access_token", "CLICKUP_ACCESS_TOKEN"
        )
        token = values["access_token"]
        values["team_id"] = fetch_team_id(token)
        values["space_id"] = fetch_space_id(values["team_id"], token)
        values["folder_id"] = fetch_folder_id(values["space_id"], token)
        values["list_id"] = fetch_list_id(
            values["space_id"], values["folder_id"], token
        )
        return values

    @staticmethod
    def _extract_component_fields(data: dict, component: Type[Component]) -> dict:
        """Keep only the entries of ``data`` named after a component field."""
        return {
            attribute.name: data[attribute.name]
            for attribute in fields(component)
            if attribute.name in data
        }

    def get_headers(self) -> Mapping[str, Union[str, bool]]:
        """Build the headers sent with every authorized request.

        Returns:
            The ``Authorization`` and ``Content-Type`` headers.

        Raises:
            TypeError: If the access token is not a string.
        """
        if not isinstance(self.access_token, str):
            raise TypeError(f"Access Token: {self.access_token}, must be str.")
        return {
            "Authorization": str(self.access_token),
            "Content-Type": "application/json",
        }

    def attempt_parse_teams(self, input_dict: dict) -> Dict[str, List[dict]]:
        """Parse each entry of ``input_dict["teams"]`` into a team dict.

        Teams that fail to parse are skipped with a warning.

        Args:
            input_dict: Dictionary with a ``"teams"`` list.

        Returns:
            ``{"teams": [...]}`` holding the successfully parsed teams.
        """
        parsed_teams: Dict[str, List[dict]] = {"teams": []}
        for raw_team in input_dict["teams"]:
            try:
                team = parse_dict_through_component(
                    raw_team, Team, fault_tolerant=False
                )
                parsed_teams["teams"].append(team)
            except Exception as e:
                warnings.warn(f"Error parsing a team {e}")
        return parsed_teams

    def get_default_params(self) -> Dict:
        """Return the default query parameters (non-archived items only)."""
        return {"archived": "false"}

    def get_authorized_teams(self) -> Dict[Any, Any]:
        """Get all teams of the user.

        Returns:
            ``{"teams": [...]}`` with the teams that could be parsed.
        """
        url = f"{DEFAULT_URL}/team"
        response = requests.get(url, headers=self.get_headers())
        return self.attempt_parse_teams(response.json())

    def get_folders(self) -> Dict:
        """Get all folders of the team.

        NOTE(review): the request targets the team's ``/space`` endpoint;
        confirm that this is the intended endpoint for folders.

        Returns:
            ``{"response": response}`` wrapping the raw response object.
        """
        url = f"{DEFAULT_URL}/team/" + str(self.team_id) + "/space"
        params = self.get_default_params()
        response = requests.get(url, headers=self.get_headers(), params=params)
        return {"response": response}

    def get_task(self, query: str, fault_tolerant: bool = True) -> Dict:
        """Retrieve a specific task.

        Args:
            query: JSON string with a ``task_id`` key.
            fault_tolerant: Passed to the task parser; when True a response
                that cannot be parsed is returned as-is.

        Returns:
            The parsed task, or ``{"Error": message}`` when ``query`` is not
            valid JSON.
        """
        query_dict, error = load_query(query, fault_tolerant=True)
        if query_dict is None:
            return {"Error": error}

        url = f"{DEFAULT_URL}/task/{query_dict['task_id']}"
        params = {
            "custom_task_ids": "true",
            "team_id": self.team_id,
            "include_subtasks": "true",
        }
        response = requests.get(url, headers=self.get_headers(), params=params)
        return parse_dict_through_component(
            response.json(), Task, fault_tolerant=fault_tolerant
        )

    def get_lists(self) -> Dict:
        """Get all lists of the current folder.

        Returns:
            ``{"response": response}`` wrapping the raw response object.
        """
        url = f"{DEFAULT_URL}/folder/{self.folder_id}/list"
        params = self.get_default_params()
        response = requests.get(url, headers=self.get_headers(), params=params)
        return {"response": response}

    def query_tasks(self, query: str) -> Dict:
        """Query the tasks of a list.

        Args:
            query: JSON string with a ``list_id`` key.

        Returns:
            ``{"response": response}`` wrapping the raw response object, or
            ``{"Error": message}`` when ``query`` is not valid JSON.
        """
        query_dict, error = load_query(query, fault_tolerant=True)
        if query_dict is None:
            return {"Error": error}

        url = f"{DEFAULT_URL}/list/{query_dict['list_id']}/task"
        params = self.get_default_params()
        response = requests.get(url, headers=self.get_headers(), params=params)
        return {"response": response}

    def get_spaces(self) -> Dict:
        """Get the spaces of the team.

        Returns:
            The parsed space, or the raw response data when parsing fails.
        """
        url = f"{DEFAULT_URL}/team/{self.team_id}/space"
        response = requests.get(
            url, headers=self.get_headers(), params=self.get_default_params()
        )
        return parse_dict_through_component(
            response.json(), Space, fault_tolerant=True
        )

    def get_task_attribute(self, query: str) -> Dict:
        """Retrieve a single attribute of a specific task.

        Args:
            query: JSON string with ``task_id`` and ``attribute_name`` keys.

        Returns:
            ``{attribute_name: value}``, or ``{"Error": message}`` when the
            query cannot be parsed or the attribute is not a key of the task.
        """
        # Validate the query before spending a request on the task.
        params, error = load_query(query, fault_tolerant=True)
        if not isinstance(params, dict):
            return {"Error": error}

        task = self.get_task(query, fault_tolerant=True)
        attribute_name = params["attribute_name"]
        if attribute_name not in task:
            return {
                "Error": f"attribute_name = {attribute_name} was not\n"
                f"found in task keys {task.keys()}. "
                "Please call again with one of the key names."
            }
        return {attribute_name: task[attribute_name]}

    def update_task(self, query: str) -> Dict:
        """Update an attribute of a specific task.

        Args:
            query: JSON string with ``task_id``, ``attribute_name`` and
                ``value`` keys.

        Returns:
            ``{"response": response}`` wrapping the raw response object, or
            ``{"Error": message}`` when ``query`` is not valid JSON.
        """
        query_dict, error = load_query(query, fault_tolerant=True)
        if query_dict is None:
            return {"Error": error}

        url = f"{DEFAULT_URL}/task/{query_dict['task_id']}"
        params = {
            "custom_task_ids": "true",
            "team_id": self.team_id,
            "include_subtasks": "true",
        }
        headers = self.get_headers()
        payload = {query_dict["attribute_name"]: query_dict["value"]}
        response = requests.put(url, headers=headers, params=params, json=payload)
        return {"response": response}

    def update_task_assignees(self, query: str) -> Dict:
        """Add or remove assignees of a specific task.

        Args:
            query: JSON string with ``task_id``, ``operation`` (``"add"`` or
                ``"rem"``) and ``users`` (a list of integer ids) keys.

        Returns:
            ``{"response": response}`` wrapping the raw response object, or
            ``{"Error": message}`` when the query is not valid JSON or a
            user id is not an integer.

        Raises:
            ValueError: If ``operation`` is neither ``"add"`` nor ``"rem"``.
        """
        query_dict, error = load_query(query, fault_tolerant=True)
        if query_dict is None:
            return {"Error": error}

        users = query_dict["users"]
        for user in users:
            if not isinstance(user, int):
                return {
                    "Error": "All users must be integers, not strings!\n"
                    f"Got user {user} of type {type(user)}"
                }

        operation = query_dict["operation"]
        if operation == "add":
            assignee_payload = {"add": users, "rem": []}
        elif operation == "rem":
            assignee_payload = {"add": [], "rem": users}
        else:
            # A single message string: passing two arguments would make the
            # exception text render as a tuple.
            raise ValueError(
                f"Invalid operation ({operation}). Valid options ['add', 'rem']."
            )

        url = f"{DEFAULT_URL}/task/{query_dict['task_id']}"
        headers = self.get_headers()
        params = {
            "custom_task_ids": "true",
            "team_id": self.team_id,
            "include_subtasks": "true",
        }
        payload = {"assignees": assignee_payload}
        response = requests.put(url, headers=headers, params=params, json=payload)
        return {"response": response}

    def create_task(self, query: str) -> Dict:
        """Create a new task in the current list.

        Only the keys of ``query`` that match a ``Task`` field are sent.

        Args:
            query: JSON string describing the task.

        Returns:
            The parsed task (or the raw response data when parsing fails),
            or ``{"Error": message}`` when ``query`` is not valid JSON.
        """
        query_dict, error = load_query(query, fault_tolerant=True)
        if query_dict is None:
            return {"Error": error}

        url = f"{DEFAULT_URL}/list/{self.list_id}/task"
        params = {"custom_task_ids": "true", "team_id": self.team_id}
        payload = self._extract_component_fields(query_dict, Task)
        headers = self.get_headers()
        response = requests.post(url, json=payload, headers=headers, params=params)
        data: Dict = response.json()
        return parse_dict_through_component(data, Task, fault_tolerant=True)

    def create_list(self, query: str) -> Dict:
        """Create a new list and make it the current list.

        The list is created in the current folder when one is set,
        otherwise directly in the current space. Only the keys of ``query``
        that match a ``CUList`` field are sent.

        Args:
            query: JSON string describing the list.

        Returns:
            The parsed list (or the raw response data when parsing fails),
            or ``{"Error": message}`` when ``query`` is not valid JSON.
        """
        query_dict, error = load_query(query, fault_tolerant=True)
        if query_dict is None:
            return {"Error": error}

        # Default to the folder as location if it exists; otherwise fall
        # back to the space, which is addressed through its own endpoint.
        if self.folder_id:
            url = f"{DEFAULT_URL}/folder/{self.folder_id}/list"
        else:
            url = f"{DEFAULT_URL}/space/{self.space_id}/list"

        payload = self._extract_component_fields(query_dict, CUList)
        headers = self.get_headers()
        response = requests.post(url, json=payload, headers=headers)
        data = response.json()
        parsed_list = parse_dict_through_component(data, CUList, fault_tolerant=True)

        # Point subsequent calls at the new list. The id is read from the
        # raw response because the parsed CUList has no "id" field.
        if "id" in data:
            self.list_id = data["id"]
        return parsed_list

    def create_folder(self, query: str) -> Dict:
        """Create a new folder in the current space and make it current.

        Args:
            query: JSON string with a ``name`` key.

        Returns:
            The response data, or ``{"Error": message}`` when ``query`` is
            not valid JSON.
        """
        query_dict, error = load_query(query, fault_tolerant=True)
        if query_dict is None:
            return {"Error": error}

        url = f"{DEFAULT_URL}/space/{self.space_id}/folder"
        payload = {
            "name": query_dict["name"],
        }
        headers = self.get_headers()
        response = requests.post(url, json=payload, headers=headers)
        data = response.json()

        # The returned id identifies the new folder, so it belongs in
        # folder_id (not list_id).
        if "id" in data:
            self.folder_id = data["id"]
        return data

    def run(self, mode: str, query: str) -> str:
        """Run the API call selected by ``mode``.

        Args:
            mode: Name of the operation to perform.
            query: JSON string passed to operations that take a query.

        Returns:
            The operation's output as a JSON string, or its ``str()`` form
            when it cannot be serialized to JSON. An unknown mode yields a
            ``ModeError`` entry.
        """
        query_modes = {
            "get_task": self.get_task,
            "get_task_attribute": self.get_task_attribute,
            "create_task": self.create_task,
            "create_list": self.create_list,
            "create_folder": self.create_folder,
            "update_task": self.update_task,
            "update_task_assignees": self.update_task_assignees,
        }
        plain_modes = {
            "get_teams": self.get_authorized_teams,
            "get_lists": self.get_lists,
            "get_folders": self.get_folders,
            "get_spaces": self.get_spaces,
        }

        output: Any
        if mode in query_modes:
            output = query_modes[mode](query)
        elif mode in plain_modes:
            output = plain_modes[mode]()
        else:
            output = {"ModeError": f"Got unexpected mode {mode}."}

        # Some operations return raw response objects that json cannot
        # serialize; fall back to their string form.
        try:
            return json.dumps(output)
        except Exception:
            return str(output)