import json
import os
import re
import time
from typing import Iterator, List, Literal, Optional
import requests
from langchain_core.documents import Document
from langchain_community.document_loaders.base import BaseLoader
[docs]class MintbaseDocumentLoader(BaseLoader):
"""从区块链智能合约中加载元素。
支持的区块链有:Near主网,Near测试网。
如果未指定BlockchainType,则默认为Near主网。
加载器使用Mintbase API与区块链进行交互。
必须设置MB_API_KEY环境变量才能使用此加载器。
API每次返回100个NFT,并可以使用startToken参数进行分页。
如果get_all_tokens设置为True,则加载器将获取合约上的所有代币。
请注意,对于代币数量较多的合约,这可能需要很长时间(例如,10k代币需要100次请求)。
出于这个原因,默认值为false。
可以设置max_execution_time(秒)来限制加载器的执行时间。
该加载器的未来版本可以:
- 支持额外的Mintbase API(例如,getTokens等)。
示例:
.. code-block:: python
contractAddress = "nft.yearofchef.near" # 厨师之年合约地址
blockchainLoader = MintbaseDocumentLoader(
contract_address=contractAddress, blockchain_type="mainnet",api_key="omni-site"
)""" # noqa: E501
[docs] def __init__(
self,
contract_address: str,
*,
blockchain_type: Literal["mainnet", "testnet"],
api_key: str = "",
table: str = "",
select: str = "",
fields: Optional[List[str]] = None,
get_all_tokens: bool = False,
max_execution_time: Optional[int] = None,
):
"""参数:
contract_address:智能合约的地址。
blockchainType:区块链类型。
api_key:Mintbase API密钥。
table:要查询的表的名称。
select:查询条件。
fields:查询后要显示的信息。
get_all_tokens:是否获取合约上的所有代币。
max_execution_time:最大执行时间(秒)。
"""
self.contract_address = contract_address
self.blockchainType = blockchain_type
self.api_key = os.environ.get("MB_API_KEY") or api_key
self.table = "mb_views_nft_tokens" or table
self.select = 'where: {nft_contract_id: {_eq: "contract_address"}}' or select
self.fields = fields or [
"base_uri",
"burned_receipt_id",
"burned_timestamp",
"copies",
"description",
"expires_at",
"extra",
"issued_at",
"last_transfer_receipt_id",
"last_transfer_timestamp",
"media",
"media_hash",
"metadata_content_flag",
"metadata_id",
"mint_memo",
"minted_receipt_id",
"minted_timestamp",
"minter",
"nft_contract_content_flag",
"nft_contract_created_at",
"nft_contract_icon",
"nft_contract_id",
"nft_contract_is_mintbase",
"nft_contract_name",
"nft_contract_owner_id",
"nft_contract_reference",
"nft_contract_spec",
"nft_contract_symbol",
"owner",
"reference",
"reference_blob",
"reference_hash",
"royalties",
"royalties_percent",
"splits",
"starts_at",
"title",
"token_id",
"updated_at",
]
self.get_all_tokens = get_all_tokens
self.max_execution_time = max_execution_time
if not self.api_key:
raise ValueError("Mintbase API key not provided.")
if not re.match(
r"^(([a-z\d]+[\-_])*[a-z\d]+\.)*([a-z\d]+[\-_])*[a-z\d]+$",
self.contract_address,
):
raise ValueError(f"Invalid contract address {self.contract_address}")
[docs] def load(self) -> List[Document]:
result = []
start_time = time.time()
while True:
# Define the GraphQL query as a multi-line string
operations_doc = """
query MyQuery {
table(select) {
fields
}
}
"""
# Replace the placeholder with the actual contract address
operations_doc = operations_doc.replace("select", self.select)
operations_doc = operations_doc.replace(
"contract_address", self.contract_address
)
operations_doc = operations_doc.replace("table", self.table)
operations_doc = operations_doc.replace("fields", "\n".join(self.fields))
# Define the headers
headers = {"mb-api-key": self.api_key, "Content-Type": "application/json"}
# Define the POST data
data = {
"query": operations_doc,
"variables": {},
"operationName": "MyQuery",
}
url = f"https://graph.mintbase.xyz/{self.blockchainType}"
response = requests.post(url, headers=headers, data=json.dumps(data))
if response.status_code != 200:
raise ValueError(
f"Request failed with status code {response.status_code}"
)
items = response.json()["data"]["mb_views_nft_tokens"]
if not items:
break
for item in items:
content = str(item)
token_id = item["token_id"]
metadata = {
"source": self.contract_address,
"blockchain": self.blockchainType,
"tokenId": token_id,
}
result.append(Document(page_content=content, metadata=metadata))
# exit after the first API call if get_all_tokens is False
if not self.get_all_tokens:
break
if (
self.max_execution_time is not None
and (time.time() - start_time) > self.max_execution_time
):
raise RuntimeError("Execution time exceeded the allowed time limit.")
if not result:
raise ValueError(
f"No NFTs found for contract address {self.contract_address}"
)
return result
[docs] def lazy_load(self) -> Iterator[Document]:
start_time = time.time()
while True:
# Define the GraphQL query as a multi-line string
operations_doc = """
query MyQuery {
table(select) {
fields
}
}
"""
# Replace the placeholder with the actual contract address
operations_doc = operations_doc.replace("select", self.select)
operations_doc = operations_doc.replace(
"contract_address", self.contract_address
)
operations_doc = operations_doc.replace("table", self.table)
operations_doc = operations_doc.replace("fields", "\n".join(self.fields))
# Define the headers
headers = {"mb-api-key": self.api_key, "Content-Type": "application/json"}
# Define the POST data
data = {
"query": operations_doc,
"variables": {},
"operationName": "MyQuery",
}
url = f"https://graph.mintbase.xyz/{self.blockchainType}"
response = requests.post(url, headers=headers, data=json.dumps(data))
if response.status_code != 200:
raise ValueError(
f"Request failed with status code {response.status_code}"
)
items = response.json()["data"]["mb_views_nft_tokens"]
if not items:
break
for item in items:
content = str(item)
tokenId = item["token_id"]
metadata = {
"source": self.contract_address,
"blockchain": self.blockchainType,
"tokenId": tokenId,
}
yield Document(page_content=content, metadata=metadata)
# exit after the first API call if get_all_tokens is False
if not self.get_all_tokens:
break
if (
self.max_execution_time is not None
and (time.time() - start_time) > self.max_execution_time
):
raise RuntimeError("Execution time exceeded the allowed time limit.")