"""调用GitLab的工具。"""
from __future__ import annotations
import json
from typing import TYPE_CHECKING, Any, Dict, List, Optional
from langchain_core.pydantic_v1 import BaseModel, Extra, root_validator
from langchain_core.utils import get_from_dict_or_env
if TYPE_CHECKING:
from gitlab.v4.objects import Issue
[docs]class GitLabAPIWrapper(BaseModel):
"""GitLab API的封装。"""
gitlab: Any #: :meta private:
gitlab_repo_instance: Any #: :meta private:
gitlab_repository: Optional[str] = None
"""GitLab存储库的名称,格式为{用户名}/{存储库名称}。"""
gitlab_personal_access_token: Optional[str] = None
"""用于GitLab服务的个人访问令牌,用于身份验证。"""
gitlab_branch: Optional[str] = None
"""GitLab存储库中机器人将提交的特定分支。默认为'main'。"""
gitlab_base_branch: Optional[str] = None
"""GitLab存储库中的基本分支,用于比较。通常为'main'或'master'。默认为'main'。"""
class Config:
"""此pydantic对象的配置。"""
extra = Extra.forbid
@root_validator()
def validate_environment(cls, values: Dict) -> Dict:
"""验证环境中是否存在API密钥和Python包。"""
gitlab_url = get_from_dict_or_env(
values, "gitlab_url", "GITLAB_URL", default="https://gitlab.com"
)
gitlab_repository = get_from_dict_or_env(
values, "gitlab_repository", "GITLAB_REPOSITORY"
)
gitlab_personal_access_token = get_from_dict_or_env(
values, "gitlab_personal_access_token", "GITLAB_PERSONAL_ACCESS_TOKEN"
)
gitlab_branch = get_from_dict_or_env(
values, "gitlab_branch", "GITLAB_BRANCH", default="main"
)
gitlab_base_branch = get_from_dict_or_env(
values, "gitlab_base_branch", "GITLAB_BASE_BRANCH", default="main"
)
try:
import gitlab
except ImportError:
raise ImportError(
"python-gitlab is not installed. "
"Please install it with `pip install python-gitlab`"
)
g = gitlab.Gitlab(
url=gitlab_url,
private_token=gitlab_personal_access_token,
keep_base_url=True,
)
g.auth()
values["gitlab"] = g
values["gitlab_repo_instance"] = g.projects.get(gitlab_repository)
values["gitlab_repository"] = gitlab_repository
values["gitlab_personal_access_token"] = gitlab_personal_access_token
values["gitlab_branch"] = gitlab_branch
values["gitlab_base_branch"] = gitlab_base_branch
return values
[docs] def parse_issues(self, issues: List[Issue]) -> List[dict]:
"""从每个问题中提取标题和编号,并将它们放入字典中
参数:
issues(List[Issue]): 一个gitlab问题对象的列表
返回:
List[dict]: 一个包含问题标题和编号的字典
"""
parsed = []
for issue in issues:
title = issue.title
number = issue.iid
parsed.append({"title": title, "number": number})
return parsed
[docs] def get_issues(self) -> str:
"""从存储库中获取所有打开的问题
返回:
str:包含问题数量以及每个问题的标题和编号的纯文本报告。
"""
issues = self.gitlab_repo_instance.issues.list(state="opened")
if len(issues) > 0:
parsed_issues = self.parse_issues(issues)
parsed_issues_str = (
"Found " + str(len(parsed_issues)) + " issues:\n" + str(parsed_issues)
)
return parsed_issues_str
else:
return "No open issues available"
[docs] def get_issue(self, issue_number: int) -> Dict[str, Any]:
"""获取特定问题及其前10条评论
参数:
issue_number(int): gitlab问题的编号
返回:
dict: 包含问题标题、内容和评论的字典,评论以字符串形式表示
"""
issue = self.gitlab_repo_instance.issues.get(issue_number)
page = 0
comments: List[dict] = []
while len(comments) <= 10:
comments_page = issue.notes.list(page=page)
if len(comments_page) == 0:
break
for comment in comments_page:
comment = issue.notes.get(comment.id)
comments.append(
{"body": comment.body, "user": comment.author["username"]}
)
page += 1
return {
"title": issue.title,
"body": issue.description,
"comments": str(comments),
}
[docs] def create_pull_request(self, pr_query: str) -> str:
""" 从机器人的分支向基础分支发起拉取请求
参数:
pr_query(str): 包含PR标题和PR正文的字符串。标题是字符串的第一行,正文是字符串的其余部分。
例如,"更新了README
进行了添加信息的更改"
返回:
str: 成功或失败消息
"""
if self.gitlab_base_branch == self.gitlab_branch:
return """Cannot make a pull request because
commits are already in the master branch"""
else:
try:
title = pr_query.split("\n")[0]
body = pr_query[len(title) + 2 :]
pr = self.gitlab_repo_instance.mergerequests.create(
{
"source_branch": self.gitlab_branch,
"target_branch": self.gitlab_base_branch,
"title": title,
"description": body,
"labels": ["created-by-agent"],
}
)
return f"Successfully created PR number {pr.iid}"
except Exception as e:
return "Unable to make pull request due to error:\n" + str(e)
[docs] def create_file(self, file_query: str) -> str:
""" 在gitlab仓库上创建一个新文件
参数:
file_query(str): 一个包含文件路径和文件内容的字符串。文件路径是字符串的第一行,内容是字符串的其余部分。
例如, "hello_world.md
# Hello World!"
返回:
str: 一个成功或失败的消息
"""
file_path = file_query.split("\n")[0]
file_contents = file_query[len(file_path) + 2 :]
try:
self.gitlab_repo_instance.files.get(file_path, self.gitlab_branch)
return f"File already exists at {file_path}. Use update_file instead"
except Exception:
data = {
"branch": self.gitlab_branch,
"commit_message": "Create " + file_path,
"file_path": file_path,
"content": file_contents,
}
self.gitlab_repo_instance.files.create(data)
return "Created file " + file_path
[docs] def read_file(self, file_path: str) -> str:
"""从gitlab仓库中读取文件
参数:
file_path(str): 文件路径
返回:
str: 解码为字符串的文件
"""
file = self.gitlab_repo_instance.files.get(file_path, self.gitlab_branch)
return file.decode().decode("utf-8")
[docs] def update_file(self, file_query: str) -> str:
"""更新文件内容。
参数:
file_query(str): 包含文件路径和文件内容。
旧文件内容用OLD <<<<和>>>> OLD包裹
新文件内容用NEW <<<<和>>>> NEW包裹
例如:
test/hello.txt
OLD <<<<
Hello Earth!
>>>> OLD
NEW <<<<
Hello Mars!
>>>> NEW
返回:
成功或失败消息
"""
try:
file_path = file_query.split("\n")[0]
old_file_contents = (
file_query.split("OLD <<<<")[1].split(">>>> OLD")[0].strip()
)
new_file_contents = (
file_query.split("NEW <<<<")[1].split(">>>> NEW")[0].strip()
)
file_content = self.read_file(file_path)
updated_file_content = file_content.replace(
old_file_contents, new_file_contents
)
if file_content == updated_file_content:
return (
"File content was not updated because old content was not found."
"It may be helpful to use the read_file action to get "
"the current file contents."
)
commit = {
"branch": self.gitlab_branch,
"commit_message": "Create " + file_path,
"actions": [
{
"action": "update",
"file_path": file_path,
"content": updated_file_content,
}
],
}
self.gitlab_repo_instance.commits.create(commit)
return "Updated file " + file_path
except Exception as e:
return "Unable to update file due to error:\n" + str(e)
[docs] def delete_file(self, file_path: str) -> str:
"""从仓库中删除一个文件
参数:
file_path(str): 文件所在的路径
返回:
str: 成功或失败的消息
"""
try:
self.gitlab_repo_instance.files.delete(
file_path, self.gitlab_branch, "Delete " + file_path
)
return "Deleted file " + file_path
except Exception as e:
return "Unable to delete file due to error:\n" + str(e)
[docs] def run(self, mode: str, query: str) -> str:
if mode == "get_issues":
return self.get_issues()
elif mode == "get_issue":
return json.dumps(self.get_issue(int(query)))
elif mode == "comment_on_issue":
return self.comment_on_issue(query)
elif mode == "create_file":
return self.create_file(query)
elif mode == "create_pull_request":
return self.create_pull_request(query)
elif mode == "read_file":
return self.read_file(query)
elif mode == "update_file":
return self.update_file(query)
elif mode == "delete_file":
return self.delete_file(query)
else:
raise ValueError("Invalid mode" + mode)