Source code for langchain_community.document_loaders.parsers.docai

"""Module contains a PDF parser based on Document AI from Google Cloud.

You need to install two libraries to use this parser:
pip install google-cloud-documentai
pip install google-cloud-documentai-toolbox
"""
import logging
import re
import time
from dataclasses import dataclass
from typing import TYPE_CHECKING, Iterator, List, Optional, Sequence

from langchain_core._api.deprecation import deprecated
from langchain_core.documents import Document
from langchain_core.utils.iter import batch_iterate

from langchain_community.document_loaders.base import BaseBlobParser
from langchain_community.document_loaders.blob_loaders import Blob
from langchain_community.utilities.vertexai import get_client_info

if TYPE_CHECKING:
    from google.api_core.operation import Operation
    from google.cloud.documentai import DocumentProcessorServiceClient


logger = logging.getLogger(__name__)


@dataclass
class DocAIParsingResults:
    """Data class holding the locations tied to one Document AI parsing result."""

    # Location of the input document the result was produced from.
    source_path: str
    # Location where the parsed output for that document was written.
    parsed_path: str
[docs]@deprecated( since="0.0.32", removal="0.3.0", alternative_import="langchain_google_community.DocAIParser", ) class DocAIParser(BaseBlobParser): """`Google Cloud Document AI` parser. For a detailed explanation of Document AI, refer to the product documentation. https://cloud.google.com/document-ai/docs/overview"""
def __init__(
    self,
    *,
    client: Optional["DocumentProcessorServiceClient"] = None,
    location: Optional[str] = None,
    gcs_output_path: Optional[str] = None,
    processor_name: Optional[str] = None,
):
    """Initialize the parser.

    Args:
        client: a DocumentProcessorServiceClient to use.
        location: the Google Cloud location where the Document AI processor
            is located.
        gcs_output_path: a path on Google Cloud Storage to store parsing
            results.
        processor_name: full resource name of a Document AI processor or
            processor version.

    You should provide either a client or a location (in which case a
    client is instantiated).

    Raises:
        ValueError: if both or neither of ``client`` and ``location`` are
            given, or if ``processor_name`` does not have the expected
            resource-name format.
        ImportError: if a client has to be created and the documentai
            package cannot be imported.
    """
    if bool(client) == bool(location):
        raise ValueError(
            "You must specify either a client or a location to instantiate "
            "a client."
        )

    # The optional trailing group accepts processor *version* names
    # (".../processors/ID/processorVersions/VERSION"), which the docstring
    # promises to support; plain processor names still match as before.
    pattern = (
        r"projects\/[0-9]+\/locations\/[a-z\-0-9]+\/processors\/[a-z0-9]+"
        r"(\/processorVersions\/[a-zA-Z0-9_.\-]+)?"
    )
    if processor_name and not re.fullmatch(pattern, processor_name):
        raise ValueError(
            f"Processor name {processor_name} has the wrong format. If your "
            "prediction endpoint looks like https://us-documentai.googleapis.com"
            "/v1/projects/PROJECT_ID/locations/us/processors/PROCESSOR_ID:process,"
            " use only projects/PROJECT_ID/locations/us/processors/PROCESSOR_ID "
            "part."
        )

    self._gcs_output_path = gcs_output_path
    self._processor_name = processor_name
    if client:
        self._client = client
    else:
        # Imported lazily so the module can be imported without the
        # Google Cloud dependencies installed.
        try:
            from google.api_core.client_options import ClientOptions
            from google.cloud.documentai import DocumentProcessorServiceClient
        except ImportError as exc:
            raise ImportError(
                "documentai package not found, please install it with"
                " `pip install google-cloud-documentai`"
            ) from exc
        options = ClientOptions(
            api_endpoint=f"{location}-documentai.googleapis.com"
        )
        self._client = DocumentProcessorServiceClient(
            client_options=options,
            client_info=get_client_info(module="document-ai"),
        )
