import asyncio
import logging
import warnings
from concurrent.futures import Future, ThreadPoolExecutor
from typing import (
    Any,
    AsyncIterator,
    Dict,
    Iterator,
    List,
    Optional,
    Tuple,
    Union,
    cast,
)

import aiohttp
import requests
from langchain_core.documents import Document

from langchain_community.document_loaders.base import BaseLoader
from langchain_community.utils.user_agent import get_user_agent

logger = logging.getLogger(__name__)

default_header_template = {
    "User-Agent": get_user_agent(),
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,"
    "image/webp,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.5",
    "Referer": "https://www.google.com/",
    "DNT": "1",
    "Connection": "keep-alive",
    "Upgrade-Insecure-Requests": "1",
}


def _build_metadata(soup: Any, url: str) -> dict:
    """Build metadata from BeautifulSoup output."""
    metadata = {"source": url}
    if title := soup.find("title"):
        metadata["title"] = title.get_text()
    if description := soup.find("meta", attrs={"name": "description"}):
        metadata["description"] = description.get("content", "No description found.")
    if html := soup.find("html"):
        metadata["language"] = html.get("lang", "No language found.")
    return metadata
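
# Illustrative example (added commentary, not part of the original module): given a
# parsed page, _build_metadata returns a dict like
#   {"source": "https://example.com", "title": "Example",
#    "description": "Demo page", "language": "en"}
# The sketch below assumes ``beautifulsoup4`` is installed; it is commented out so
# importing this module stays side-effect free.
#
#   from bs4 import BeautifulSoup
#
#   soup = BeautifulSoup(
#       '<html lang="en"><head><title>Example</title>'
#       '<meta name="description" content="Demo page"></head><body></body></html>',
#       "html.parser",
#   )
#   metadata = _build_metadata(soup, "https://example.com")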


class WebBaseLoader(BaseLoader):
    def __init__(
        self,
        web_path: Union[str, List[str]],
        header_template: Optional[dict] = None,
        verify_ssl: Optional[bool] = True,
        proxies: Optional[dict] = None,
        autoset_encoding: bool = True,
        encoding: Optional[str] = None,
        default_parser: str = "html.parser",
        requests_per_second: int = 2,
        requests_kwargs: Optional[Dict[str, Any]] = None,
        raise_for_status: bool = False,
        ignore_load_errors: bool = False,
        *,
        preserve_order: bool = True,
        trust_env: bool = False,
    ):
        """Initialize with a webpage path."""

        # TODO: Deprecate web_path in favor of web_paths, and remove this.
        # Left like this because there are a number of loaders that expect
        # single urls.
        if isinstance(web_path, str):
            self.web_paths = [web_path]
        elif isinstance(web_path, List):
            self.web_paths = web_path

        headers = header_template or default_header_template
        if not headers.get("User-Agent"):
            try:
                from fake_useragent import UserAgent

                headers["User-Agent"] = UserAgent().random
            except ImportError:
                logger.info(
                    "fake_useragent not found, using default user agent. "
                    "To get a realistic header for requests, "
                    "`pip install fake_useragent`."
                )

        self.session = requests.Session()
        self.session.headers = dict(headers)
        self.session.verify = verify_ssl
        if proxies:
            self.session.proxies.update(proxies)

        self.requests_per_second = requests_per_second
        self.default_parser = default_parser
        self.requests_kwargs = requests_kwargs or {}
        self.raise_for_status = raise_for_status
        self.autoset_encoding = autoset_encoding
        self.encoding = encoding
        self.ignore_load_errors = ignore_load_errors
        self.preserve_order = preserve_order
        self.trust_env = trust_env
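
    # Illustrative construction (a sketch, not from the original source; the URLs
    # and keyword values are placeholders):
    #
    #   loader = WebBaseLoader(
    #       web_path=["https://example.com/a", "https://example.com/b"],
    #       requests_per_second=4,  # used as the max number of concurrent fetches
    #       header_template={"User-Agent": "my-crawler/0.1"},
    #   )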

    def _fetch_valid_connection_docs(self, url: str) -> Any:
        if self.ignore_load_errors:
            try:
                return self.session.get(url, **self.requests_kwargs)
            except Exception as e:
                warnings.warn(str(e))
                return None

        return self.session.get(url, **self.requests_kwargs)

    @staticmethod
    def _check_parser(parser: str) -> None:
        """Check that parser is valid for bs4."""
        valid_parsers = ["html.parser", "lxml", "xml", "lxml-xml", "html5lib"]
        if parser not in valid_parsers:
            raise ValueError(
                "`parser` must be one of " + ", ".join(valid_parsers) + "."
            )

    async def _fetch(
        self, url: str, retries: int = 3, cooldown: int = 2, backoff: float = 1.5
    ) -> str:
        async with aiohttp.ClientSession(trust_env=self.trust_env) as session:
            for i in range(retries):
                try:
                    kwargs: Dict = dict(
                        headers=self.session.headers,
                        cookies=self.session.cookies.get_dict(),
                        **self.requests_kwargs,
                    )
                    if not self.session.verify:
                        kwargs["ssl"] = False

                    async with session.get(url, **kwargs) as response:
                        try:
                            text = await response.text()
                        except UnicodeDecodeError:
                            logger.error(f"Failed to decode content from {url}")
                            text = ""
                        return text
                except (aiohttp.ClientConnectionError, TimeoutError) as e:
                    if i == retries - 1 and self.ignore_load_errors:
                        logger.warning(
                            f"Error fetching {url} after {retries} retries."
                        )
                        return ""
                    elif i == retries - 1:
                        raise
                    else:
                        logger.warning(
                            f"Error fetching {url} with attempt "
                            f"{i + 1}/{retries}: {e}. Retrying..."
                        )
                        await asyncio.sleep(cooldown * backoff**i)
        raise ValueError("retry count exceeded")

    async def _fetch_with_rate_limit(
        self, url: str, semaphore: asyncio.Semaphore
    ) -> Tuple[str, str]:
        async with semaphore:
            return url, await self._fetch(url)

    async def _lazy_fetch_all(
        self, urls: List[str], preserve_order: bool
    ) -> AsyncIterator[Tuple[str, str]]:
        semaphore = asyncio.Semaphore(self.requests_per_second)
        tasks = [
            asyncio.create_task(self._fetch_with_rate_limit(url, semaphore))
            for url in urls
        ]
        try:
            from tqdm.asyncio import tqdm_asyncio

            if preserve_order:
                for task in tqdm_asyncio(
                    tasks, desc="Fetching pages", ascii=True, mininterval=1
                ):
                    yield await task
            else:
                for task in tqdm_asyncio.as_completed(
                    tasks, desc="Fetching pages", ascii=True, mininterval=1
                ):
                    yield await task
        except ImportError:
            warnings.warn("For better logging of progress, `pip install tqdm`")
            if preserve_order:
                for result in await asyncio.gather(*tasks):
                    yield result
            else:
                for task in asyncio.as_completed(tasks):
                    yield await task
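
    # Notes on the helpers above (added commentary, not in the original source):
    # ``requests_per_second`` is used as the semaphore size, so it caps how many
    # fetches run concurrently rather than enforcing a strict per-second rate.
    # With ``_fetch``'s defaults (retries=3, cooldown=2, backoff=1.5), a failing
    # URL is retried after cooldown * backoff**i seconds, i.e. 2.0s and then 3.0s,
    # before the final attempt either raises or, when ignore_load_errors=True,
    # returns an empty string.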

    async def fetch_all(self, urls: List[str]) -> List[str]:
        """Fetch all urls concurrently with rate limiting."""
        return [doc async for _, doc in self._lazy_fetch_all(urls, True)]
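
    # Note (added commentary): ``fetch_all`` passes ``preserve_order=True``
    # explicitly, so its results line up index-for-index with ``urls`` regardless
    # of ``self.preserve_order``; ``lazy_load`` below relies on that pairing.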

    def lazy_load(self) -> Iterator[Document]:
        """Lazy load text from the url(s) in web_path."""
        results: List[str]
        try:
            # Raises RuntimeError if there is no current event loop.
            asyncio.get_running_loop()
            # If there is a current event loop, we need to run the async code
            # in a separate loop, in a separate thread.
            with ThreadPoolExecutor(max_workers=1) as executor:
                future: Future[List[str]] = executor.submit(
                    asyncio.run,  # type: ignore[arg-type]
                    self.fetch_all(self.web_paths),  # type: ignore[arg-type]
                )
                results = future.result()
        except RuntimeError:
            results = asyncio.run(self.fetch_all(self.web_paths))
        for i, text in enumerate(cast(List[str], results)):
            yield self._to_document(self.web_paths[i], text)

    async def alazy_load(self) -> AsyncIterator[Document]:
        """Lazy load text from the url(s) in web_path."""
        async for url, text in self._lazy_fetch_all(
            self.web_paths, self.preserve_order
        ):
            yield self._to_document(url, text)
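

# Example usage (a minimal sketch, not part of the original module; the URL is a
# placeholder, and network access plus ``beautifulsoup4`` are assumed):
#
#   loader = WebBaseLoader("https://example.com")
#
#   # Synchronous iteration; drives the async fetching under the hood.
#   for doc in loader.lazy_load():
#       print(doc.metadata["source"], len(doc.page_content))
#
#   # Inside an already running event loop, prefer the async iterator:
#   #   async for doc in loader.alazy_load():
#   #       ...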