Source code for langchain_community.document_loaders.git
import os
from typing import Callable, Iterator, Optional
from langchain_core.documents import Document
from langchain_community.document_loaders.base import BaseLoader
class GitLoader(BaseLoader):
    """Load the files of a `Git` repository as documents.

    The repository can be local on disk, located at `repo_path`, or remote,
    located at `clone_url`, in which case it is cloned into `repo_path`.
    Currently only text files are supported.

    Each document represents one file in the repository. `repo_path` points
    to the local Git repository and `branch` selects the branch whose files
    are loaded. By default files are loaded from the `main` branch.
    """

    def __init__(
        self,
        repo_path: str,
        clone_url: Optional[str] = None,
        branch: Optional[str] = "main",
        file_filter: Optional[Callable[[str], bool]] = None,
    ) -> None:
        """Store the loader configuration; no repository access happens here.

        Args:
            repo_path: Path to the Git repository on local disk.
            clone_url: Optional. URL to clone the repository from.
            branch: Optional. Branch to load files from. Defaults to "main".
            file_filter: Optional. Function that receives a file path and
                returns a boolean telling whether the file should be loaded.
                Defaults to None.
        """
        self.repo_path = repo_path
        self.clone_url = clone_url
        self.branch = branch
        self.file_filter = file_filter

    def lazy_load(self) -> Iterator[Document]:
        """Check out `branch` and lazily yield one document per text file.

        Yields:
            A `Document` whose `page_content` is the UTF-8 decoded file
            content and whose metadata holds `source`, `file_path` (both the
            path relative to `repo_path`), `file_name` and `file_type` (the
            file extension).

        Raises:
            ImportError: If the `git` package (GitPython) cannot be imported.
            ValueError: If `repo_path` does not exist and no `clone_url` was
                given, or if `repo_path` already holds a repository whose
                `origin` remote URL differs from `clone_url`.
        """
        try:
            from git import Blob, Repo
        except ImportError as ex:
            raise ImportError(
                "Could not import git python package. "
                "Please install it with `pip install GitPython`."
            ) from ex

        if not os.path.exists(self.repo_path) and self.clone_url is None:
            raise ValueError(f"Path {self.repo_path} does not exist")

        if not self.clone_url:
            repo = Repo(self.repo_path)
        elif os.path.isdir(os.path.join(self.repo_path, ".git")):
            # repo_path already contains a git repository: make sure it is the
            # same repository as the one we were asked to clone.
            repo = Repo(self.repo_path)
            if repo.remotes.origin.url != self.clone_url:
                raise ValueError(
                    "A different repository is already cloned at this path."
                )
        else:
            repo = Repo.clone_from(self.clone_url, self.repo_path)
        repo.git.checkout(self.branch)

        for item in repo.tree().traverse():
            # Only blobs are loaded; every other tree entry is skipped.
            if not isinstance(item, Blob):
                continue

            file_path = os.path.join(self.repo_path, item.path)

            # Skip paths that git reports as ignored.
            if repo.ignored([file_path]):
                continue

            # Let the caller-supplied filter skip files.
            if self.file_filter and not self.file_filter(file_path):
                continue

            rel_file_path = os.path.relpath(file_path, self.repo_path)

            # Read and close the file *before* yielding: a generator may stay
            # suspended at `yield` indefinitely, and keeping the `yield` out
            # of the try block ensures exceptions raised by the consumer are
            # not swallowed and misreported as read errors.
            try:
                with open(file_path, "rb") as f:
                    content = f.read()
            except Exception as e:
                # Best effort: report the unreadable file and move on.
                print(f"Error reading file {file_path}: {e}")  # noqa: T201
                continue

            # Only text files are loaded; non-UTF-8 content is skipped.
            try:
                text_content = content.decode("utf-8")
            except UnicodeDecodeError:
                continue

            metadata = {
                "source": rel_file_path,
                "file_path": rel_file_path,
                "file_name": item.name,
                "file_type": os.path.splitext(item.name)[1],
            }
            yield Document(page_content=text_content, metadata=metadata)