def lazy_parse(self, blob: Blob) -> Iterator[Document]:
    """Lazily parse a single blob.

    Args:
        blob: the Blob to parse.

    This is a long-running operation. The recommended approach is to batch
    documents together and use the `batch_parse()` method.
    """
    # Delegate to batch processing with a one-element batch, writing to the
    # output path configured on this parser.
    documents = self.batch_parse([blob], gcs_output_path=self._gcs_output_path)
    yield from documents
def online_process(
    self,
    blob: Blob,
    enable_native_pdf_parsing: bool = True,
    field_mask: Optional[str] = None,
    page_range: Optional[List[int]] = None,
) -> Iterator[Document]:
    """Parse a blob lazily using online processing.

    Args:
        blob: a blob to parse.
        enable_native_pdf_parsing: enable pdf embedded text extraction.
        field_mask: a comma-separated list of which fields to include in the
            Document AI response.
            suggested: "text,pages.pageNumber,pages.layout"
        page_range: list of page numbers to parse. If `None`,
            the entire document will be parsed.

    Yields:
        One Document per page of the response, with the page number and the
        blob path stored in its metadata.

    Raises:
        ValueError: if no processor name was configured on the parser.
        ImportError: if the documentai or documentai_toolbox packages
            cannot be imported.
    """
    # Fail fast with the same message docai_parse uses, instead of sending
    # a request without a processor name.
    if not self._processor_name:
        raise ValueError("A Document AI processor name should be provided.")

    try:
        from google.cloud import documentai
        from google.cloud.documentai_v1.types import (
            IndividualPageSelector,
            OcrConfig,
            ProcessOptions,
        )
    except ImportError as exc:
        raise ImportError(
            "documentai package not found, please install it with"
            " `pip install google-cloud-documentai`"
        ) from exc
    try:
        from google.cloud.documentai_toolbox.wrappers.page import _text_from_layout
    except ImportError as exc:
        raise ImportError(
            "documentai_toolbox package not found, please install it with"
            " `pip install google-cloud-documentai-toolbox`"
        ) from exc

    # Optional request parts are left as None when not requested.
    ocr_config = (
        OcrConfig(enable_native_pdf_parsing=enable_native_pdf_parsing)
        if enable_native_pdf_parsing
        else None
    )
    individual_page_selector = (
        IndividualPageSelector(pages=page_range) if page_range else None
    )

    response = self._client.process_document(
        documentai.ProcessRequest(
            name=self._processor_name,
            gcs_document=documentai.GcsDocument(
                gcs_uri=blob.path,
                mime_type=blob.mimetype or "application/pdf",
            ),
            process_options=ProcessOptions(
                ocr_config=ocr_config,
                individual_page_selector=individual_page_selector,
            ),
            skip_human_review=True,
            field_mask=field_mask,
        )
    )
    yield from (
        Document(
            page_content=_text_from_layout(page.layout, response.document.text),
            metadata={
                "page": page.page_number,
                "source": blob.path,
            },
        )
        for page in response.document.pages
    )
