langchain_community.document_loaders.etherscan ηζΊδ»£η
import os
import re
from typing import Iterator, List
import requests
from langchain_core.documents import Document
from langchain_community.document_loaders.base import BaseLoader
[docs]
class EtherscanLoader(BaseLoader):
"""Load transactions from `Ethereum` mainnet.
The Loader use Etherscan API to interact with Ethereum mainnet.
ETHERSCAN_API_KEY environment variable must be set use this loader.
"""
[docs]
def __init__(
self,
account_address: str,
api_key: str = "docs-demo",
filter: str = "normal_transaction",
page: int = 1,
offset: int = 10,
start_block: int = 0,
end_block: int = 99999999,
sort: str = "desc",
):
self.account_address = account_address
self.api_key = os.environ.get("ETHERSCAN_API_KEY") or api_key
self.filter = filter
self.page = page
self.offset = offset
self.start_block = start_block
self.end_block = end_block
self.sort = sort
if not self.api_key:
raise ValueError("Etherscan API key not provided")
if not re.match(r"^0x[a-fA-F0-9]{40}$", self.account_address):
raise ValueError(f"Invalid contract address {self.account_address}")
if filter not in [
"normal_transaction",
"internal_transaction",
"erc20_transaction",
"eth_balance",
"erc721_transaction",
"erc1155_transaction",
]:
raise ValueError(f"Invalid filter {filter}")
[docs]
def lazy_load(self) -> Iterator[Document]:
"""Lazy load Documents from table."""
result = []
if self.filter == "normal_transaction":
result = self.getNormTx()
elif self.filter == "internal_transaction":
result = self.getInternalTx()
elif self.filter == "erc20_transaction":
result = self.getERC20Tx()
elif self.filter == "eth_balance":
result = self.getEthBalance()
elif self.filter == "erc721_transaction":
result = self.getERC721Tx()
elif self.filter == "erc1155_transaction":
result = self.getERC1155Tx()
else:
raise ValueError(f"Invalid filter {filter}")
for doc in result:
yield doc
[docs]
def getNormTx(self) -> List[Document]:
url = (
f"https://api.etherscan.io/api?module=account&action=txlist&address={self.account_address}"
f"&startblock={self.start_block}&endblock={self.end_block}&page={self.page}"
f"&offset={self.offset}&sort={self.sort}&apikey={self.api_key}"
)
try:
response = requests.get(url)
response.raise_for_status()
except requests.exceptions.RequestException as e:
print("Error occurred while making the request:", e) # noqa: T201
items = response.json()["result"]
result = []
if len(items) == 0:
return [Document(page_content="")]
for item in items:
content = str(item)
metadata = {"from": item["from"], "tx_hash": item["hash"], "to": item["to"]}
result.append(Document(page_content=content, metadata=metadata))
print(len(result)) # noqa: T201
return result
[docs]
def getEthBalance(self) -> List[Document]:
url = (
f"https://api.etherscan.io/api?module=account&action=balance"
f"&address={self.account_address}&tag=latest&apikey={self.api_key}"
)
try:
response = requests.get(url)
response.raise_for_status()
except requests.exceptions.RequestException as e:
print("Error occurred while making the request:", e) # noqa: T201
return [Document(page_content=response.json()["result"])]
[docs]
def getInternalTx(self) -> List[Document]:
url = (
f"https://api.etherscan.io/api?module=account&action=txlistinternal"
f"&address={self.account_address}&startblock={self.start_block}"
f"&endblock={self.end_block}&page={self.page}&offset={self.offset}"
f"&sort={self.sort}&apikey={self.api_key}"
)
try:
response = requests.get(url)
response.raise_for_status()
except requests.exceptions.RequestException as e:
print("Error occurred while making the request:", e) # noqa: T201
items = response.json()["result"]
result = []
if len(items) == 0:
return [Document(page_content="")]
for item in items:
content = str(item)
metadata = {"from": item["from"], "tx_hash": item["hash"], "to": item["to"]}
result.append(Document(page_content=content, metadata=metadata))
return result
[docs]
def getERC20Tx(self) -> List[Document]:
url = (
f"https://api.etherscan.io/api?module=account&action=tokentx"
f"&address={self.account_address}&startblock={self.start_block}"
f"&endblock={self.end_block}&page={self.page}&offset={self.offset}"
f"&sort={self.sort}&apikey={self.api_key}"
)
try:
response = requests.get(url)
response.raise_for_status()
except requests.exceptions.RequestException as e:
print("Error occurred while making the request:", e) # noqa: T201
items = response.json()["result"]
result = []
if len(items) == 0:
return [Document(page_content="")]
for item in items:
content = str(item)
metadata = {"from": item["from"], "tx_hash": item["hash"], "to": item["to"]}
result.append(Document(page_content=content, metadata=metadata))
return result
[docs]
def getERC721Tx(self) -> List[Document]:
url = (
f"https://api.etherscan.io/api?module=account&action=tokennfttx"
f"&address={self.account_address}&startblock={self.start_block}"
f"&endblock={self.end_block}&page={self.page}&offset={self.offset}"
f"&sort={self.sort}&apikey={self.api_key}"
)
try:
response = requests.get(url)
response.raise_for_status()
except requests.exceptions.RequestException as e:
print("Error occurred while making the request:", e) # noqa: T201
items = response.json()["result"]
result = []
if len(items) == 0:
return [Document(page_content="")]
for item in items:
content = str(item)
metadata = {"from": item["from"], "tx_hash": item["hash"], "to": item["to"]}
result.append(Document(page_content=content, metadata=metadata))
return result
[docs]
def getERC1155Tx(self) -> List[Document]:
url = (
f"https://api.etherscan.io/api?module=account&action=token1155tx"
f"&address={self.account_address}&startblock={self.start_block}"
f"&endblock={self.end_block}&page={self.page}&offset={self.offset}"
f"&sort={self.sort}&apikey={self.api_key}"
)
try:
response = requests.get(url)
response.raise_for_status()
except requests.exceptions.RequestException as e:
print("Error occurred while making the request:", e) # noqa: T201
items = response.json()["result"]
result = []
if len(items) == 0:
return [Document(page_content="")]
for item in items:
content = str(item)
metadata = {"from": item["from"], "tx_hash": item["hash"], "to": item["to"]}
result.append(Document(page_content=content, metadata=metadata))
return result