Source code for langchain_community.document_loaders.datadog_logs
from datetime import datetime, timedelta
from typing import List, Optional
from langchain_core.documents import Document
from langchain_community.document_loaders.base import BaseLoader
[docs]class DatadogLogsLoader(BaseLoader):
"""加载`Datadog`日志。
日志被写入`page_content`和`metadata`中。"""
[docs] def __init__(
self,
query: str,
api_key: str,
app_key: str,
from_time: Optional[int] = None,
to_time: Optional[int] = None,
limit: int = 100,
) -> None:
"""初始化Datadog文档加载器。
要求:
- 必须安装datadog_api_client。使用`pip install datadog_api_client`进行安装。
参数:
query: 在Datadog中运行的查询。
api_key: Datadog API密钥。
app_key: Datadog APP密钥。
from_time: 可选。查询的时间范围的开始。
支持日期数学和常规时间戳(毫秒),如'1688732708951'。
默认为20分钟前。
to_time: 可选。查询的时间范围的结束。
支持日期数学和常规时间戳(毫秒),如'1688732708951'。
默认为当前时间。
limit: 返回的日志最大数量。
默认为100。
""" # noqa: E501
try:
from datadog_api_client import Configuration
except ImportError as ex:
raise ImportError(
"Could not import datadog_api_client python package. "
"Please install it with `pip install datadog_api_client`."
) from ex
self.query = query
configuration = Configuration()
configuration.api_key["apiKeyAuth"] = api_key
configuration.api_key["appKeyAuth"] = app_key
self.configuration = configuration
self.from_time = from_time
self.to_time = to_time
self.limit = limit
[docs] def parse_log(self, log: dict) -> Document:
"""
从Datadog日志项创建文档对象。
"""
attributes = log.get("attributes", {})
metadata = {
"id": log.get("id", ""),
"status": attributes.get("status"),
"service": attributes.get("service", ""),
"tags": attributes.get("tags", []),
"timestamp": attributes.get("timestamp", ""),
}
message = attributes.get("message", "")
inside_attributes = attributes.get("attributes", {})
content_dict = {**inside_attributes, "message": message}
content = ", ".join(f"{k}: {v}" for k, v in content_dict.items())
return Document(page_content=content, metadata=metadata)
[docs] def load(self) -> List[Document]:
"""从Datadog获取日志。
返回:
一个Document对象的列表。
- 页面内容
- 元数据
- id
- 服务
- 状态
- 标签
- 时间戳
"""
try:
from datadog_api_client import ApiClient
from datadog_api_client.v2.api.logs_api import LogsApi
from datadog_api_client.v2.model.logs_list_request import LogsListRequest
from datadog_api_client.v2.model.logs_list_request_page import (
LogsListRequestPage,
)
from datadog_api_client.v2.model.logs_query_filter import LogsQueryFilter
from datadog_api_client.v2.model.logs_sort import LogsSort
except ImportError as ex:
raise ImportError(
"Could not import datadog_api_client python package. "
"Please install it with `pip install datadog_api_client`."
) from ex
now = datetime.now()
twenty_minutes_before = now - timedelta(minutes=20)
now_timestamp = int(now.timestamp() * 1000)
twenty_minutes_before_timestamp = int(twenty_minutes_before.timestamp() * 1000)
_from = (
self.from_time
if self.from_time is not None
else twenty_minutes_before_timestamp
)
body = LogsListRequest(
filter=LogsQueryFilter(
query=self.query,
_from=_from,
to=f"{self.to_time if self.to_time is not None else now_timestamp}",
),
sort=LogsSort.TIMESTAMP_ASCENDING,
page=LogsListRequestPage(
limit=self.limit,
),
)
with ApiClient(configuration=self.configuration) as api_client:
api_instance = LogsApi(api_client)
response = api_instance.list_logs(body=body).to_dict()
docs: List[Document] = []
for row in response["data"]:
docs.append(self.parse_log(row))
return docs