def batch_parse(
    self,
    blobs: Sequence[Blob],
    gcs_output_path: Optional[str] = None,
    timeout_sec: int = 3600,
    check_in_interval_sec: int = 60,
) -> Iterator[Document]:
    """Parse a list of blobs lazily.

    Args:
        blobs: a list of blobs to parse.
        gcs_output_path: a path on Google Cloud Storage to store parsing
            results.
        timeout_sec: a timeout to wait for Document AI to complete, in
            seconds.
        check_in_interval_sec: an interval to wait until the next check of
            whether the parsing operations have completed, in seconds.

    Raises:
        ValueError: if no output path is given here or on the parser.
        TimeoutError: if the operations are still running once
            ``timeout_sec`` has elapsed.

    This is a long-running operation. The recommended approach is to
    decouple parsing from creating LangChain Documents:
    >>> operations = parser.docai_parse(blobs, gcs_output_path=gcs_path)
    >>> parser.is_running(operations)
    You can get the operation names and save them:
    >>> names = [op.operation.name for op in operations]
    And when all operations are finished, you can use their results:
    >>> operations = parser.operations_from_names(names)
    >>> results = parser.get_results(operations)
    >>> docs = parser.parse_from_results(results)
    """
    output_path = gcs_output_path or self._gcs_output_path
    if not output_path:
        raise ValueError(
            "An output path on Google Cloud Storage should be provided."
        )
    operations = self.docai_parse(blobs, gcs_output_path=output_path)
    operation_names = [op.operation.name for op in operations]
    logger.debug(
        "Started parsing with Document AI, submitted operations %s", operation_names
    )

    # Measure the timeout against a monotonic clock rather than summing the
    # sleep intervals: this also counts time spent polling and still
    # terminates when check_in_interval_sec is 0.
    deadline = time.monotonic() + timeout_sec
    while self.is_running(operations):
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(
                "Timeout exceeded! Check operations " f"{operation_names} later!"
            )
        # Never sleep past the deadline.
        time.sleep(min(check_in_interval_sec, remaining))
        logger.debug(".")

    results = self.get_results(operations=operations)
    yield from self.parse_from_results(results)
def parse_from_results(
    self, results: List[DocAIParsingResults]
) -> Iterator[Document]:
    """Turn stored parsing results into Documents, one per page.

    Args:
        results: parsing results whose ``parsed_path`` points at the stored
            output and whose ``source_path`` is recorded in the metadata.

    Yields:
        A Document for every page of every shard found under each result's
        parsed path.

    Raises:
        ImportError: if the documentai_toolbox package cannot be imported.
    """
    try:
        from google.cloud.documentai_toolbox.utilities.gcs_utilities import (
            split_gcs_uri,
        )
        from google.cloud.documentai_toolbox.wrappers.document import _get_shards
        from google.cloud.documentai_toolbox.wrappers.page import _text_from_layout
    except ImportError as exc:
        raise ImportError(
            "documentai_toolbox package not found, please install it with"
            " `pip install google-cloud-documentai-toolbox`"
        ) from exc

    for item in results:
        bucket, prefix = split_gcs_uri(item.parsed_path)
        for shard in _get_shards(bucket, prefix):
            for page in shard.pages:
                text = _text_from_layout(page.layout, shard.text)
                yield Document(
                    page_content=text,
                    metadata={"page": page.page_number, "source": item.source_path},
                )
def operations_from_names(self, operation_names: List[str]) -> List["Operation"]:
    """Initialize long-running operations from their names.

    Args:
        operation_names: names of the operations to look up.

    Returns:
        The result of the client's ``get_operation`` call for each name, in
        the order given.

    Raises:
        ImportError: if ``google.longrunning.operations_pb2`` cannot be
            imported.
    """
    try:
        from google.longrunning.operations_pb2 import (
            GetOperationRequest,  # type: ignore
        )
    except ImportError as exc:
        raise ImportError(
            "long running operations package not found, please install it with"
            " `pip install gapic-google-longrunning`"
        ) from exc

    fetched = []
    for op_name in operation_names:
        lookup = GetOperationRequest(name=op_name)
        fetched.append(self._client.get_operation(request=lookup))
    return fetched
def is_running(self, operations: List["Operation"]) -> bool:
    """Report whether any of the given operations has not finished yet.

    Args:
        operations: the operations to inspect. Each must expose ``done``
            either as a zero-argument callable or as a plain boolean
            attribute.

    Returns:
        True if at least one operation is not done; False otherwise,
        including for an empty list.
    """
    for op in operations:
        done = op.done
        # NOTE(review): raw long-running operation messages (presumably what
        # operations_from_names returns - verify) carry ``done`` as a boolean
        # field rather than a method, so accept both forms.
        finished = done() if callable(done) else bool(done)
        if not finished:
            return True
    return False
