Source code for langchain_community.document_loaders.git

import os
from typing import Callable, Iterator, Optional

from langchain_core.documents import Document

from langchain_community.document_loaders.base import BaseLoader


class GitLoader(BaseLoader):
    """Load `Git` repository files.

    The repository can be local on disk, available at `repo_path`,
    or remote, at `clone_url`, in which case it is cloned to `repo_path`.
    Currently, only text files are supported.

    Each document represents one file in the repository. The `path` points to
    the local Git repository, and the `branch` specifies the branch to load
    files from. By default, it loads from the `main` branch.
    """

    def __init__(
        self,
        repo_path: str,
        clone_url: Optional[str] = None,
        branch: Optional[str] = "main",
        file_filter: Optional[Callable[[str], bool]] = None,
    ):
        """Store the loader configuration; no Git access happens here.

        Args:
            repo_path: The path to the Git repository.
            clone_url: Optional. The URL to clone the repository from.
            branch: Optional. The branch to load files from. Defaults to `main`.
            file_filter: Optional. A function that takes a file path and returns
              a boolean indicating whether to load the file. Defaults to None.
        """
        self.repo_path = repo_path
        self.clone_url = clone_url
        self.branch = branch
        self.file_filter = file_filter

    def lazy_load(self) -> Iterator[Document]:
        """Lazily yield one `Document` per UTF-8 decodable file in the repository.

        Opens the repository at `repo_path` (cloning it from `clone_url` first
        when a URL is given and `repo_path` holds no `.git` directory), checks
        out `branch`, and then walks the repository tree.

        Note that checking out `branch` changes the state of the repository
        at `repo_path`.

        Yields:
            A `Document` whose `page_content` is the decoded file text and whose
            metadata holds `source`, `file_path` (both relative to `repo_path`),
            `file_name` and `file_type` (the file extension).

        Raises:
            ImportError: If the `GitPython` package is not installed.
            ValueError: If `repo_path` does not exist and no `clone_url` was
                given, or if `repo_path` already holds a repository whose
                `origin` URL differs from `clone_url`.
        """
        try:
            from git import Blob, Repo
        except ImportError as ex:
            raise ImportError(
                "Could not import git python package. "
                "Please install it with `pip install GitPython`."
            ) from ex

        if not os.path.exists(self.repo_path) and self.clone_url is None:
            raise ValueError(f"Path {self.repo_path} does not exist")

        if self.clone_url:
            # If the repo_path already contains a git repository, verify that it's
            # the same repository as the one we're trying to clone.
            if os.path.isdir(os.path.join(self.repo_path, ".git")):
                repo = Repo(self.repo_path)
                # If the existing repository is not the same as the one we're
                # trying to clone, raise an error.
                if repo.remotes.origin.url != self.clone_url:
                    raise ValueError(
                        "A different repository is already cloned at this path."
                    )
            else:
                repo = Repo.clone_from(self.clone_url, self.repo_path)
        else:
            repo = Repo(self.repo_path)

        repo.git.checkout(self.branch)

        for item in repo.tree().traverse():
            # Only file entries are loaded; other tree entries are skipped.
            if not isinstance(item, Blob):
                continue

            file_path = os.path.join(self.repo_path, item.path)

            # Skip anything git reports as ignored.
            if repo.ignored([file_path]):
                continue

            # uses filter to skip files
            if self.file_filter and not self.file_filter(file_path):
                continue

            # Read the whole file and close the handle *before* yielding, so no
            # file descriptor stays open while the generator is suspended and
            # exceptions raised by the consumer are not mistaken for read errors.
            try:
                with open(file_path, "rb") as f:
                    content = f.read()
            except Exception as e:
                # Deliberate best-effort: report the unreadable file and move on.
                print(f"Error reading file {file_path}: {e}")  # noqa: T201
                continue

            # loads only text files
            try:
                text_content = content.decode("utf-8")
            except UnicodeDecodeError:
                continue

            rel_file_path = os.path.relpath(file_path, self.repo_path)
            metadata = {
                "source": rel_file_path,
                "file_path": rel_file_path,
                "file_name": item.name,
                "file_type": os.path.splitext(item.name)[1],
            }
            yield Document(page_content=text_content, metadata=metadata)