import logging
from enum import Enum
from io import BytesIO
from typing import Any, Callable, Dict, Iterator, List, Optional, Union
import requests
from langchain_core.documents import Document
from tenacity import (
before_sleep_log,
retry,
stop_after_attempt,
wait_exponential,
)
from langchain_community.document_loaders.base import BaseLoader
logger = logging.getLogger(__name__)
class ContentFormat(str, Enum):
    """Enumerator of the content formats of a Confluence page.

    Each member's value is the ``expand`` key used when requesting a page
    (for example ``"body.storage"``).
    """

    EDITOR = "body.editor"
    EXPORT_VIEW = "body.export_view"
    ANONYMOUS_EXPORT_VIEW = "body.anonymous_export_view"
    STORAGE = "body.storage"
    VIEW = "body.view"

    def get_content(self, page: dict) -> str:
        """Return the body of *page* stored under this format.

        The lookup is ``page["body"][<lower-cased member name>]["value"]``,
        so the page must have been fetched with this format expanded.

        :param page: page dictionary containing a ``"body"`` mapping
        :type page: dict
        :return: the content string stored for this format
        :rtype: str
        :raises KeyError: if the page does not carry this format
        """
        return page["body"][self.name.lower()]["value"]
class ConfluenceLoader(BaseLoader):
    """Load `Confluence` pages.

    Port of https://llamahub.ai/l/confluence

    Authentication is supported via username/api_key, an existing
    ``requests`` session, an ``oauth2`` dictionary, or a personal access
    token.

    Specify ``page_ids`` and/or ``space_key`` (and optionally ``label`` or
    ``cql``) to select the pages that are loaded into Document objects. If
    several selectors are given, the pages from each of them are returned.

    Set ``include_attachments`` to True (default False) to download the
    attachments of every page and append the text extracted from them to the
    Document. Supported attachment types are: PDF, PNG, JPEG/JPG, SVG, Word
    and Excel.

    The Confluence API offers several formats for page content. The storage
    format is the raw XML representation used for storage. The view format
    is the HTML representation used for viewing, with macros rendered as the
    user sees them. Pass a ``content_format`` enum member to choose the
    format; the default is ``ContentFormat.STORAGE`` and the supported
    values are ``ContentFormat.EDITOR``, ``ContentFormat.EXPORT_VIEW``,
    ``ContentFormat.ANONYMOUS_EXPORT_VIEW``, ``ContentFormat.STORAGE`` and
    ``ContentFormat.VIEW``.

    Hint: ``space_key`` and ``page_id`` can both be found in the URL of a
    Confluence page:
    https://yoursite.atlassian.com/wiki/spaces/<space_key>/pages/<page_id>

    Example:
        .. code-block:: python

            from langchain_community.document_loaders import ConfluenceLoader

            loader = ConfluenceLoader(
                url="https://yoursite.atlassian.com/wiki",
                username="me",
                api_key="12345",
                space_key="SPACE",
                limit=50,
            )
            documents = loader.load()

            # Server on prem
            loader = ConfluenceLoader(
                url="https://confluence.yoursite.com/",
                username="me",
                api_key="your_password",
                cloud=False,
                space_key="SPACE",
                limit=50,
            )
            documents = loader.load()

    :param url: base URL of the Confluence instance
    :type url: str
    :param api_key: API key (passed to the client as the password), defaults
        to None
    :type api_key: str, optional
    :param username: user name used together with ``api_key``, defaults to
        None
    :type username: str, optional
    :param session: pre-configured session handed to the Confluence client,
        defaults to None
    :type session: requests.Session, optional
    :param oauth2: OAuth credentials; must contain exactly the keys
        ``access_token``, ``access_token_secret``, ``consumer_key`` and
        ``key_cert``, defaults to None
    :type oauth2: dict, optional
    :param token: personal access token, defaults to None
    :type token: str, optional
    :param cloud: forwarded to the Confluence client as ``cloud``, defaults
        to True
    :type cloud: bool, optional
    :param number_of_retries: number of attempts per request, defaults to 3
    :type number_of_retries: Optional[int], optional
    :param min_retry_seconds: lower bound of the exponential retry wait,
        defaults to 2
    :type min_retry_seconds: Optional[int], optional
    :param max_retry_seconds: upper bound of the exponential retry wait,
        defaults to 10
    :type max_retry_seconds: Optional[int], optional
    :param confluence_kwargs: additional kwargs used to initialise the
        Confluence client
    :type confluence_kwargs: dict, optional
    :param space_key: space key retrieved from a Confluence URL, defaults to
        None
    :type space_key: Optional[str], optional
    :param page_ids: list of specific page IDs to load, defaults to None
    :type page_ids: Optional[List[str]], optional
    :param label: load all pages carrying this label, defaults to None
    :type label: Optional[str], optional
    :param cql: CQL expression, defaults to None
    :type cql: Optional[str], optional
    :param include_restricted_content: when False, pages that are not
        public (see ``is_public_page``) are skipped, defaults to False
    :type include_restricted_content: bool, optional
    :param include_archived_content: whether to include archived content,
        defaults to False
    :type include_archived_content: bool, optional
    :param include_attachments: whether to append attachment text, defaults
        to False
    :type include_attachments: bool, optional
    :param include_comments: whether to append page comments, defaults to
        False
    :type include_comments: bool, optional
    :param content_format: content format to request, defaults to
        ``ContentFormat.STORAGE``
    :type content_format: ContentFormat
    :param limit: maximum number of pages retrieved per request, defaults
        to 50
    :type limit: int, optional
    :param max_pages: maximum number of pages retrieved in total, defaults
        to 1000
    :type max_pages: int, optional
    :param ocr_languages: languages passed to Tesseract. To use a language
        the matching Tesseract language pack must be installed first.
    :type ocr_languages: str, optional
    :param keep_markdown_format: whether to keep the Markdown format,
        defaults to False
    :type keep_markdown_format: bool
    :param keep_newlines: whether to keep line breaks, defaults to False
    :type keep_newlines: bool
    :raises ValueError: errors while validating the input
    :raises ImportError: a required dependency is not installed
    """

    def __init__(
        self,
        url: str,
        api_key: Optional[str] = None,
        username: Optional[str] = None,
        session: Optional[requests.Session] = None,
        oauth2: Optional[dict] = None,
        token: Optional[str] = None,
        cloud: Optional[bool] = True,
        number_of_retries: Optional[int] = 3,
        min_retry_seconds: Optional[int] = 2,
        max_retry_seconds: Optional[int] = 10,
        confluence_kwargs: Optional[dict] = None,
        *,
        space_key: Optional[str] = None,
        page_ids: Optional[List[str]] = None,
        label: Optional[str] = None,
        cql: Optional[str] = None,
        include_restricted_content: bool = False,
        include_archived_content: bool = False,
        include_attachments: bool = False,
        include_comments: bool = False,
        content_format: ContentFormat = ContentFormat.STORAGE,
        limit: Optional[int] = 50,
        max_pages: Optional[int] = 1000,
        ocr_languages: Optional[str] = None,
        keep_markdown_format: bool = False,
        keep_newlines: bool = False,
    ):
        """Store the load options, validate credentials, build the client."""
        # Load-time options; `_lazy_load` reads them through `_resolve_param`.
        self.space_key = space_key
        self.page_ids = page_ids
        self.label = label
        self.cql = cql
        self.include_restricted_content = include_restricted_content
        self.include_archived_content = include_archived_content
        self.include_attachments = include_attachments
        self.include_comments = include_comments
        self.content_format = content_format
        self.limit = limit
        self.max_pages = max_pages
        self.ocr_languages = ocr_languages
        self.keep_markdown_format = keep_markdown_format
        self.keep_newlines = keep_newlines

        confluence_kwargs = confluence_kwargs or {}
        errors = ConfluenceLoader.validate_init_args(
            url=url,
            api_key=api_key,
            username=username,
            session=session,
            oauth2=oauth2,
            token=token,
        )
        if errors:
            raise ValueError(f"Error(s) while validating input: {errors}")

        try:
            from atlassian import Confluence
        except ImportError:
            raise ImportError(
                "`atlassian` package not found, please run "
                "`pip install atlassian-python-api`"
            )

        self.base_url = url
        self.number_of_retries = number_of_retries
        self.min_retry_seconds = min_retry_seconds
        self.max_retry_seconds = max_retry_seconds

        # Exactly one credential kind is set at this point (validated above).
        if session:
            self.confluence = Confluence(url=url, session=session, **confluence_kwargs)
        elif oauth2:
            self.confluence = Confluence(
                url=url, oauth2=oauth2, cloud=cloud, **confluence_kwargs
            )
        elif token:
            self.confluence = Confluence(
                url=url, token=token, cloud=cloud, **confluence_kwargs
            )
        else:
            self.confluence = Confluence(
                url=url,
                username=username,
                password=api_key,
                cloud=cloud,
                **confluence_kwargs,
            )

    @staticmethod
    def validate_init_args(
        url: Optional[str] = None,
        api_key: Optional[str] = None,
        username: Optional[str] = None,
        session: Optional[requests.Session] = None,
        oauth2: Optional[dict] = None,
        token: Optional[str] = None,
    ) -> Union[List, None]:
        """Validate that a proper combination of init arguments was given.

        :return: list of error messages, or None when the input is valid
        :rtype: Union[List, None]
        """
        errors = []
        if url is None:
            errors.append("Must provide `url`")

        if (api_key and not username) or (username and not api_key):
            errors.append(
                "If one of `api_key` or `username` is provided, "
                "the other must be as well."
            )

        # One flag per credential kind; api_key/username count as one kind.
        non_null_creds = [
            x is not None for x in ((api_key or username), session, oauth2, token)
        ]
        if sum(non_null_creds) > 1:
            all_names = ("(api_key, username)", "session", "oauth2", "token")
            provided = tuple(n for x, n in zip(non_null_creds, all_names) if x)
            errors.append(
                f"Cannot provide a value for more than one of: {all_names}. Received "
                f"values for: {provided}"
            )

        if oauth2 and set(oauth2.keys()) != {
            "access_token",
            "access_token_secret",
            "consumer_key",
            "key_cert",
        }:
            errors.append(
                "You have either omitted required keys or added extra "
                "keys to the oauth2 dictionary. key values should be "
                "`['access_token', 'access_token_secret', 'consumer_key', 'key_cert']`"
            )
        return errors or None

    def _resolve_param(self, param_name: str, kwargs: Any) -> Any:
        """Return ``kwargs[param_name]`` if present, else the instance attribute."""
        return kwargs[param_name] if param_name in kwargs else getattr(self, param_name)

    def _with_retry(self, func: Callable) -> Callable:
        """Wrap *func* with the retry policy configured on this loader."""
        return retry(
            reraise=True,
            stop=stop_after_attempt(
                self.number_of_retries  # type: ignore[arg-type]
            ),
            wait=wait_exponential(
                multiplier=1,
                min=self.min_retry_seconds,  # type: ignore[arg-type]
                max=self.max_retry_seconds,  # type: ignore[arg-type]
            ),
            before_sleep=before_sleep_log(logger, logging.WARNING),
        )(func)

    def _lazy_load(self, **kwargs: Any) -> Iterator[Document]:
        """Yield Documents for every configured selector.

        Order: pages of ``space_key``, then pages matching ``cql``, then the
        pages in ``page_ids`` (which also receives the ids found for
        ``label``). Runtime ``kwargs`` override the values given at
        initialisation but are deprecated.

        :raises ValueError: if none of ``space_key``, ``page_ids``,
            ``label`` or ``cql`` is set
        """
        if kwargs:
            logger.warning(
                "Received runtime arguments %s. Passing runtime args to `load`"
                " is deprecated. Please pass arguments during initialization instead.",
                kwargs,
            )
        space_key = self._resolve_param("space_key", kwargs)
        page_ids = self._resolve_param("page_ids", kwargs)
        label = self._resolve_param("label", kwargs)
        cql = self._resolve_param("cql", kwargs)
        include_restricted_content = self._resolve_param(
            "include_restricted_content", kwargs
        )
        include_archived_content = self._resolve_param(
            "include_archived_content", kwargs
        )
        include_attachments = self._resolve_param("include_attachments", kwargs)
        include_comments = self._resolve_param("include_comments", kwargs)
        content_format = self._resolve_param("content_format", kwargs)
        limit = self._resolve_param("limit", kwargs)
        max_pages = self._resolve_param("max_pages", kwargs)
        ocr_languages = self._resolve_param("ocr_languages", kwargs)
        keep_markdown_format = self._resolve_param("keep_markdown_format", kwargs)
        keep_newlines = self._resolve_param("keep_newlines", kwargs)

        if not space_key and not page_ids and not label and not cql:
            raise ValueError(
                "Must specify at least one among `space_key`, `page_ids`, "
                "`label`, `cql` parameters."
            )

        expand = f"{content_format.value},version"

        if space_key:
            pages = self.paginate_request(
                self.confluence.get_all_pages_from_space,
                space=space_key,
                limit=limit,
                max_pages=max_pages,
                status="any" if include_archived_content else "current",
                expand=expand,
            )
            yield from self.process_pages(
                pages,
                include_restricted_content,
                include_attachments,
                include_comments,
                content_format,
                ocr_languages=ocr_languages,
                keep_markdown_format=keep_markdown_format,
                keep_newlines=keep_newlines,
            )

        if label:
            pages = self.paginate_request(
                self.confluence.get_all_pages_by_label,
                label=label,
                limit=limit,
                max_pages=max_pages,
            )
            ids_by_label = [page["id"] for page in pages]
            # Labelled pages are loaded by id below, merged with `page_ids`.
            if page_ids:
                page_ids = list(set(page_ids + ids_by_label))
            else:
                page_ids = list(set(ids_by_label))

        if cql:
            pages = self.paginate_request(
                self._search_content_by_cql,
                cql=cql,
                limit=limit,
                max_pages=max_pages,
                include_archived_spaces=include_archived_content,
                expand=expand,
            )
            # keep_newlines is forwarded here as well so the option behaves
            # the same for every selector.
            yield from self.process_pages(
                pages,
                include_restricted_content,
                include_attachments,
                include_comments,
                content_format,
                ocr_languages=ocr_languages,
                keep_markdown_format=keep_markdown_format,
                keep_newlines=keep_newlines,
            )

        if page_ids:
            # Build the retrying getter once rather than once per page.
            get_page = self._with_retry(self.confluence.get_page_by_id)
            for page_id in page_ids:
                page = get_page(page_id=page_id, expand=expand)
                if not include_restricted_content and not self.is_public_page(page):
                    continue
                yield self.process_page(
                    page,
                    include_attachments,
                    include_comments,
                    content_format,
                    ocr_languages=ocr_languages,
                    keep_markdown_format=keep_markdown_format,
                    keep_newlines=keep_newlines,
                )

    def load(self, **kwargs: Any) -> List[Document]:
        """Load all selected pages and return them as a list of Documents."""
        return list(self._lazy_load(**kwargs))

    def lazy_load(self) -> Iterator[Document]:
        """Yield the selected pages one Document at a time."""
        yield from self._lazy_load()

    def _search_content_by_cql(
        self, cql: str, include_archived_spaces: Optional[bool] = None, **kwargs: Any
    ) -> List[dict]:
        """Query ``rest/api/content/search`` and return its ``results`` list.

        Extra ``kwargs`` are sent as query parameters;
        ``includeArchivedSpaces`` is only sent when
        ``include_archived_spaces`` is not None.
        """
        url = "rest/api/content/search"
        params: Dict[str, Any] = {"cql": cql}
        params.update(kwargs)
        if include_archived_spaces is not None:
            params["includeArchivedSpaces"] = include_archived_spaces
        response = self.confluence.get(url, params=params)
        return response.get("results", [])

    def paginate_request(self, retrieval_method: Callable, **kwargs: Any) -> List:
        """Paginate the various methods that retrieve groups of pages.

        Unfortunately, because of page-size limits, the Confluence API does
        not always honour the requested limit: if ``limit`` is greater than
        100, Confluence appears to cap the response at 100. In addition,
        the Atlassian Python package only returns the values under the
        results key, so the "next" value of the "_links" key is not
        available. Pagination therefore starts at 0 and continues up to
        ``max_pages``, fetching ``limit`` pages per request. The length of
        each returned batch is checked to decide whether more documents
        exist, rather than checking for a ``next`` key as suggested here:
        https://developer.atlassian.com/server/confluence/pagination-in-the-rest-api/

        :param retrieval_method: function used to retrieve the documents;
            it is called with the remaining ``kwargs`` plus ``start``
        :type retrieval_method: callable
        :param kwargs: must contain ``max_pages``; everything else is
            forwarded to ``retrieval_method``
        :return: list of documents, at most ``max_pages`` long
        :rtype: List
        """
        max_pages = kwargs.pop("max_pages")
        # Wrap once; the retry policy does not change between batches.
        get_pages = self._with_retry(retrieval_method)
        docs: List[dict] = []
        while len(docs) < max_pages:
            batch = get_pages(**kwargs, start=len(docs))
            if not batch:
                break
            docs.extend(batch)
        return docs[:max_pages]

    def is_public_page(self, page: dict) -> bool:
        """Check whether a page is publicly accessible.

        A page counts as public when its status is ``"current"`` and it has
        neither user nor group read restrictions.
        """
        restrictions = self.confluence.get_all_restrictions_for_content(page["id"])
        return (
            page["status"] == "current"
            and not restrictions["read"]["restrictions"]["user"]["results"]
            and not restrictions["read"]["restrictions"]["group"]["results"]
        )

    def process_pages(
        self,
        pages: List[dict],
        include_restricted_content: bool,
        include_attachments: bool,
        include_comments: bool,
        content_format: ContentFormat,
        ocr_languages: Optional[str] = None,
        keep_markdown_format: Optional[bool] = False,
        keep_newlines: bool = False,
    ) -> Iterator[Document]:
        """Process a list of pages into Documents.

        Pages that are not public are skipped unless
        ``include_restricted_content`` is True.
        """
        for page in pages:
            if not include_restricted_content and not self.is_public_page(page):
                continue
            yield self.process_page(
                page,
                include_attachments,
                include_comments,
                content_format,
                ocr_languages=ocr_languages,
                keep_markdown_format=keep_markdown_format,
                keep_newlines=keep_newlines,
            )

    def process_page(
        self,
        page: dict,
        include_attachments: bool,
        include_comments: bool,
        content_format: ContentFormat,
        ocr_languages: Optional[str] = None,
        keep_markdown_format: Optional[bool] = False,
        keep_newlines: bool = False,
    ) -> Document:
        """Convert a single page dictionary into a Document.

        The page body is converted to Markdown (``keep_markdown_format``)
        or to plain text, followed by the attachment texts and then the
        comment texts when those are requested. The metadata holds the
        title, id and source URL and, when available, the version timestamp
        under ``"when"``.

        :raises ImportError: if ``markdownify`` or ``beautifulsoup4`` is
            needed but not installed
        """
        if keep_markdown_format:
            try:
                from markdownify import markdownify
            except ImportError:
                raise ImportError(
                    "`markdownify` package not found, please run "
                    "`pip install markdownify`"
                )
        if include_comments or not keep_markdown_format:
            try:
                from bs4 import BeautifulSoup
            except ImportError:
                raise ImportError(
                    "`beautifulsoup4` package not found, please run "
                    "`pip install beautifulsoup4`"
                )

        if include_attachments:
            attachment_texts = self.process_attachment(page["id"], ocr_languages)
        else:
            attachment_texts = []

        content = content_format.get_content(page)
        if keep_markdown_format:
            # Use markdownify to keep the page Markdown style
            body_text = markdownify(content, heading_style="ATX")
        elif keep_newlines:
            # Turn paragraph ends and <br /> into newlines before stripping
            # the markup so the line structure survives.
            body_text = BeautifulSoup(
                content.replace("</p>", "\n</p>").replace("<br />", "\n"), "lxml"
            ).get_text(" ")
        else:
            body_text = BeautifulSoup(content, "lxml").get_text(" ", strip=True)
        text = body_text + "".join(attachment_texts)

        if include_comments:
            comments = self.confluence.get_page_comments(
                page["id"], expand="body.view.value", depth="all"
            )["results"]
            comment_texts = [
                BeautifulSoup(comment["body"]["view"]["value"], "lxml").get_text(
                    " ", strip=True
                )
                for comment in comments
            ]
            text = text + "".join(comment_texts)

        metadata = {
            "title": page["title"],
            "id": page["id"],
            "source": self.base_url.strip("/") + page["_links"]["webui"],
        }
        if "version" in page and "when" in page["version"]:
            metadata["when"] = page["version"]["when"]

        return Document(
            page_content=text,
            metadata=metadata,
        )

    def process_attachment(
        self,
        page_id: str,
        ocr_languages: Optional[str] = None,
    ) -> List[str]:
        """Extract text from the supported attachments of a page.

        Each returned string is the attachment title followed by its
        extracted text. Attachments with an unsupported media type are
        skipped, as are attachments whose download answers 404.

        :raises ImportError: if ``Pillow`` is not installed
        :raises requests.HTTPError: for HTTP errors other than 404
        """
        try:
            from PIL import Image  # noqa: F401
        except ImportError:
            raise ImportError(
                "`Pillow` package not found, " "please run `pip install Pillow`"
            )

        # depending on setup you may also need to set the correct path for
        # poppler and tesseract
        attachments = self.confluence.get_attachments_from_content(page_id)["results"]
        # Avoid a double slash when the configured base URL ends with "/".
        base_url = self.base_url.rstrip("/")
        texts = []
        for attachment in attachments:
            media_type = attachment["metadata"]["mediaType"]
            absolute_url = base_url + attachment["_links"]["download"]
            title = attachment["title"]
            try:
                if media_type == "application/pdf":
                    text = title + self.process_pdf(absolute_url, ocr_languages)
                elif media_type in ("image/png", "image/jpg", "image/jpeg"):
                    text = title + self.process_image(absolute_url, ocr_languages)
                elif (
                    media_type == "application/vnd.openxmlformats-officedocument"
                    ".wordprocessingml.document"
                ):
                    text = title + self.process_doc(absolute_url)
                elif media_type == "application/vnd.ms-excel":
                    text = title + self.process_xls(absolute_url)
                elif media_type == "image/svg+xml":
                    text = title + self.process_svg(absolute_url, ocr_languages)
                else:
                    continue
                texts.append(text)
            except requests.HTTPError as e:
                # A missing attachment is not fatal; anything else is.
                if e.response is not None and e.response.status_code == 404:
                    logger.warning("Attachment not found at %s", absolute_url)
                    continue
                raise
        return texts

    def _fetch_attachment_content(self, link: str) -> Optional[bytes]:
        """Download *link*; return None unless it answers 200 with a body."""
        response = self.confluence.request(path=link, absolute=True)
        if response.status_code != 200 or not response.content:
            return None
        return response.content

    def process_pdf(
        self,
        link: str,
        ocr_languages: Optional[str] = None,
    ) -> str:
        """OCR every page of the PDF at *link* and return the joined text.

        Returns an empty string when the download fails, is empty, or the
        PDF conversion raises ``ValueError``.

        :raises ImportError: if ``pytesseract`` or ``pdf2image`` is missing
        """
        try:
            import pytesseract
            from pdf2image import convert_from_bytes
        except ImportError:
            raise ImportError(
                "`pytesseract` or `pdf2image` package not found, "
                "please run `pip install pytesseract pdf2image`"
            )

        content = self._fetch_attachment_content(link)
        if content is None:
            return ""
        try:
            images = convert_from_bytes(content)
        except ValueError:
            return ""

        page_texts = []
        for number, image in enumerate(images, start=1):
            image_text = pytesseract.image_to_string(image, lang=ocr_languages)
            page_texts.append(f"Page {number}:\n{image_text}\n\n")
        return "".join(page_texts)

    def process_image(
        self,
        link: str,
        ocr_languages: Optional[str] = None,
    ) -> str:
        """OCR the image at *link* and return the recognised text.

        Returns an empty string when the download fails, is empty, or the
        image cannot be opened.

        :raises ImportError: if ``pytesseract`` or ``Pillow`` is missing
        """
        try:
            import pytesseract
            from PIL import Image
        except ImportError:
            raise ImportError(
                "`pytesseract` or `Pillow` package not found, "
                "please run `pip install pytesseract Pillow`"
            )

        content = self._fetch_attachment_content(link)
        if content is None:
            return ""
        try:
            image = Image.open(BytesIO(content))
        except OSError:
            return ""
        return pytesseract.image_to_string(image, lang=ocr_languages)

    def process_doc(self, link: str) -> str:
        """Extract the text of the Word document at *link*.

        Returns an empty string when the download fails or is empty.

        :raises ImportError: if ``docx2txt`` is missing
        """
        try:
            import docx2txt
        except ImportError:
            raise ImportError(
                "`docx2txt` package not found, please run `pip install docx2txt`"
            )

        content = self._fetch_attachment_content(link)
        if content is None:
            return ""
        return docx2txt.process(BytesIO(content))

    def process_xls(self, link: str) -> str:
        """Extract the text of the spreadsheet (or CSV) at *link*.

        A link whose file extension starts with ``.csv`` is read with
        pandas; anything else is opened as a workbook with ``xlrd`` and
        rendered sheet by sheet with tab-separated cells. Returns an empty
        string when the download fails or is empty.

        :raises ImportError: if ``xlrd`` or ``pandas`` is missing
        """
        import io
        import os

        try:
            import xlrd
        except ImportError:
            raise ImportError("`xlrd` package not found, please run `pip install xlrd`")
        try:
            import pandas as pd
        except ImportError:
            raise ImportError(
                "`pandas` package not found, please run `pip install pandas`"
            )

        content = self._fetch_attachment_content(link)
        if content is None:
            return ""

        filename = os.path.basename(link)
        # Getting the whole content of the url after filename,
        # Example: ".csv?version=2&modificationDate=1631800010678&cacheVersion=1&api=v2"
        file_extension = os.path.splitext(filename)[1]

        if file_extension.startswith(".csv"):
            content_string = content.decode("utf-8")
            df = pd.read_csv(io.StringIO(content_string))
            return df.to_string(index=False, header=False) + "\n\n"

        workbook = xlrd.open_workbook(file_contents=content)
        parts: List[str] = []
        for sheet in workbook.sheets():
            parts.append(f"{sheet.name}:\n")
            for row in range(sheet.nrows):
                parts.extend(
                    f"{sheet.cell_value(row, col)}\t" for col in range(sheet.ncols)
                )
                parts.append("\n")
            parts.append("\n")
        return "".join(parts)

    def process_svg(
        self,
        link: str,
        ocr_languages: Optional[str] = None,
    ) -> str:
        """Render the SVG at *link* to PNG, OCR it and return the text.

        Returns an empty string when the download fails or is empty, or
        when no drawing could be produced from the SVG.

        :raises ImportError: if ``pytesseract``, ``Pillow``, ``reportlab``
            or ``svglib`` is missing
        """
        try:
            import pytesseract
            from PIL import Image
            from reportlab.graphics import renderPM
            from svglib.svglib import svg2rlg
        except ImportError:
            raise ImportError(
                "`pytesseract`, `Pillow`, `reportlab` or `svglib` package not found, "
                "please run `pip install pytesseract Pillow reportlab svglib`"
            )

        content = self._fetch_attachment_content(link)
        if content is None:
            return ""

        drawing = svg2rlg(BytesIO(content))
        # NOTE(review): svg2rlg is believed to return None for input it
        # cannot parse; confirm against the installed svglib version.
        if drawing is None:
            return ""
        img_data = BytesIO()
        renderPM.drawToFile(drawing, img_data, fmt="PNG")
        img_data.seek(0)
        image = Image.open(img_data)
        return pytesseract.image_to_string(image, lang=ocr_languages)