Source code for langchain_community.document_loaders.airbyte_json

import json
from pathlib import Path
from typing import List, Union

from langchain_core.documents import Document
from langchain_core.utils import stringify_dict

from langchain_community.document_loaders.base import BaseLoader


[docs]class AirbyteJSONLoader(BaseLoader): """加载本地的`Airbyte` json文件。"""
[docs] def __init__(self, file_path: Union[str, Path]): """使用文件路径进行初始化。路径应以'/tmp/airbyte_local/'开头。""" self.file_path = file_path """Path to the directory containing the json files."""
[docs] def load(self) -> List[Document]: text = "" for line in open(self.file_path, "r"): data = json.loads(line)["_airbyte_data"] text += stringify_dict(data) metadata = {"source": str(self.file_path)} return [Document(page_content=text, metadata=metadata)]