Source code for langchain_community.utilities.stackexchange

import html
from typing import Any, Dict, Literal

from langchain_core.pydantic_v1 import BaseModel, Field, root_validator


[docs]class StackExchangeAPIWrapper(BaseModel): """Stack Exchange API的封装器。""" client: Any #: :meta private: max_results: int = 3 """输出结果""" query_type: Literal["all", "title", "body"] = "all" """匹配StackOverflow项目的哪一部分。可以是'all'、'title'或'body'中的一个。默认为'all'。""" fetch_params: Dict[str, Any] = Field(default_factory=dict) """传递给StackApi.fetch的额外参数。""" result_separator: str = "\n\n" """问题和答案对之间的分隔符。""" @root_validator() def validate_environment(cls, values: Dict) -> Dict: """验证所需的Python包是否存在。""" try: from stackapi import StackAPI values["client"] = StackAPI("stackoverflow") except ImportError: raise ImportError( "The 'stackapi' Python package is not installed. " "Please install it with `pip install stackapi`." ) return values
[docs] def run(self, query: str) -> str: """通过StackExchange API运行查询并解析结果。""" query_key = "q" if self.query_type == "all" else self.query_type output = self.client.fetch( "search/excerpts", **{query_key: query}, **self.fetch_params ) if len(output["items"]) < 1: return f"No relevant results found for '{query}' on Stack Overflow." questions = [ item for item in output["items"] if item["item_type"] == "question" ][: self.max_results] answers = [item for item in output["items"] if item["item_type"] == "answer"] results = [] for question in questions: res_text = f"Question: {question['title']}\n{question['excerpt']}" relevant_answers = [ answer for answer in answers if answer["question_id"] == question["question_id"] ] accepted_answers = [ answer for answer in relevant_answers if answer["is_accepted"] ] if relevant_answers: top_answer = ( accepted_answers[0] if accepted_answers else relevant_answers[0] ) excerpt = html.unescape(top_answer["excerpt"]) res_text += f"\nAnswer: {excerpt}" results.append(res_text) return self.result_separator.join(results)