Source code for langchain_community.document_loaders.airbyte

from typing import Any, Callable, Iterator, Mapping, Optional

from langchain_core.documents import Document
from langchain_core.utils.utils import guard_import

from langchain_community.document_loaders.base import BaseLoader

RecordHandler = Callable[[Any, Optional[str]], Document]


[docs]class AirbyteCDKLoader(BaseLoader): """使用CDK实现的Airbyte源连接器加载。"""
[docs] def __init__( self, config: Mapping[str, Any], source_class: Any, stream_name: str, record_handler: Optional[RecordHandler] = None, state: Optional[Any] = None, ) -> None: """初始化加载器。 参数: config: 传递给源连接器的配置。 source_class: 源连接器类。 stream_name: 要加载的流的名称。 record_handler: 一个接受记录和可选id的函数,并返回一个文档。如果为None,则记录将被用作文档。默认为None。 state: 传递给源连接器的状态。默认为None。 """ from airbyte_cdk.models.airbyte_protocol import AirbyteRecordMessage from airbyte_cdk.sources.embedded.base_integration import ( BaseEmbeddedIntegration, ) from airbyte_cdk.sources.embedded.runner import CDKRunner class CDKIntegration(BaseEmbeddedIntegration): """一个围绕CDK集成的包装器。""" def _handle_record( self, record: AirbyteRecordMessage, id: Optional[str] ) -> Document: if record_handler: return record_handler(record, id) return Document(page_content="", metadata=record.data) self._integration = CDKIntegration( config=config, runner=CDKRunner(source=source_class(), name=source_class.__name__), ) self._stream_name = stream_name self._state = state
[docs] def lazy_load(self) -> Iterator[Document]: return self._integration._load_data( stream_name=self._stream_name, state=self._state )
@property def last_state(self) -> Any: return self._integration.last_state
[docs]class AirbyteHubspotLoader(AirbyteCDKLoader): """使用`Airbyte`源连接器从`Hubspot`加载数据。"""
[docs] def __init__( self, config: Mapping[str, Any], stream_name: str, record_handler: Optional[RecordHandler] = None, state: Optional[Any] = None, ) -> None: """初始化加载器。 参数: config:传递给源连接器的配置。 stream_name:要加载的流的名称。 record_handler:接受记录和可选id并返回文档的函数。如果为None,则记录将用作文档。默认为None。 state:传递给源连接器的状态。默认为None。 """ source_class = guard_import( "source_hubspot", pip_name="airbyte-source-hubspot" ).SourceHubspot super().__init__( config=config, source_class=source_class, stream_name=stream_name, record_handler=record_handler, state=state, )
[docs]class AirbyteStripeLoader(AirbyteCDKLoader): """使用`Airbyte`源连接器从`Stripe`加载数据。"""
[docs] def __init__( self, config: Mapping[str, Any], stream_name: str, record_handler: Optional[RecordHandler] = None, state: Optional[Any] = None, ) -> None: """初始化加载器。 参数: config:传递给源连接器的配置。 stream_name:要加载的流的名称。 record_handler:接受记录和可选id并返回文档的函数。如果为None,则记录将用作文档。默认为None。 state:传递给源连接器的状态。默认为None。 """ source_class = guard_import( "source_stripe", pip_name="airbyte-source-stripe" ).SourceStripe super().__init__( config=config, source_class=source_class, stream_name=stream_name, record_handler=record_handler, state=state, )
[docs]class AirbyteTypeformLoader(AirbyteCDKLoader): """使用`Airbyte`源连接器从`Typeform`加载数据。"""
[docs] def __init__( self, config: Mapping[str, Any], stream_name: str, record_handler: Optional[RecordHandler] = None, state: Optional[Any] = None, ) -> None: """初始化加载器。 参数: config:传递给源连接器的配置。 stream_name:要加载的流的名称。 record_handler:接受记录和可选id并返回文档的函数。如果为None,则记录将用作文档。默认为None。 state:传递给源连接器的状态。默认为None。 """ source_class = guard_import( "source_typeform", pip_name="airbyte-source-typeform" ).SourceTypeform super().__init__( config=config, source_class=source_class, stream_name=stream_name, record_handler=record_handler, state=state, )
[docs]class AirbyteZendeskSupportLoader(AirbyteCDKLoader): """使用`Airbyte`源连接器从`Zendesk Support`加载数据。"""
[docs] def __init__( self, config: Mapping[str, Any], stream_name: str, record_handler: Optional[RecordHandler] = None, state: Optional[Any] = None, ) -> None: """初始化加载器。 参数: config:传递给源连接器的配置。 stream_name:要加载的流的名称。 record_handler:接受记录和可选id并返回文档的函数。如果为None,则记录将用作文档。默认为None。 state:传递给源连接器的状态。默认为None。 """ source_class = guard_import( "source_zendesk_support", pip_name="airbyte-source-zendesk-support" ).SourceZendeskSupport super().__init__( config=config, source_class=source_class, stream_name=stream_name, record_handler=record_handler, state=state, )
[docs]class AirbyteShopifyLoader(AirbyteCDKLoader): """使用`Airbyte`源连接器从`Shopify`加载数据。"""
[docs] def __init__( self, config: Mapping[str, Any], stream_name: str, record_handler: Optional[RecordHandler] = None, state: Optional[Any] = None, ) -> None: """初始化加载器。 参数: config:传递给源连接器的配置。 stream_name:要加载的流的名称。 record_handler:接受记录和可选id并返回文档的函数。如果为None,则记录将用作文档。默认为None。 state:传递给源连接器的状态。默认为None。 """ source_class = guard_import( "source_shopify", pip_name="airbyte-source-shopify" ).SourceShopify super().__init__( config=config, source_class=source_class, stream_name=stream_name, record_handler=record_handler, state=state, )
[docs]class AirbyteSalesforceLoader(AirbyteCDKLoader): """使用`Airbyte`源连接器从`Salesforce`加载数据。"""
[docs] def __init__( self, config: Mapping[str, Any], stream_name: str, record_handler: Optional[RecordHandler] = None, state: Optional[Any] = None, ) -> None: """初始化加载器。 参数: config:传递给源连接器的配置。 stream_name:要加载的流的名称。 record_handler:接受记录和可选id并返回文档的函数。如果为None,则记录将用作文档。默认为None。 state:传递给源连接器的状态。默认为None。 """ source_class = guard_import( "source_salesforce", pip_name="airbyte-source-salesforce" ).SourceSalesforce super().__init__( config=config, source_class=source_class, stream_name=stream_name, record_handler=record_handler, state=state, )
[docs]class AirbyteGongLoader(AirbyteCDKLoader): """使用`Airbyte`源连接器从`Gong`加载数据。"""
[docs] def __init__( self, config: Mapping[str, Any], stream_name: str, record_handler: Optional[RecordHandler] = None, state: Optional[Any] = None, ) -> None: """初始化加载器。 参数: config:传递给源连接器的配置。 stream_name:要加载的流的名称。 record_handler:接受记录和可选id并返回文档的函数。如果为None,则记录将用作文档。默认为None。 state:传递给源连接器的状态。默认为None。 """ source_class = guard_import( "source_gong", pip_name="airbyte-source-gong" ).SourceGong super().__init__( config=config, source_class=source_class, stream_name=stream_name, record_handler=record_handler, state=state, )