def docai_parse(
    self,
    blobs: Sequence[Blob],
    *,
    gcs_output_path: Optional[str] = None,
    processor_name: Optional[str] = None,
    batch_size: int = 1000,
    enable_native_pdf_parsing: bool = True,
    field_mask: Optional[str] = None,
) -> List["Operation"]:
    """Run Google Document AI PDF batch processing on a list of blobs.

    Args:
        blobs: a list of blobs to be parsed.
        gcs_output_path: a path (folder) on GCS to store results.
        processor_name: name of a Document AI processor.
        batch_size: number of documents per batch.
        enable_native_pdf_parsing: a config option for the parser.
        field_mask: a comma-separated list of which fields to include in the
            Document AI response.
            suggested: "text,pages.pageNumber,pages.layout"

    Returns:
        One operation per submitted batch request.

    Raises:
        ImportError: if the documentai package cannot be imported.
        ValueError: if no output path or no processor name is available.

    Document AI has a 1000 file limit per batch, so batches larger than that
    need to be split into multiple requests.
    Batch processing is an async long-running operation
    and results are stored in an output GCS bucket.
    """
    try:
        from google.cloud import documentai
        from google.cloud.documentai_v1.types import OcrConfig, ProcessOptions
    except ImportError as exc:
        raise ImportError(
            "documentai package not found, please install it with"
            " `pip install google-cloud-documentai`"
        ) from exc

    # Explicit arguments win over the values configured on the parser.
    destination = gcs_output_path or self._gcs_output_path
    if destination is None:
        raise ValueError(
            "An output path on Google Cloud Storage should be provided."
        )
    resolved_processor = processor_name or self._processor_name
    if resolved_processor is None:
        raise ValueError("A Document AI processor name should be provided.")

    submitted = []
    for chunk in batch_iterate(size=batch_size, iterable=blobs):
        gcs_documents = []
        for item in chunk:
            gcs_documents.append(
                documentai.GcsDocument(
                    gcs_uri=item.path,
                    mime_type=item.mimetype or "application/pdf",
                )
            )
        input_config = documentai.BatchDocumentsInputConfig(
            gcs_documents=documentai.GcsDocuments(documents=gcs_documents)
        )

        gcs_output = documentai.DocumentOutputConfig.GcsOutputConfig(
            gcs_uri=destination, field_mask=field_mask
        )
        output_config = documentai.DocumentOutputConfig(gcs_output_config=gcs_output)

        # Process options are only sent when native PDF parsing is enabled.
        if enable_native_pdf_parsing:
            process_options = ProcessOptions(
                ocr_config=OcrConfig(
                    enable_native_pdf_parsing=enable_native_pdf_parsing
                )
            )
        else:
            process_options = None

        request = documentai.BatchProcessRequest(
            name=resolved_processor,
            input_documents=input_config,
            document_output_config=output_config,
            process_options=process_options,
            skip_human_review=True,
        )
        submitted.append(self._client.batch_process_documents(request))
    return submitted
def get_results(self, operations: List["Operation"]) -> List[DocAIParsingResults]:
    """Collect source and output locations from batch operations.

    Args:
        operations: operations whose metadata lists the individual process
            statuses.

    Returns:
        One DocAIParsingResults per individual process status, built from
        its ``input_gcs_source`` and ``output_gcs_destination``.

    Raises:
        ImportError: if the documentai package cannot be imported.
    """
    try:
        from google.cloud.documentai_v1 import BatchProcessMetadata
    except ImportError as exc:
        raise ImportError(
            "documentai package not found, please install it with"
            " `pip install google-cloud-documentai`"
        ) from exc

    collected = []
    for operation in operations:
        if isinstance(operation.metadata, BatchProcessMetadata):
            statuses = operation.metadata.individual_process_statuses
        else:
            # Metadata that is not already a BatchProcessMetadata is decoded
            # from its ``value`` payload first.
            statuses = BatchProcessMetadata.deserialize(
                operation.metadata.value
            ).individual_process_statuses
        for status in statuses:
            collected.append(
                DocAIParsingResults(
                    source_path=status.input_gcs_source,
                    parsed_path=status.output_gcs_destination,
                )
            )
    return collected