"""模块包含基于Google Cloud的Document AI的PDF解析器。
您需要安装两个库来使用此解析器:
pip install google-cloud-documentai
pip install google-cloud-documentai-toolbox
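
A minimal usage sketch (PROJECT_ID, PROCESSOR_ID, and the gs:// paths are
placeholders, not real resources):

    from langchain_community.document_loaders.blob_loaders import Blob

    parser = DocAIParser(
        location="us",
        processor_name="projects/PROJECT_ID/locations/us/processors/PROCESSOR_ID",
        gcs_output_path="gs://BUCKET_NAME/output/",
    )
    docs = list(parser.batch_parse([Blob(path="gs://BUCKET_NAME/file.pdf")]))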
"""
import logging
import re
import time
from dataclasses import dataclass
from typing import TYPE_CHECKING, Iterator, List, Optional, Sequence
from langchain_core._api.deprecation import deprecated
from langchain_core.documents import Document
from langchain_core.utils.iter import batch_iterate
from langchain_community.document_loaders.base import BaseBlobParser
from langchain_community.document_loaders.blob_loaders import Blob
from langchain_community.utilities.vertexai import get_client_info
if TYPE_CHECKING:
from google.api_core.operation import Operation
from google.cloud.documentai import DocumentProcessorServiceClient
logger = logging.getLogger(__name__)
@dataclass
class DocAIParsingResults:
"""用于存储文档AI解析结果的数据类。"""
source_path: str
parsed_path: str
@deprecated(
since="0.0.32",
removal="0.3.0",
alternative_import="langchain_google_community.DocAIParser",
)
class DocAIParser(BaseBlobParser):
"""`Google Cloud Document AI` 解析器。
有关 Document AI 的详细说明,请参考产品文档。
https://cloud.google.com/document-ai/docs/overview"""
    def __init__(
self,
*,
client: Optional["DocumentProcessorServiceClient"] = None,
location: Optional[str] = None,
gcs_output_path: Optional[str] = None,
processor_name: Optional[str] = None,
):
"""初始化解析器。
参数:
client:要使用的DocumentProcessorServiceClient
location:Document AI处理器所在的Google Cloud位置
gcs_output_path:用于存储解析结果的Google Cloud存储路径
processor_name:Document AI处理器或处理器版本的完整资源名称
您应该提供client或location(然后将实例化client)。
"""
if bool(client) == bool(location):
raise ValueError(
"You must specify either a client or a location to instantiate "
"a client."
)
pattern = r"projects\/[0-9]+\/locations\/[a-z\-0-9]+\/processors\/[a-z0-9]+"
if processor_name and not re.fullmatch(pattern, processor_name):
raise ValueError(
f"Processor name {processor_name} has the wrong format. If your "
"prediction endpoint looks like https://us-documentai.googleapis.com"
"/v1/projects/PROJECT_ID/locations/us/processors/PROCESSOR_ID:process,"
" use only projects/PROJECT_ID/locations/us/processors/PROCESSOR_ID "
"part."
)
self._gcs_output_path = gcs_output_path
self._processor_name = processor_name
if client:
self._client = client
else:
try:
from google.api_core.client_options import ClientOptions
from google.cloud.documentai import DocumentProcessorServiceClient
except ImportError as exc:
raise ImportError(
"documentai package not found, please install it with"
" `pip install google-cloud-documentai`"
) from exc
options = ClientOptions(
api_endpoint=f"{location}-documentai.googleapis.com"
)
self._client = DocumentProcessorServiceClient(
client_options=options,
client_info=get_client_info(module="document-ai"),
)
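
    # A sketch of the two valid ways to construct the parser (resource names
    # below are placeholders):
    #   parser = DocAIParser(location="us", processor_name="projects/.../processors/...")
    #   parser = DocAIParser(client=existing_client, processor_name="projects/.../processors/...")
    # Passing both a client and a location, or neither, raises the ValueError above.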
    def lazy_parse(self, blob: Blob) -> Iterator[Document]:
"""解析一个懒惰的blob。
参数:
blobs: 要解析的Blob
这是一个长时间运行的操作。推荐的方法是将文档批量处理,并使用`batch_parse()`方法。
"""
yield from self.batch_parse([blob], gcs_output_path=self._gcs_output_path)
    def online_process(
self,
blob: Blob,
enable_native_pdf_parsing: bool = True,
field_mask: Optional[str] = None,
page_range: Optional[List[int]] = None,
) -> Iterator[Document]:
"""解析一个blob,使用在线处理进行惰性解析。
参数:
blob:要解析的blob。
enable_native_pdf_parsing:启用pdf嵌入文本提取。
field_mask:逗号分隔的要包含在Document AI响应中的字段列表。
建议:"text,pages.pageNumber,pages.layout"
page_range:要解析的页面编号列表。如果为`None`,
将解析整个文档。
"""
try:
from google.cloud import documentai
from google.cloud.documentai_v1.types import (
IndividualPageSelector,
OcrConfig,
ProcessOptions,
)
except ImportError as exc:
raise ImportError(
"documentai package not found, please install it with"
" `pip install google-cloud-documentai`"
) from exc
try:
from google.cloud.documentai_toolbox.wrappers.page import _text_from_layout
except ImportError as exc:
raise ImportError(
"documentai_toolbox package not found, please install it with"
" `pip install google-cloud-documentai-toolbox`"
) from exc
ocr_config = (
OcrConfig(enable_native_pdf_parsing=enable_native_pdf_parsing)
if enable_native_pdf_parsing
else None
)
individual_page_selector = (
IndividualPageSelector(pages=page_range) if page_range else None
)
response = self._client.process_document(
documentai.ProcessRequest(
name=self._processor_name,
gcs_document=documentai.GcsDocument(
gcs_uri=blob.path,
mime_type=blob.mimetype or "application/pdf",
),
process_options=ProcessOptions(
ocr_config=ocr_config,
individual_page_selector=individual_page_selector,
),
skip_human_review=True,
field_mask=field_mask,
)
)
yield from (
Document(
page_content=_text_from_layout(page.layout, response.document.text),
metadata={
"page": page.page_number,
"source": blob.path,
},
)
for page in response.document.pages
)
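
    # Usage sketch (hypothetical GCS URI; Document AI page numbers are 1-based):
    #   blob = Blob(path="gs://BUCKET_NAME/file.pdf")
    #   first_two_pages = list(parser.online_process(blob, page_range=[1, 2]))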
    def batch_parse(
self,
blobs: Sequence[Blob],
gcs_output_path: Optional[str] = None,
timeout_sec: int = 3600,
check_in_interval_sec: int = 60,
) -> Iterator[Document]:
"""解析一个懒惰地列表的blob。
参数:
blobs:要解析的blob列表。
gcs_output_path:用于存储解析结果的Google Cloud Storage路径。
timeout_sec:等待Document AI完成的超时时间,单位为秒。
check_in_interval_sec:等待下一次检查解析操作是否已完成的时间间隔,单位为秒。
这是一个长时间运行的操作。推荐的方法是将解析与创建LangChain文档分开:
>>> operations = parser.docai_parse(blobs, gcs_path)
>>> parser.is_running(operations)
您可以获取操作名称并保存它们:
>>> names = [op.operation.name for op in operations]
当所有操作都完成时,您可以使用它们的结果:
>>> operations = parser.operations_from_names(operation_names)
>>> results = parser.get_results(operations)
>>> docs = parser.parse_from_results(results)
"""
output_path = gcs_output_path or self._gcs_output_path
if not output_path:
raise ValueError(
"An output path on Google Cloud Storage should be provided."
)
operations = self.docai_parse(blobs, gcs_output_path=output_path)
operation_names = [op.operation.name for op in operations]
logger.debug(
"Started parsing with Document AI, submitted operations %s", operation_names
)
time_elapsed = 0
while self.is_running(operations):
time.sleep(check_in_interval_sec)
time_elapsed += check_in_interval_sec
if time_elapsed > timeout_sec:
raise TimeoutError(
"Timeout exceeded! Check operations " f"{operation_names} later!"
)
logger.debug(".")
results = self.get_results(operations=operations)
yield from self.parse_from_results(results)
    def parse_from_results(
self, results: List[DocAIParsingResults]
) -> Iterator[Document]:
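        """Converts Document AI results stored on GCS into LangChain Documents.

        Args:
            results: a list of DocAIParsingResults with GCS paths of the
                parsed shards.
        """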
try:
from google.cloud.documentai_toolbox.utilities.gcs_utilities import (
split_gcs_uri,
)
from google.cloud.documentai_toolbox.wrappers.document import _get_shards
from google.cloud.documentai_toolbox.wrappers.page import _text_from_layout
except ImportError as exc:
raise ImportError(
"documentai_toolbox package not found, please install it with"
" `pip install google-cloud-documentai-toolbox`"
) from exc
for result in results:
gcs_bucket_name, gcs_prefix = split_gcs_uri(result.parsed_path)
shards = _get_shards(gcs_bucket_name, gcs_prefix)
yield from (
Document(
page_content=_text_from_layout(page.layout, shard.text),
metadata={"page": page.page_number, "source": result.source_path},
)
for shard in shards
for page in shard.pages
)
    def operations_from_names(self, operation_names: List[str]) -> List["Operation"]:
        """Initializes Long-Running Operations from their names."""
try:
from google.longrunning.operations_pb2 import (
GetOperationRequest, # type: ignore
)
except ImportError as exc:
raise ImportError(
"long running operations package not found, please install it with"
" `pip install gapic-google-longrunning`"
) from exc
return [
self._client.get_operation(request=GetOperationRequest(name=name))
for name in operation_names
]
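
    # Resuming sketch: rebuild operations from names saved earlier, then collect
    # the results once the operations have finished:
    #   ops = parser.operations_from_names(saved_names)
    #   docs = list(parser.parse_from_results(parser.get_results(ops)))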
    def is_running(self, operations: List["Operation"]) -> bool:
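        """Checks whether any of the operations is still running."""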
return any(not op.done() for op in operations)
    def docai_parse(
self,
blobs: Sequence[Blob],
*,
gcs_output_path: Optional[str] = None,
processor_name: Optional[str] = None,
batch_size: int = 1000,
enable_native_pdf_parsing: bool = True,
field_mask: Optional[str] = None,
) -> List["Operation"]:
"""在一组blob上运行Google Document AI PDF批处理。
参数:
blobs:要解析的blob列表
gcs_output_path:用于存储结果的GCS路径(文件夹)
processor_name:Document AI处理器的名称。
batch_size:每批文档的数量
enable_native_pdf_parsing:解析器的配置选项
field_mask:逗号分隔的要包含在Document AI响应中的字段列表。
建议:"text,pages.pageNumber,pages.layout"
Document AI每批有1000个文件限制,因此超过该限制的批次需要拆分为多个请求。
批处理是异步长时间运行的操作,结果存储在输出GCS存储桶中。
"""
try:
from google.cloud import documentai
from google.cloud.documentai_v1.types import OcrConfig, ProcessOptions
except ImportError as exc:
raise ImportError(
"documentai package not found, please install it with"
" `pip install google-cloud-documentai`"
) from exc
output_path = gcs_output_path or self._gcs_output_path
if output_path is None:
raise ValueError(
"An output path on Google Cloud Storage should be provided."
)
processor_name = processor_name or self._processor_name
if processor_name is None:
raise ValueError("A Document AI processor name should be provided.")
operations = []
for batch in batch_iterate(size=batch_size, iterable=blobs):
input_config = documentai.BatchDocumentsInputConfig(
gcs_documents=documentai.GcsDocuments(
documents=[
documentai.GcsDocument(
gcs_uri=blob.path,
mime_type=blob.mimetype or "application/pdf",
)
for blob in batch
]
)
)
output_config = documentai.DocumentOutputConfig(
gcs_output_config=documentai.DocumentOutputConfig.GcsOutputConfig(
gcs_uri=output_path, field_mask=field_mask
)
)
process_options = (
ProcessOptions(
ocr_config=OcrConfig(
enable_native_pdf_parsing=enable_native_pdf_parsing
)
)
if enable_native_pdf_parsing
else None
)
operations.append(
self._client.batch_process_documents(
documentai.BatchProcessRequest(
name=processor_name,
input_documents=input_config,
document_output_config=output_config,
process_options=process_options,
skip_human_review=True,
)
)
)
return operations
    def get_results(self, operations: List["Operation"]) -> List[DocAIParsingResults]:
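        """Extracts source and output GCS paths from batch operations' metadata."""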
try:
from google.cloud.documentai_v1 import BatchProcessMetadata
except ImportError as exc:
raise ImportError(
"documentai package not found, please install it with"
" `pip install google-cloud-documentai`"
) from exc
return [
DocAIParsingResults(
source_path=status.input_gcs_source,
parsed_path=status.output_gcs_destination,
)
for op in operations
for status in (
op.metadata.individual_process_statuses
if isinstance(op.metadata, BatchProcessMetadata)
else BatchProcessMetadata.deserialize(
op.metadata.value
).individual_process_statuses
)
]