Source code for langchain_community.document_loaders.confluence

import logging
from enum import Enum
from io import BytesIO
from typing import Any, Callable, Dict, Iterator, List, Optional, Union

import requests
from langchain_core.documents import Document
from tenacity import (
    before_sleep_log,
    retry,
    stop_after_attempt,
    wait_exponential,
)

from langchain_community.document_loaders.base import BaseLoader

logger = logging.getLogger(__name__)


class ContentFormat(str, Enum):
    """Enumerator of the content formats of a Confluence page.

    Each member's value is the ``expand`` path requested from the Confluence
    API; the lower-cased member name is the key under ``page["body"]`` that
    holds the returned content.
    """

    EDITOR = "body.editor"
    EXPORT_VIEW = "body.export_view"
    ANONYMOUS_EXPORT_VIEW = "body.anonymous_export_view"
    STORAGE = "body.storage"
    VIEW = "body.view"

    def get_content(self, page: dict) -> str:
        """Return the body of ``page`` rendered in this format.

        :param page: page dict containing ``body.<format>.value``
        :raises KeyError: if the page was not expanded with this format
        """
        body_key = self.name.lower()
        section = page["body"][body_key]
        return section["value"]
class ConfluenceLoader(BaseLoader):
    """Load `Confluence` pages.

    Port of https://llamahub.ai/l/confluence
    This currently supports username/api_key, Oauth2 login or personal access
    token authentication.

    Specify a list ``page_ids`` and/or a ``space_key`` (and optionally a
    ``label`` or a ``cql`` expression) to load the corresponding pages into
    Document objects; when several are given, the pages of all of them are
    returned.

    You can also specify a boolean ``include_attachments`` to include
    attachments. It is False by default; if set to True all attachments are
    downloaded and their extracted text is appended to the Document. Currently
    supported attachment types are: PDF, PNG, JPEG/JPG, SVG, Word and Excel.

    The Confluence API supports different formats of page content. The storage
    format is the raw XML representation used for storage. The view format is
    the HTML representation for viewing, with macros rendered as users see
    them. Pass the enum ``content_format`` to choose the format; it defaults to
    ``ContentFormat.STORAGE``. Supported values are ``ContentFormat.EDITOR``,
    ``ContentFormat.EXPORT_VIEW``, ``ContentFormat.ANONYMOUS_EXPORT_VIEW``,
    ``ContentFormat.STORAGE`` and ``ContentFormat.VIEW``.

    Hint: space_key and page_id can both be found in the URL of a page in
    Confluence - https://yoursite.atlassian.com/wiki/spaces/<space_key>/pages/<page_id>

    Example:
        .. code-block:: python

            from langchain_community.document_loaders import ConfluenceLoader

            loader = ConfluenceLoader(
                url="https://yoursite.atlassian.com/wiki",
                username="me",
                api_key="12345",
                space_key="SPACE",
                limit=50,
            )
            documents = loader.load()

            # Server on prem
            loader = ConfluenceLoader(
                url="https://confluence.yoursite.com/",
                username="me",
                api_key="your_password",
                cloud=False,
                space_key="SPACE",
                limit=50,
            )
            documents = loader.load()

    :param url: base URL of the Confluence site; also used to build the
        ``source`` metadata and attachment download URLs
    :type url: str
    :param api_key: passed to the client as the password for ``username``,
        defaults to None
    :type api_key: str, optional
    :param username: user name paired with ``api_key``, defaults to None
    :type username: str, optional
    :param session: pre-built session handed to the client; when given, the
        client is created from it and ``cloud`` is not forwarded,
        defaults to None
    :type session: requests.Session, optional
    :param oauth2: dict with exactly the keys ``access_token``,
        ``access_token_secret``, ``consumer_key`` and ``key_cert``,
        defaults to None
    :type oauth2: dict, optional
    :param token: personal access token, defaults to None
    :type token: str, optional
    :param cloud: forwarded to the Confluence client, defaults to True
    :type cloud: bool, optional
    :param number_of_retries: how many times to retry, defaults to 3
    :type number_of_retries: Optional[int], optional
    :param min_retry_seconds: lower bound of the exponential wait, defaults to 2
    :type min_retry_seconds: Optional[int], optional
    :param max_retry_seconds: upper bound of the exponential wait, defaults to 10
    :type max_retry_seconds: Optional[int], optional
    :param confluence_kwargs: additional kwargs used to initialize Confluence
    :type confluence_kwargs: dict, optional
    :param space_key: space key retrieved from a Confluence URL, defaults to None
    :type space_key: Optional[str], optional
    :param page_ids: list of specific page IDs to load, defaults to None
    :type page_ids: Optional[List[str]], optional
    :param label: get all pages with this label, defaults to None
    :type label: Optional[str], optional
    :param cql: CQL expression, defaults to None
    :type cql: Optional[str], optional
    :param include_restricted_content: defaults to False
    :type include_restricted_content: bool, optional
    :param include_archived_content: whether to include archived content,
        defaults to False
    :type include_archived_content: bool, optional
    :param include_attachments: defaults to False
    :type include_attachments: bool, optional
    :param include_comments: defaults to False
    :type include_comments: bool, optional
    :param content_format: content format, defaults to ContentFormat.STORAGE
    :type content_format: ContentFormat
    :param limit: maximum number of pages to retrieve per request, defaults to 50
    :type limit: int, optional
    :param max_pages: maximum number of pages to retrieve in total,
        defaults to 1000
    :type max_pages: int, optional
    :param ocr_languages: languages passed to Tesseract. The matching
        Tesseract language packs must be installed first.
    :type ocr_languages: str, optional
    :param keep_markdown_format: whether to keep the markdown format,
        defaults to False
    :type keep_markdown_format: bool
    :param keep_newlines: whether to keep the newlines format, defaults to False
    :type keep_newlines: bool
    :raises ValueError: errors while validating input
    :raises ImportError: required dependencies not installed
    """

    def __init__(
        self,
        url: str,
        api_key: Optional[str] = None,
        username: Optional[str] = None,
        session: Optional[requests.Session] = None,
        oauth2: Optional[dict] = None,
        token: Optional[str] = None,
        cloud: Optional[bool] = True,
        number_of_retries: Optional[int] = 3,
        min_retry_seconds: Optional[int] = 2,
        max_retry_seconds: Optional[int] = 10,
        confluence_kwargs: Optional[dict] = None,
        *,
        space_key: Optional[str] = None,
        page_ids: Optional[List[str]] = None,
        label: Optional[str] = None,
        cql: Optional[str] = None,
        include_restricted_content: bool = False,
        include_archived_content: bool = False,
        include_attachments: bool = False,
        include_comments: bool = False,
        content_format: ContentFormat = ContentFormat.STORAGE,
        limit: Optional[int] = 50,
        max_pages: Optional[int] = 1000,
        ocr_languages: Optional[str] = None,
        keep_markdown_format: bool = False,
        keep_newlines: bool = False,
    ):
        """Store the load options, validate credentials and build the client.

        See the class docstring for the meaning of each parameter.
        """
        self.space_key = space_key
        self.page_ids = page_ids
        self.label = label
        self.cql = cql
        self.include_restricted_content = include_restricted_content
        self.include_archived_content = include_archived_content
        self.include_attachments = include_attachments
        self.include_comments = include_comments
        self.content_format = content_format
        self.limit = limit
        self.max_pages = max_pages
        self.ocr_languages = ocr_languages
        self.keep_markdown_format = keep_markdown_format
        self.keep_newlines = keep_newlines

        confluence_kwargs = confluence_kwargs or {}
        errors = ConfluenceLoader.validate_init_args(
            url=url,
            api_key=api_key,
            username=username,
            session=session,
            oauth2=oauth2,
            token=token,
        )
        if errors:
            raise ValueError(f"Error(s) while validating input: {errors}")

        # Imported lazily so the module can be imported without the extra.
        try:
            from atlassian import Confluence
        except ImportError:
            raise ImportError(
                "`atlassian` package not found, please run "
                "`pip install atlassian-python-api`"
            )

        self.base_url = url
        self.number_of_retries = number_of_retries
        self.min_retry_seconds = min_retry_seconds
        self.max_retry_seconds = max_retry_seconds

        # Exactly one credential kind is set (enforced by validate_init_args).
        if session:
            self.confluence = Confluence(url=url, session=session, **confluence_kwargs)
        elif oauth2:
            self.confluence = Confluence(
                url=url, oauth2=oauth2, cloud=cloud, **confluence_kwargs
            )
        elif token:
            self.confluence = Confluence(
                url=url, token=token, cloud=cloud, **confluence_kwargs
            )
        else:
            self.confluence = Confluence(
                url=url,
                username=username,
                password=api_key,
                cloud=cloud,
                **confluence_kwargs,
            )

    @staticmethod
    def validate_init_args(
        url: Optional[str] = None,
        api_key: Optional[str] = None,
        username: Optional[str] = None,
        session: Optional[requests.Session] = None,
        oauth2: Optional[dict] = None,
        token: Optional[str] = None,
    ) -> Union[List, None]:
        """Validate proper combinations of init arguments.

        :return: list of human-readable error messages, or None when the
            arguments are consistent
        """
        errors = []
        if url is None:
            errors.append("Must provide `url`")

        if (api_key and not username) or (username and not api_key):
            errors.append(
                "If one of `api_key` or `username` is provided, "
                "the other must be as well."
            )

        # api_key/username count as a single credential kind.
        non_null_creds = [
            x is not None for x in ((api_key or username), session, oauth2, token)
        ]
        if sum(non_null_creds) > 1:
            all_names = ("(api_key, username)", "session", "oauth2", "token")
            provided = tuple(n for x, n in zip(non_null_creds, all_names) if x)
            errors.append(
                f"Cannot provide a value for more than one of: {all_names}. Received "
                f"values for: {provided}"
            )

        if oauth2 and set(oauth2.keys()) != {
            "access_token",
            "access_token_secret",
            "consumer_key",
            "key_cert",
        }:
            errors.append(
                "You have either omitted required keys or added extra "
                "keys to the oauth2 dictionary. key values should be "
                "`['access_token', 'access_token_secret', 'consumer_key', 'key_cert']`"
            )
        return errors or None

    def _with_retry(self, func: Callable) -> Callable:
        """Wrap ``func`` with this loader's retry policy (exponential wait)."""
        return retry(
            reraise=True,
            stop=stop_after_attempt(
                self.number_of_retries  # type: ignore[arg-type]
            ),
            wait=wait_exponential(
                multiplier=1,
                min=self.min_retry_seconds,  # type: ignore[arg-type]
                max=self.max_retry_seconds,  # type: ignore[arg-type]
            ),
            before_sleep=before_sleep_log(logger, logging.WARNING),
        )(func)

    def _resolve_param(self, param_name: str, kwargs: Any) -> Any:
        """Return the runtime override for ``param_name`` or the init value."""
        return kwargs[param_name] if param_name in kwargs else getattr(self, param_name)

    def _lazy_load(self, **kwargs: Any) -> Iterator[Document]:
        """Yield Documents for every configured page source.

        Runtime ``kwargs`` (deprecated) override the values given at
        initialization. Pages are produced in this order: space pages, CQL
        results, then explicit/labelled page ids.

        :raises ValueError: if none of ``space_key``, ``page_ids``, ``label``
            or ``cql`` is set
        """
        if kwargs:
            logger.warning(
                f"Received runtime arguments {kwargs}. Passing runtime args to `load`"
                f" is deprecated. Please pass arguments during initialization instead."
            )
        space_key = self._resolve_param("space_key", kwargs)
        page_ids = self._resolve_param("page_ids", kwargs)
        label = self._resolve_param("label", kwargs)
        cql = self._resolve_param("cql", kwargs)
        include_restricted_content = self._resolve_param(
            "include_restricted_content", kwargs
        )
        include_archived_content = self._resolve_param(
            "include_archived_content", kwargs
        )
        include_attachments = self._resolve_param("include_attachments", kwargs)
        include_comments = self._resolve_param("include_comments", kwargs)
        content_format = self._resolve_param("content_format", kwargs)
        limit = self._resolve_param("limit", kwargs)
        max_pages = self._resolve_param("max_pages", kwargs)
        ocr_languages = self._resolve_param("ocr_languages", kwargs)
        keep_markdown_format = self._resolve_param("keep_markdown_format", kwargs)
        keep_newlines = self._resolve_param("keep_newlines", kwargs)

        if not space_key and not page_ids and not label and not cql:
            raise ValueError(
                "Must specify at least one among `space_key`, `page_ids`, "
                "`label`, `cql` parameters."
            )

        if space_key:
            pages = self.paginate_request(
                self.confluence.get_all_pages_from_space,
                space=space_key,
                limit=limit,
                max_pages=max_pages,
                status="any" if include_archived_content else "current",
                expand=f"{content_format.value},version",
            )
            yield from self.process_pages(
                pages,
                include_restricted_content,
                include_attachments,
                include_comments,
                content_format,
                ocr_languages=ocr_languages,
                keep_markdown_format=keep_markdown_format,
                keep_newlines=keep_newlines,
            )

        if label:
            # Labelled pages are only listed here; they are fetched by id below.
            pages = self.paginate_request(
                self.confluence.get_all_pages_by_label,
                label=label,
                limit=limit,
                max_pages=max_pages,
            )
            ids_by_label = [page["id"] for page in pages]
            if page_ids:
                page_ids = list(set(page_ids + ids_by_label))
            else:
                page_ids = list(set(ids_by_label))

        if cql:
            pages = self.paginate_request(
                self._search_content_by_cql,
                cql=cql,
                limit=limit,
                max_pages=max_pages,
                include_archived_spaces=include_archived_content,
                expand=f"{content_format.value},version",
            )
            yield from self.process_pages(
                pages,
                include_restricted_content,
                include_attachments,
                include_comments,
                content_format,
                ocr_languages=ocr_languages,
                keep_markdown_format=keep_markdown_format,
                # Previously omitted, so keep_newlines was ignored for CQL.
                keep_newlines=keep_newlines,
            )

        if page_ids:
            # The wrapper does not depend on the page, so build it once.
            get_page = self._with_retry(self.confluence.get_page_by_id)
            for page_id in page_ids:
                page = get_page(
                    page_id=page_id, expand=f"{content_format.value},version"
                )
                if not include_restricted_content and not self.is_public_page(page):
                    continue
                yield self.process_page(
                    page,
                    include_attachments,
                    include_comments,
                    content_format,
                    ocr_languages=ocr_languages,
                    keep_markdown_format=keep_markdown_format,
                    # Previously omitted, so keep_newlines was ignored here.
                    keep_newlines=keep_newlines,
                )

    def load(self, **kwargs: Any) -> List[Document]:
        """Load all configured pages eagerly and return them as a list."""
        return list(self._lazy_load(**kwargs))

    def lazy_load(self) -> Iterator[Document]:
        """Yield the configured pages one Document at a time."""
        yield from self._lazy_load()

    def _search_content_by_cql(
        self, cql: str, include_archived_spaces: Optional[bool] = None, **kwargs: Any
    ) -> List[dict]:
        """Run a CQL content search and return the ``results`` list.

        Extra ``kwargs`` (e.g. ``start``, ``limit``, ``expand``) are sent as
        query parameters.
        """
        url = "rest/api/content/search"

        params: Dict[str, Any] = {"cql": cql}
        params.update(kwargs)
        if include_archived_spaces is not None:
            params["includeArchivedSpaces"] = include_archived_spaces

        response = self.confluence.get(url, params=params)
        return response.get("results", [])

    def paginate_request(self, retrieval_method: Callable, **kwargs: Any) -> List:
        """Paginate the various methods to retrieve groups of pages.

        Unfortunately, due to page size, sometimes the Confluence API
        doesn't match the limit value. If `limit` is >100 confluence
        seems to cap the response to 100. Also, due to the Atlassian Python
        package, we don't get the "next" values from the "_links" key because
        they only return the value from the result key. So here, the pagination
        starts from 0 and goes until the max_pages, getting the `limit` number
        of pages with each request. We have to manually check if there
        are more docs based on the length of the returned list of pages, rather than
        just checking for the presence of a `next` key in the response like this page
        would have you do:
        https://developer.atlassian.com/server/confluence/pagination-in-the-rest-api/

        :param retrieval_method: Function used to retrieve docs
        :type retrieval_method: callable
        :param kwargs: forwarded to ``retrieval_method``; must contain
            ``max_pages``, which is consumed here and not forwarded
        :return: List of documents
        :rtype: List
        """
        max_pages = kwargs.pop("max_pages")
        # The wrapper is loop-invariant, so build it once.
        get_pages = self._with_retry(retrieval_method)

        docs: List[dict] = []
        while len(docs) < max_pages:
            batch = get_pages(**kwargs, start=len(docs))
            if not batch:
                break
            docs.extend(batch)
        # The last batch may overshoot max_pages.
        return docs[:max_pages]

    def is_public_page(self, page: dict) -> bool:
        """Check if a page is publicly accessible.

        A page counts as public when its status is ``current`` and it has no
        user or group read restrictions.
        """
        restrictions = self.confluence.get_all_restrictions_for_content(page["id"])

        return (
            page["status"] == "current"
            and not restrictions["read"]["restrictions"]["user"]["results"]
            and not restrictions["read"]["restrictions"]["group"]["results"]
        )

    def process_pages(
        self,
        pages: List[dict],
        include_restricted_content: bool,
        include_attachments: bool,
        include_comments: bool,
        content_format: ContentFormat,
        ocr_languages: Optional[str] = None,
        keep_markdown_format: Optional[bool] = False,
        keep_newlines: bool = False,
    ) -> Iterator[Document]:
        """Process a list of pages into Documents, skipping restricted pages
        unless ``include_restricted_content`` is set."""
        for page in pages:
            if not include_restricted_content and not self.is_public_page(page):
                continue
            yield self.process_page(
                page,
                include_attachments,
                include_comments,
                content_format,
                ocr_languages=ocr_languages,
                keep_markdown_format=keep_markdown_format,
                keep_newlines=keep_newlines,
            )

    def process_page(
        self,
        page: dict,
        include_attachments: bool,
        include_comments: bool,
        content_format: ContentFormat,
        ocr_languages: Optional[str] = None,
        keep_markdown_format: Optional[bool] = False,
        keep_newlines: bool = False,
    ) -> Document:
        """Convert one page dict into a Document.

        The text is the page body (as markdown or plain text), followed by
        attachment texts and comment texts when requested. Metadata holds
        ``title``, ``id``, ``source`` and, when available, ``when``.

        :raises ImportError: if ``markdownify`` or ``beautifulsoup4`` is
            needed but not installed
        """
        if keep_markdown_format:
            try:
                from markdownify import markdownify
            except ImportError:
                raise ImportError(
                    "`markdownify` package not found, please run "
                    "`pip install markdownify`"
                )
        if include_comments or not keep_markdown_format:
            try:
                from bs4 import BeautifulSoup
            except ImportError:
                raise ImportError(
                    "`beautifulsoup4` package not found, please run "
                    "`pip install beautifulsoup4`"
                )
        if include_attachments:
            attachment_texts = self.process_attachment(page["id"], ocr_languages)
        else:
            attachment_texts = []

        content = content_format.get_content(page)
        if keep_markdown_format:
            # Use markdownify to keep the page Markdown style
            text = markdownify(content, heading_style="ATX") + "".join(attachment_texts)
        else:
            if keep_newlines:
                # Turn paragraph ends and line breaks into real newlines
                # before the markup is stripped.
                text = BeautifulSoup(
                    content.replace("</p>", "\n</p>").replace("<br />", "\n"), "lxml"
                ).get_text(" ") + "".join(attachment_texts)
            else:
                text = BeautifulSoup(content, "lxml").get_text(
                    " ", strip=True
                ) + "".join(attachment_texts)

        if include_comments:
            comments = self.confluence.get_page_comments(
                page["id"], expand="body.view.value", depth="all"
            )["results"]
            comment_texts = [
                BeautifulSoup(comment["body"]["view"]["value"], "lxml").get_text(
                    " ", strip=True
                )
                for comment in comments
            ]
            text = text + "".join(comment_texts)

        metadata = {
            "title": page["title"],
            "id": page["id"],
            "source": self.base_url.strip("/") + page["_links"]["webui"],
        }

        if "version" in page and "when" in page["version"]:
            metadata["when"] = page["version"]["when"]

        return Document(
            page_content=text,
            metadata=metadata,
        )

    def process_attachment(
        self,
        page_id: str,
        ocr_languages: Optional[str] = None,
    ) -> List[str]:
        """Extract text from every supported attachment of a page.

        Each returned string is the attachment title followed by its extracted
        text. Unsupported media types are skipped, as are attachments whose
        download fails with HTTP 404.

        :raises ImportError: if ``Pillow`` is not installed
        """
        try:
            from PIL import Image  # noqa: F401
        except ImportError:
            raise ImportError(
                "`Pillow` package not found, " "please run `pip install Pillow`"
            )

        # depending on setup you may also need to set the correct path for
        # poppler and tesseract
        attachments = self.confluence.get_attachments_from_content(page_id)["results"]
        # Same normalization as the page `source` link, so a trailing slash
        # in the base URL does not yield `//download/...`.
        base_url = self.base_url.strip("/")
        texts = []
        for attachment in attachments:
            media_type = attachment["metadata"]["mediaType"]
            absolute_url = base_url + attachment["_links"]["download"]
            title = attachment["title"]
            try:
                if media_type == "application/pdf":
                    text = title + self.process_pdf(absolute_url, ocr_languages)
                elif media_type in ("image/png", "image/jpg", "image/jpeg"):
                    text = title + self.process_image(absolute_url, ocr_languages)
                elif (
                    media_type == "application/vnd.openxmlformats-officedocument"
                    ".wordprocessingml.document"
                ):
                    text = title + self.process_doc(absolute_url)
                elif media_type == "application/vnd.ms-excel":
                    text = title + self.process_xls(absolute_url)
                elif media_type == "image/svg+xml":
                    text = title + self.process_svg(absolute_url, ocr_languages)
                else:
                    continue
                texts.append(text)
            except requests.HTTPError as e:
                # HTTPError may be raised without a response attached.
                if e.response is not None and e.response.status_code == 404:
                    logger.warning("Attachment not found at %s", absolute_url)
                    continue
                raise

        return texts

    def _download_attachment(self, link: str) -> Optional[bytes]:
        """Fetch ``link``; return its bytes, or None on non-200 or empty body."""
        response = self.confluence.request(path=link, absolute=True)
        if response.status_code != 200 or not response.content:
            return None
        return response.content

    def process_pdf(
        self,
        link: str,
        ocr_languages: Optional[str] = None,
    ) -> str:
        """OCR a PDF attachment page by page.

        :return: ``"Page N:\\n<text>\\n\\n"`` for each page, or ``""`` when
            the download fails or the PDF cannot be converted to images
        :raises ImportError: if ``pytesseract`` or ``pdf2image`` is missing
        """
        try:
            import pytesseract
            from pdf2image import convert_from_bytes
        except ImportError:
            raise ImportError(
                "`pytesseract` or `pdf2image` package not found, "
                "please run `pip install pytesseract pdf2image`"
            )

        content = self._download_attachment(link)
        if content is None:
            return ""
        try:
            images = convert_from_bytes(content)
        except ValueError:
            return ""

        return "".join(
            f"Page {i + 1}:\n"
            f"{pytesseract.image_to_string(image, lang=ocr_languages)}\n\n"
            for i, image in enumerate(images)
        )

    def process_image(
        self,
        link: str,
        ocr_languages: Optional[str] = None,
    ) -> str:
        """OCR an image attachment.

        :return: recognized text, or ``""`` when the download fails or the
            image cannot be opened
        :raises ImportError: if ``pytesseract`` or ``Pillow`` is missing
        """
        try:
            import pytesseract
            from PIL import Image
        except ImportError:
            raise ImportError(
                "`pytesseract` or `Pillow` package not found, "
                "please run `pip install pytesseract Pillow`"
            )

        content = self._download_attachment(link)
        if content is None:
            return ""
        try:
            image = Image.open(BytesIO(content))
        except OSError:
            return ""

        return pytesseract.image_to_string(image, lang=ocr_languages)

    def process_doc(self, link: str) -> str:
        """Extract the text of a Word (.docx) attachment.

        :return: document text, or ``""`` when the download fails
        :raises ImportError: if ``docx2txt`` is missing
        """
        try:
            import docx2txt
        except ImportError:
            raise ImportError(
                "`docx2txt` package not found, please run `pip install docx2txt`"
            )

        content = self._download_attachment(link)
        if content is None:
            return ""
        return docx2txt.process(BytesIO(content))

    def process_xls(self, link: str) -> str:
        """Extract the text of a spreadsheet attachment.

        A link whose file extension starts with ``.csv`` is parsed with pandas
        as UTF-8 CSV; anything else is opened as a workbook with ``xlrd`` and
        rendered sheet by sheet, with tab-terminated cells.

        :return: extracted text, or ``""`` when the download fails
        :raises ImportError: if ``xlrd`` or ``pandas`` is missing
        """
        import io
        import os

        try:
            import xlrd
        except ImportError:
            raise ImportError("`xlrd` package not found, please run `pip install xlrd`")

        try:
            import pandas as pd
        except ImportError:
            raise ImportError(
                "`pandas` package not found, please run `pip install pandas`"
            )

        content = self._download_attachment(link)
        if content is None:
            return ""

        filename = os.path.basename(link)
        # Getting the whole content of the url after filename,
        # Example: ".csv?version=2&modificationDate=1631800010678&cacheVersion=1&api=v2"
        file_extension = os.path.splitext(filename)[1]

        if file_extension.startswith(".csv"):
            # if the extension found in the url is ".csv"
            df = pd.read_csv(io.StringIO(content.decode("utf-8")))
            return df.to_string(index=False, header=False) + "\n\n"

        workbook = xlrd.open_workbook(file_contents=content)
        parts: List[str] = []
        for sheet in workbook.sheets():
            parts.append(f"{sheet.name}:\n")
            for row in range(sheet.nrows):
                parts.extend(
                    f"{sheet.cell_value(row, col)}\t" for col in range(sheet.ncols)
                )
                parts.append("\n")
            parts.append("\n")
        return "".join(parts)

    def process_svg(
        self,
        link: str,
        ocr_languages: Optional[str] = None,
    ) -> str:
        """Render an SVG attachment to PNG and OCR it.

        :return: recognized text, or ``""`` when the download fails or no
            drawing could be built from the SVG
        :raises ImportError: if ``pytesseract``, ``Pillow``, ``reportlab`` or
            ``svglib`` is missing
        """
        try:
            import pytesseract
            from PIL import Image
            from reportlab.graphics import renderPM
            from svglib.svglib import svg2rlg
        except ImportError:
            raise ImportError(
                "`pytesseract`, `Pillow`, `reportlab` or `svglib` package not found, "
                "please run `pip install pytesseract Pillow reportlab svglib`"
            )

        content = self._download_attachment(link)
        if content is None:
            return ""

        drawing = svg2rlg(BytesIO(content))
        # NOTE(review): svg2rlg is understood to return None for SVG it cannot
        # parse; rendering None would raise - confirm against svglib.
        if drawing is None:
            return ""

        img_data = BytesIO()
        renderPM.drawToFile(drawing, img_data, fmt="PNG")
        img_data.seek(0)
        image = Image.open(img_data)

        return pytesseract.image_to_string(image, lang=ocr_languages)