"""Web base loader class."""
import asyncio
import logging
import warnings
from typing import Any, AsyncIterator, Dict, Iterator, List, Optional, Sequence, Union
import aiohttp
import requests
from langchain_core._api import deprecated
from langchain_core.documents import Document
from langchain_community.document_loaders.base import BaseLoader
from langchain_community.utils.user_agent import get_user_agent
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Browser-like request headers. WebBaseLoader copies this mapping onto the
# requests session it creates when the caller supplies no ``header_template``.
default_header_template = {
    "User-Agent": get_user_agent(),
    "Accept": (
        "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8"
    ),
    "Accept-Language": "en-US,en;q=0.5",
    "Referer": "https://www.google.com/",
    "DNT": "1",
    "Connection": "keep-alive",
    "Upgrade-Insecure-Requests": "1",
}
def _build_metadata(soup: Any, url: str) -> dict:
"""Build metadata from BeautifulSoup output."""
metadata = {"source": url}
if title := soup.find("title"):
metadata["title"] = title.get_text()
if description := soup.find("meta", attrs={"name": "description"}):
metadata["description"] = description.get("content", "No description found.")
if html := soup.find("html"):
metadata["language"] = html.get("lang", "No language found.")
return metadata
class WebBaseLoader(BaseLoader):
    """
    WebBaseLoader document loader integration

    Setup:
        Install ``langchain_community``.

        .. code-block:: bash

            pip install -U langchain_community

    Instantiate:
        .. code-block:: python

            from langchain_community.document_loaders import WebBaseLoader

            loader = WebBaseLoader(
                web_path = "https://www.espn.com/"
                # header_template = None,
                # verify_ssl = True,
                # proxies = None,
                # continue_on_failure = False,
                # autoset_encoding = True,
                # encoding = None,
                # web_paths = (),
                # requests_per_second = 2,
                # default_parser = "html.parser",
                # requests_kwargs = None,
                # raise_for_status = False,
                # bs_get_text_kwargs = None,
                # bs_kwargs = None,
                # session = None,
                # show_progress = True,
                # trust_env = False,
            )

    Lazy load:
        .. code-block:: python

            docs = []
            for doc in loader.lazy_load():
                docs.append(doc)
            print(docs[0].page_content[:100])
            print(docs[0].metadata)

        .. code-block:: python

            ESPN - Serving Sports Fans. Anytime. Anywhere.
            {'source': 'https://www.espn.com/', 'title': 'ESPN - Serving Sports Fans. Anytime. Anywhere.', 'description': 'Visit ESPN for live scores, highlights and sports news. Stream exclusive games on ESPN+ and play fantasy sports.', 'language': 'en'}

    Async load:
        .. code-block:: python

            docs = []
            async for doc in loader.alazy_load():
                docs.append(doc)
            print(docs[0].page_content[:100])
            print(docs[0].metadata)

        .. code-block:: python

            ESPN - Serving Sports Fans. Anytime. Anywhere.
            {'source': 'https://www.espn.com/', 'title': 'ESPN - Serving Sports Fans. Anytime. Anywhere.', 'description': 'Visit ESPN for live scores, highlights and sports news. Stream exclusive games on ESPN+ and play fantasy sports.', 'language': 'en'}

    .. versionchanged:: 0.3.14

        Deprecated ``aload`` (which was not async) and implemented a native async
        ``alazy_load``. Expand below for more details.

        .. dropdown:: How to update ``aload``

            Instead of using ``aload``, you can use ``load`` for synchronous loading or
            ``alazy_load`` for asynchronous lazy loading.

            Example using ``load`` (synchronous):

            .. code-block:: python

                docs: List[Document] = loader.load()

            Example using ``alazy_load`` (asynchronous):

            .. code-block:: python

                docs: List[Document] = []
                async for doc in loader.alazy_load():
                    docs.append(doc)

            This is in preparation for accommodating an asynchronous ``aload`` in the
            future:

            .. code-block:: python

                docs: List[Document] = await loader.aload()
    """  # noqa: E501

    def __init__(
        self,
        web_path: Union[str, Sequence[str]] = "",
        header_template: Optional[dict] = None,
        verify_ssl: bool = True,
        proxies: Optional[dict] = None,
        continue_on_failure: bool = False,
        autoset_encoding: bool = True,
        encoding: Optional[str] = None,
        web_paths: Sequence[str] = (),
        requests_per_second: int = 2,
        default_parser: str = "html.parser",
        requests_kwargs: Optional[Dict[str, Any]] = None,
        raise_for_status: bool = False,
        bs_get_text_kwargs: Optional[Dict[str, Any]] = None,
        bs_kwargs: Optional[Dict[str, Any]] = None,
        session: Any = None,
        *,
        show_progress: bool = True,
        trust_env: bool = False,
    ) -> None:
        """Initialize loader.

        Args:
            web_path: A single URL or a sequence of URLs. Kept for
                backwards-compatibility; cannot be combined with ``web_paths``.
            header_template: Headers for the ``requests`` session created by the
                loader. Defaults to a copy of ``default_header_template``. Not
                used when ``session`` is supplied.
            verify_ssl: Value assigned to ``verify`` on the session created by
                the loader. Not used when ``session`` is supplied.
            proxies: Proxies merged into the session created by the loader. Not
                used when ``session`` is supplied.
            continue_on_failure: In the concurrent (aiohttp) fetch path, log a
                warning and use empty content for a URL that fails instead of
                raising.
            autoset_encoding: When True and ``encoding`` is None, the
                synchronous scrape sets the response encoding to its
                ``apparent_encoding``.
            encoding: Explicit text encoding for responses. Takes precedence
                over ``autoset_encoding``.
            web_paths: Web paths to load from.
            requests_per_second: Max number of concurrent requests to make.
            default_parser: Default parser to use for BeautifulSoup.
            requests_kwargs: kwargs for requests
            raise_for_status: Raise an exception if http status code denotes an error.
            bs_get_text_kwargs: kwargs for beautifulsoup4 get_text
            bs_kwargs: kwargs for beautifulsoup4 web page parsing
            session: Pre-configured session object to use instead of creating a
                new ``requests.Session``.
            show_progress: Show progress bar when loading pages.
            trust_env: set to True if using proxy to make web requests, for example
                using http(s)_proxy environment variables. Defaults to False.

        Raises:
            ValueError: If both ``web_path`` and ``web_paths`` are given.
            TypeError: If ``web_path`` is neither a string nor a sequence.
        """
        # web_path kept for backwards-compatibility.
        if web_path and web_paths:
            raise ValueError(
                "Received web_path and web_paths. Only one can be specified. "
                "web_path is deprecated, web_paths should be used."
            )
        if web_paths:
            self.web_paths = list(web_paths)
        elif isinstance(web_path, str):
            self.web_paths = [web_path]
        elif isinstance(web_path, Sequence):
            self.web_paths = list(web_path)
        else:
            raise TypeError(
                f"web_path must be str or Sequence[str] got ({type(web_path)}) or"
                f" web_paths must be Sequence[str] got ({type(web_paths)})"
            )

        self.requests_per_second = requests_per_second
        self.default_parser = default_parser
        self.requests_kwargs = requests_kwargs or {}
        self.raise_for_status = raise_for_status
        self.show_progress = show_progress
        self.bs_get_text_kwargs = bs_get_text_kwargs or {}
        self.bs_kwargs = bs_kwargs or {}

        if session:
            self.session = session
        else:
            session = requests.Session()
            # Work on a copy so that neither the caller's dict nor the
            # module-level default is mutated when a User-Agent is filled in.
            headers = dict(header_template or default_header_template)
            if not headers.get("User-Agent"):
                try:
                    from fake_useragent import UserAgent

                    headers["User-Agent"] = UserAgent().random
                except ImportError:
                    logger.info(
                        "fake_useragent not found, using default user agent. "
                        "To get a realistic header for requests, "
                        "`pip install fake_useragent`."
                    )
            session.headers = headers
            session.verify = verify_ssl
            if proxies:
                session.proxies.update(proxies)
            self.session = session

        self.continue_on_failure = continue_on_failure
        self.autoset_encoding = autoset_encoding
        self.encoding = encoding
        self.trust_env = trust_env

    @property
    def web_path(self) -> str:
        """Return the single configured web path.

        Raises:
            ValueError: If more than one web path is configured.
        """
        if len(self.web_paths) > 1:
            raise ValueError("Multiple webpaths found.")
        return self.web_paths[0]

    async def _fetch(
        self, url: str, retries: int = 3, cooldown: int = 2, backoff: float = 1.5
    ) -> str:
        """Fetch ``url`` with aiohttp and return the response body as text.

        Headers, cookies and the SSL-verification flag are taken from
        ``self.session``; they override same-named keys in
        ``self.requests_kwargs``.

        Args:
            url: Address to fetch.
            retries: Number of attempts before giving up.
            cooldown: Base delay in seconds between attempts.
            backoff: Multiplier applied to the delay; attempt ``i`` (0-based)
                waits ``cooldown * backoff**i`` seconds before the next one.

        Returns:
            The decoded response body.

        Raises:
            aiohttp.ClientConnectionError: If the last attempt fails to connect.
            ValueError: If the retry loop finishes without returning
                (only possible when ``retries`` is not positive).
        """
        # These options do not change between attempts, so build them once.
        session_kwargs: Dict = dict(
            headers=self.session.headers,
            cookies=self.session.cookies.get_dict(),
        )
        if not self.session.verify:
            session_kwargs["ssl"] = False
        request_kwargs = self.requests_kwargs | session_kwargs

        async with aiohttp.ClientSession(trust_env=self.trust_env) as session:
            for attempt in range(retries):
                try:
                    async with session.get(url, **request_kwargs) as response:
                        if self.raise_for_status:
                            response.raise_for_status()
                        # Honour an explicit encoding like the synchronous path
                        # does; None leaves aiohttp's own detection in place.
                        return await response.text(encoding=self.encoding)
                except aiohttp.ClientConnectionError as e:
                    if attempt == retries - 1:
                        raise
                    logger.warning(
                        f"Error fetching {url} with attempt "
                        f"{attempt + 1}/{retries}: {e}. Retrying..."
                    )
                    await asyncio.sleep(cooldown * backoff**attempt)
        raise ValueError("retry count exceeded")

    async def _fetch_with_rate_limit(
        self, url: str, semaphore: asyncio.Semaphore
    ) -> str:
        """Fetch ``url`` while holding ``semaphore`` to bound concurrency.

        Returns:
            The page text, or ``""`` when the fetch fails and
            ``continue_on_failure`` is set.

        Raises:
            Exception: Whatever ``_fetch`` raised, when ``continue_on_failure``
                is not set.
        """
        async with semaphore:
            try:
                return await self._fetch(url)
            except Exception:
                if self.continue_on_failure:
                    logger.warning(
                        f"Error fetching {url}, skipping due to"
                        f" continue_on_failure=True"
                    )
                    return ""
                logger.exception(
                    f"Error fetching {url} and aborting, use continue_on_failure=True "
                    "to continue loading urls after encountering an error."
                )
                # Bare raise keeps the original traceback intact.
                raise

    async def fetch_all(self, urls: List[str]) -> Any:
        """Fetch all urls concurrently with rate limiting.

        Args:
            urls: Addresses to fetch.

        Returns:
            The gathered page texts, in the same order as ``urls``.
        """
        semaphore = asyncio.Semaphore(self.requests_per_second)
        tasks = [
            asyncio.ensure_future(self._fetch_with_rate_limit(url, semaphore))
            for url in urls
        ]
        if self.show_progress:
            # Only the optional import is guarded, so an ImportError raised by a
            # fetch task is not mistaken for a missing tqdm.
            try:
                from tqdm.asyncio import tqdm_asyncio
            except ImportError:
                warnings.warn("For better logging of progress, `pip install tqdm`")
            else:
                return await tqdm_asyncio.gather(
                    *tasks, desc="Fetching pages", ascii=True, mininterval=1
                )
        return await asyncio.gather(*tasks)

    @staticmethod
    def _check_parser(parser: str) -> None:
        """Check that parser is valid for bs4.

        Raises:
            ValueError: If ``parser`` is not one of the supported parser names.
        """
        valid_parsers = ["html.parser", "lxml", "xml", "lxml-xml", "html5lib"]
        if parser not in valid_parsers:
            raise ValueError(
                "`parser` must be one of " + ", ".join(valid_parsers) + "."
            )

    def _resolve_parser(self, url: str, parser: Optional[str]) -> str:
        """Return the validated parser name to use for ``url``.

        When ``parser`` is None, ``"xml"`` is chosen for URLs ending in
        ``.xml`` and ``self.default_parser`` otherwise.

        Raises:
            ValueError: If the resulting parser name is not supported.
        """
        if parser is None:
            parser = "xml" if url.endswith(".xml") else self.default_parser
        self._check_parser(parser)
        return parser

    def _unpack_fetch_results(
        self, results: Any, urls: List[str], parser: Union[str, None] = None
    ) -> List[Any]:
        """Unpack fetch results into BeautifulSoup objects.

        Args:
            results: Page texts, aligned with ``urls``.
            urls: Addresses the texts were fetched from.
            parser: Parser to use for every page; when None the parser is
                chosen separately for each URL.

        Returns:
            One BeautifulSoup object per result.
        """
        from bs4 import BeautifulSoup

        # The parser is resolved per URL so that auto-detection for one page
        # does not carry over to the pages that follow it.
        return [
            BeautifulSoup(result, self._resolve_parser(url, parser), **self.bs_kwargs)
            for url, result in zip(urls, results)
        ]

    def scrape_all(self, urls: List[str], parser: Union[str, None] = None) -> List[Any]:
        """Fetch all urls, then return soups for all results.

        Runs ``fetch_all`` through ``asyncio.run``, so it must not be called
        from code that is already running inside an event loop.
        """
        results = asyncio.run(self.fetch_all(urls))
        return self._unpack_fetch_results(results, urls, parser=parser)

    async def ascrape_all(
        self, urls: List[str], parser: Union[str, None] = None
    ) -> List[Any]:
        """Async fetch all urls, then return soups for all results."""
        results = await self.fetch_all(urls)
        return self._unpack_fetch_results(results, urls, parser=parser)

    def _scrape(
        self,
        url: str,
        parser: Union[str, None] = None,
        bs_kwargs: Optional[dict] = None,
    ) -> Any:
        """Fetch ``url`` synchronously and parse it with BeautifulSoup.

        Args:
            url: Address to fetch with ``self.session``.
            parser: Parser name; when None it is chosen from the URL suffix
                and ``self.default_parser``.
            bs_kwargs: Extra keyword arguments for the BeautifulSoup constructor.

        Returns:
            The parsed page.

        Raises:
            ValueError: If the parser name is not supported.
        """
        from bs4 import BeautifulSoup

        # Validate the parser before spending a network round-trip.
        parser = self._resolve_parser(url, parser)

        html_doc = self.session.get(url, **self.requests_kwargs)
        if self.raise_for_status:
            html_doc.raise_for_status()

        # An explicit encoding wins over automatic detection.
        if self.encoding is not None:
            html_doc.encoding = self.encoding
        elif self.autoset_encoding:
            html_doc.encoding = html_doc.apparent_encoding
        return BeautifulSoup(html_doc.text, parser, **(bs_kwargs or {}))

    def scrape(self, parser: Union[str, None] = None) -> Any:
        """Scrape data from webpage and return it in BeautifulSoup format.

        Uses the single configured web path; ``self.default_parser`` is used
        when ``parser`` is None.
        """
        if parser is None:
            parser = self.default_parser
        return self._scrape(self.web_path, parser=parser, bs_kwargs=self.bs_kwargs)

    def _build_document(self, soup: Any, path: str) -> Document:
        """Turn a parsed page into a Document with text and metadata."""
        text = soup.get_text(**self.bs_get_text_kwargs)
        metadata = _build_metadata(soup, path)
        return Document(page_content=text, metadata=metadata)

    def lazy_load(self) -> Iterator[Document]:
        """Lazy load text from the url(s) in web_path.

        Pages are fetched one at a time, synchronously, as the iterator is
        consumed.
        """
        for path in self.web_paths:
            soup = self._scrape(path, bs_kwargs=self.bs_kwargs)
            yield self._build_document(soup, path)

    async def alazy_load(self) -> AsyncIterator[Document]:
        """Async lazy load text from the url(s) in web_path.

        All pages are fetched concurrently first; documents are then yielded
        in the order of ``web_paths``.
        """
        results = await self.ascrape_all(self.web_paths)
        for path, soup in zip(self.web_paths, results):
            yield self._build_document(soup, path)

    @deprecated(
        since="0.3.14",
        removal="1.0",
        message=(
            "See API reference for updated usage: "
            "https://python.langchain.com/api_reference/community/document_loaders/langchain_community.document_loaders.web_base.WebBaseLoader.html"  # noqa: E501
        ),
    )
    def aload(self) -> List[Document]:  # type: ignore
        """Load text from the urls in web_path async into Documents.

        Despite its name this is a regular (blocking) method: pages are
        fetched concurrently through ``scrape_all`` and the full list is
        returned.
        """
        results = self.scrape_all(self.web_paths)
        return [
            self._build_document(soup, path)
            for path, soup in zip(self.web_paths, results)
        ]