Source code for langchain_community.utilities.gitlab

"""调用GitLab的工具。"""
from __future__ import annotations

import json
from typing import TYPE_CHECKING, Any, Dict, List, Optional

from langchain_core.pydantic_v1 import BaseModel, Extra, root_validator
from langchain_core.utils import get_from_dict_or_env

if TYPE_CHECKING:
    from gitlab.v4.objects import Issue


class GitLabAPIWrapper(BaseModel):
    """Wrapper around the GitLab API for a single repository.

    The wrapper authenticates with a personal access token, resolves the
    configured project once during validation, and exposes simple
    string-in / string-out operations (issues, files, merge requests) that
    can be dispatched through :meth:`run`.
    """

    gitlab: Any  #: :meta private:
    gitlab_repo_instance: Any  #: :meta private:
    gitlab_url: Optional[str] = None
    """Base URL of the GitLab instance. Defaults to 'https://gitlab.com'."""
    gitlab_repository: Optional[str] = None
    """Name of the GitLab repository, in the form {username}/{repo-name}."""
    gitlab_personal_access_token: Optional[str] = None
    """Personal access token for the GitLab service, used for authentication."""
    gitlab_branch: Optional[str] = None
    """Branch of the GitLab repository the bot commits to. Defaults to 'main'."""
    gitlab_base_branch: Optional[str] = None
    """Base branch of the GitLab repository that merge requests target.
    Usually 'main' or 'master'. Defaults to 'main'."""

    class Config:
        """Configuration for this pydantic object."""

        extra = Extra.forbid

    @root_validator()
    def validate_environment(cls, values: Dict) -> Dict:
        """Validate that the API token and the Python package are available.

        Each setting is taken from ``values`` when present and otherwise from
        the matching environment variable (``GITLAB_URL``,
        ``GITLAB_REPOSITORY``, ``GITLAB_PERSONAL_ACCESS_TOKEN``,
        ``GITLAB_BRANCH``, ``GITLAB_BASE_BRANCH``).

        Args:
            values: The field values collected by pydantic.

        Returns:
            ``values`` with the resolved settings, the authenticated client
            (``gitlab``) and the project handle (``gitlab_repo_instance``).

        Raises:
            ImportError: If ``python-gitlab`` is not installed.
        """
        gitlab_url = get_from_dict_or_env(
            values, "gitlab_url", "GITLAB_URL", default="https://gitlab.com"
        )
        gitlab_repository = get_from_dict_or_env(
            values, "gitlab_repository", "GITLAB_REPOSITORY"
        )
        gitlab_personal_access_token = get_from_dict_or_env(
            values, "gitlab_personal_access_token", "GITLAB_PERSONAL_ACCESS_TOKEN"
        )
        gitlab_branch = get_from_dict_or_env(
            values, "gitlab_branch", "GITLAB_BRANCH", default="main"
        )
        gitlab_base_branch = get_from_dict_or_env(
            values, "gitlab_base_branch", "GITLAB_BASE_BRANCH", default="main"
        )

        try:
            import gitlab
        except ImportError as exc:
            raise ImportError(
                "python-gitlab is not installed. "
                "Please install it with `pip install python-gitlab`"
            ) from exc

        g = gitlab.Gitlab(
            url=gitlab_url,
            private_token=gitlab_personal_access_token,
            keep_base_url=True,
        )
        g.auth()

        values["gitlab"] = g
        values["gitlab_repo_instance"] = g.projects.get(gitlab_repository)
        values["gitlab_url"] = gitlab_url
        values["gitlab_repository"] = gitlab_repository
        values["gitlab_personal_access_token"] = gitlab_personal_access_token
        values["gitlab_branch"] = gitlab_branch
        values["gitlab_base_branch"] = gitlab_base_branch
        return values

    @staticmethod
    def _split_first_line(query: str) -> tuple[str, str]:
        """Split ``query`` into its first line and the remaining payload.

        The payload starts right after the first newline. If the first line
        is followed by a blank line, that one extra newline is dropped as
        well, so both single-newline and double-newline separators work.
        """
        first_line, _, rest = query.partition("\n")
        if rest.startswith("\n"):
            rest = rest[1:]
        return first_line, rest

    def parse_issues(self, issues: List[Issue]) -> List[dict]:
        """Extract the title and number of each issue.

        Args:
            issues: A list of GitLab issue objects.

        Returns:
            One dictionary per issue with the keys ``"title"`` and
            ``"number"`` (the issue's ``iid``).
        """
        return [{"title": issue.title, "number": issue.iid} for issue in issues]

    def get_issues(self) -> str:
        """Fetch the open issues of the repository.

        Returns:
            A plain-text report with the number of issues followed by each
            issue's title and number, or a fixed message when none are open.
        """
        # NOTE(review): list() is called without pagination arguments, so this
        # presumably returns only the first page of open issues - confirm
        # against the installed python-gitlab version.
        issues = self.gitlab_repo_instance.issues.list(state="opened")
        if not issues:
            return "No open issues available"
        parsed_issues = self.parse_issues(issues)
        return f"Found {len(parsed_issues)} issues:\n{parsed_issues}"

    def get_issue(self, issue_number: int) -> Dict[str, Any]:
        """Fetch a specific issue together with its first 10 comments.

        Args:
            issue_number: The number of the GitLab issue.

        Returns:
            A dictionary with the issue ``"title"``, ``"body"`` and
            ``"comments"``; the comments are rendered as a string.
        """
        issue = self.gitlab_repo_instance.issues.get(issue_number)
        max_comments = 10
        comments: List[dict] = []
        # GitLab pagination is 1-based; starting at page 0 would fetch the
        # first page twice and duplicate its comments.
        page = 1
        while len(comments) < max_comments:
            comments_page = issue.notes.list(page=page)
            if not comments_page:
                break
            for note in comments_page:
                if len(comments) >= max_comments:
                    break
                # The listed notes already carry body and author, so no extra
                # request per note is needed.
                comments.append(
                    {"body": note.body, "user": note.author["username"]}
                )
            page += 1

        return {
            "title": issue.title,
            "body": issue.description,
            "comments": str(comments),
        }

    def create_pull_request(self, pr_query: str) -> str:
        """Open a merge request from the bot's branch to the base branch.

        Args:
            pr_query: The merge request title and body. The title is the
                first line of the string and the body is the rest, e.g. the
                line ``Updated README`` followed by the line
                ``made changes to add info``.

        Returns:
            A success or failure message.
        """
        if self.gitlab_base_branch == self.gitlab_branch:
            return (
                "Cannot make a pull request because "
                "commits are already in the master branch"
            )

        try:
            title, body = self._split_first_line(pr_query)
            pr = self.gitlab_repo_instance.mergerequests.create(
                {
                    "source_branch": self.gitlab_branch,
                    "target_branch": self.gitlab_base_branch,
                    "title": title,
                    "description": body,
                    "labels": ["created-by-agent"],
                }
            )
            return f"Successfully created PR number {pr.iid}"
        except Exception as e:
            return "Unable to make pull request due to error:\n" + str(e)

    def comment_on_issue(self, comment_query: str) -> str:
        """Add a comment to a GitLab issue.

        Args:
            comment_query: The issue number, a blank line (two newlines), and
                then the comment text. For example the line ``1``, an empty
                line, and the line ``Working on it now`` adds the comment
                "Working on it now" to issue 1.

        Returns:
            A success or failure message.

        Raises:
            ValueError: If the text before the first blank line is not an
                integer issue number.
        """
        # Split on the separator itself rather than on the digit count of the
        # parsed number, which is wrong for inputs such as "01" or " 1".
        number_part, _, comment = comment_query.partition("\n\n")
        issue_number = int(number_part)
        try:
            issue = self.gitlab_repo_instance.issues.get(issue_number)
            issue.notes.create({"body": comment})
            return "Commented on issue " + str(issue_number)
        except Exception as e:
            return "Unable to make comment due to error:\n" + str(e)

    def create_file(self, file_query: str) -> str:
        """Create a new file in the GitLab repository.

        Args:
            file_query: The file path and the file contents. The path is the
                first line of the string and the contents are the rest, e.g.
                the line ``hello_world.md`` followed by the line
                ``# Hello World!``.

        Returns:
            A success or failure message.
        """
        file_path, file_contents = self._split_first_line(file_query)

        # Any failure of the lookup is treated as "file does not exist yet".
        try:
            self.gitlab_repo_instance.files.get(file_path, self.gitlab_branch)
            file_exists = True
        except Exception:
            file_exists = False

        if file_exists:
            return f"File already exists at {file_path}. Use update_file instead"

        data = {
            "branch": self.gitlab_branch,
            "commit_message": "Create " + file_path,
            "file_path": file_path,
            "content": file_contents,
        }
        self.gitlab_repo_instance.files.create(data)
        return "Created file " + file_path

    def read_file(self, file_path: str) -> str:
        """Read a file from the GitLab repository.

        Args:
            file_path: The path of the file inside the repository.

        Returns:
            The file contents decoded as a UTF-8 string.
        """
        file = self.gitlab_repo_instance.files.get(file_path, self.gitlab_branch)
        return file.decode().decode("utf-8")

    def update_file(self, file_query: str) -> str:
        """Update the contents of a file.

        Args:
            file_query: The file path on the first line, followed by the old
                contents wrapped in ``OLD <<<<`` / ``>>>> OLD`` and the new
                contents wrapped in ``NEW <<<<`` / ``>>>> NEW``. For example::

                    test/hello.txt
                    OLD <<<<
                    Hello Earth!
                    >>>> OLD
                    NEW <<<<
                    Hello Mars!
                    >>>> NEW

        Returns:
            A success or failure message.
        """
        try:
            file_path = file_query.split("\n")[0]
            old_file_contents = (
                file_query.split("OLD <<<<")[1].split(">>>> OLD")[0].strip()
            )
            new_file_contents = (
                file_query.split("NEW <<<<")[1].split(">>>> NEW")[0].strip()
            )

            file_content = self.read_file(file_path)
            updated_file_content = file_content.replace(
                old_file_contents, new_file_contents
            )

            if file_content == updated_file_content:
                return (
                    "File content was not updated because old content was not found. "
                    "It may be helpful to use the read_file action to get "
                    "the current file contents."
                )

            commit = {
                "branch": self.gitlab_branch,
                # This commit updates an existing file, so say so.
                "commit_message": "Update " + file_path,
                "actions": [
                    {
                        "action": "update",
                        "file_path": file_path,
                        "content": updated_file_content,
                    }
                ],
            }
            self.gitlab_repo_instance.commits.create(commit)
            return "Updated file " + file_path
        except Exception as e:
            return "Unable to update file due to error:\n" + str(e)

    def delete_file(self, file_path: str) -> str:
        """Delete a file from the repository.

        Args:
            file_path: The path of the file inside the repository.

        Returns:
            A success or failure message.
        """
        try:
            self.gitlab_repo_instance.files.delete(
                file_path, self.gitlab_branch, "Delete " + file_path
            )
            return "Deleted file " + file_path
        except Exception as e:
            return "Unable to delete file due to error:\n" + str(e)

    def run(self, mode: str, query: str) -> str:
        """Dispatch ``query`` to the operation selected by ``mode``.

        Args:
            mode: One of ``get_issues``, ``get_issue``, ``comment_on_issue``,
                ``create_file``, ``create_pull_request``, ``read_file``,
                ``update_file`` or ``delete_file``.
            query: The input for the selected operation; ignored by
                ``get_issues`` and parsed as an integer by ``get_issue``.

        Returns:
            The string result of the selected operation (``get_issue`` is
            serialized to JSON).

        Raises:
            ValueError: If ``mode`` is not a supported operation.
        """
        if mode == "get_issues":
            return self.get_issues()
        elif mode == "get_issue":
            return json.dumps(self.get_issue(int(query)))
        elif mode == "comment_on_issue":
            return self.comment_on_issue(query)
        elif mode == "create_file":
            return self.create_file(query)
        elif mode == "create_pull_request":
            return self.create_pull_request(query)
        elif mode == "read_file":
            return self.read_file(query)
        elif mode == "update_file":
            return self.update_file(query)
        elif mode == "delete_file":
            return self.delete_file(query)
        else:
            raise ValueError("Invalid mode: " + mode)