Source code for langchain_community.tools.searx_search.tool

"""SearxNG搜索API的工具。"""
from typing import Optional

from langchain_core.callbacks import (
    AsyncCallbackManagerForToolRun,
    CallbackManagerForToolRun,
)
from langchain_core.pydantic_v1 import Extra, Field
from langchain_core.tools import BaseTool

from langchain_community.utilities.searx_search import SearxSearchWrapper


[docs]class SearxSearchRun(BaseTool): """工具,用于查询Searx实例。""" name: str = "searx_search" description: str = ( "A meta search engine." "Useful for when you need to answer questions about current events." "Input should be a search query." ) wrapper: SearxSearchWrapper kwargs: dict = Field(default_factory=dict) def _run( self, query: str, run_manager: Optional[CallbackManagerForToolRun] = None, ) -> str: """使用这个工具。""" return self.wrapper.run(query, **self.kwargs) async def _arun( self, query: str, run_manager: Optional[AsyncCallbackManagerForToolRun] = None, ) -> str: """使用该工具进行异步操作。""" return await self.wrapper.arun(query, **self.kwargs)
[docs]class SearxSearchResults(BaseTool): """工具用于查询一个 Searx 实例,并返回 JSON。""" name: str = "searx_search_results" description: str = ( "A meta search engine." "Useful for when you need to answer questions about current events." "Input should be a search query. Output is a JSON array of the query results" ) wrapper: SearxSearchWrapper num_results: int = 4 kwargs: dict = Field(default_factory=dict) class Config: """Pydantic配置。""" extra = Extra.allow def _run( self, query: str, run_manager: Optional[CallbackManagerForToolRun] = None, ) -> str: """使用这个工具。""" return str(self.wrapper.results(query, self.num_results, **self.kwargs)) async def _arun( self, query: str, run_manager: Optional[AsyncCallbackManagerForToolRun] = None, ) -> str: """使用该工具进行异步操作。""" return ( await self.wrapper.aresults(query, self.num_results, **self.kwargs) ).__str__()