from __future__ import annotations
import os
import uuid
import warnings
from typing import Any, Iterable, List, Optional
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.utils import guard_import
from langchain_core.vectorstores import VectorStore
[docs]def import_lancedb() -> Any:
"""导入lancedb包。"""
return guard_import("lancedb")
[docs]class LanceDB(VectorStore):
"""`LanceDB`向量存储。
要使用,您应该安装``lancedb`` python包。
您可以使用``pip install lancedb``进行安装。
参数:
connection: 要使用的LanceDB连接。如果未提供,将创建一个新连接。
embedding: 用于向量存储的嵌入。
vector_key: 数据库中向量使用的键。默认为``vector``。
id_key: 数据库中id使用的键。默认为``id``。
text_key: 数据库中文本使用的键。默认为``text``。
table_name: 要使用的表的名称。默认为``vectorstore``。
api_key: 用于LanceDB云数据库的API密钥。
region: 用于LanceDB云数据库的区域。
mode: 用于向表中添加数据的模式。默认为``overwrite``。
示例:
.. code-block:: python
vectorstore = LanceDB(uri='/lancedb', embedding_function)
vectorstore.add_texts(['text1', 'text2'])
result = vectorstore.similarity_search('text1')"""
[docs] def __init__(
self,
connection: Optional[Any] = None,
embedding: Optional[Embeddings] = None,
uri: Optional[str] = "/tmp/lancedb",
vector_key: Optional[str] = "vector",
id_key: Optional[str] = "id",
text_key: Optional[str] = "text",
table_name: Optional[str] = "vectorstore",
api_key: Optional[str] = None,
region: Optional[str] = None,
mode: Optional[str] = "overwrite",
):
"""使用Lance DB向量存储进行初始化"""
lancedb = guard_import("lancedb")
self._embedding = embedding
self._vector_key = vector_key
self._id_key = id_key
self._text_key = text_key
self._table_name = table_name
self.api_key = api_key or os.getenv("LANCE_API_KEY") if api_key != "" else None
self.region = region
self.mode = mode
if isinstance(uri, str) and self.api_key is None:
if uri.startswith("db://"):
raise ValueError("API key is required for LanceDB cloud.")
if self._embedding is None:
raise ValueError("embedding object should be provided")
if isinstance(connection, lancedb.db.LanceDBConnection):
self._connection = connection
elif isinstance(connection, (str, lancedb.db.LanceTable)):
raise ValueError(
"`connection` has to be a lancedb.db.LanceDBConnection object.\
`lancedb.db.LanceTable` is deprecated."
)
else:
if self.api_key is None:
self._connection = lancedb.connect(uri)
else:
if isinstance(uri, str):
if uri.startswith("db://"):
self._connection = lancedb.connect(
uri, api_key=self.api_key, region=self.region
)
else:
self._connection = lancedb.connect(uri)
warnings.warn(
"api key provided with local uri.\
The data will be stored locally"
)
@property
def embeddings(self) -> Optional[Embeddings]:
return self._embedding
[docs] def add_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> List[str]:
"""将文本转换为嵌入并将其添加到数据库
参数:
texts:要添加到向量存储中的字符串的可迭代对象。
metadatas:与文本相关联的元数据的可选列表。
ids:要与文本关联的可选id列表。
返回:
已添加文本的id列表。
"""
# Embed texts and create documents
docs = []
ids = ids or [str(uuid.uuid4()) for _ in texts]
embeddings = self._embedding.embed_documents(list(texts)) # type: ignore
for idx, text in enumerate(texts):
embedding = embeddings[idx]
metadata = metadatas[idx] if metadatas else {"id": ids[idx]}
docs.append(
{
self._vector_key: embedding,
self._id_key: ids[idx],
self._text_key: text,
"metadata": metadata,
}
)
if self._table_name in self._connection.table_names():
tbl = self._connection.open_table(self._table_name)
if self.api_key is None:
tbl.add(docs, mode=self.mode)
else:
tbl.add(docs)
else:
self._connection.create_table(self._table_name, data=docs)
return ids
[docs] def get_table(
self, name: Optional[str] = None, set_default: Optional[bool] = False
) -> Any:
"""从数据库中获取一个表对象。
参数:
name (str, optional): 要获取的表的名称。默认为None,获取当前表对象。
set_default (bool, optional): 将获取的表设置为默认表。默认为False。
返回:
Any: 获取的表对象。
引发:
ValueError: 如果在数据库中找不到指定的表。
"""
if name is not None:
try:
if set_default:
self._table_name = name
return self._connection.open_table(name)
except Exception:
raise ValueError(f"Table {name} not found in the database")
else:
return self._connection.open_table(self._table_name)
[docs] def create_index(
self,
col_name: Optional[str] = None,
vector_col: Optional[str] = None,
num_partitions: Optional[int] = 256,
num_sub_vectors: Optional[int] = 96,
index_cache_size: Optional[int] = None,
metric: Optional[str] = "L2",
) -> None:
"""在表上创建一个标量(用于非向量列)或向量索引。
在创建索引之前,请确保您的向量列有足够的数据。
参数:
vector_col:如果要在向量列上创建索引,请提供。
col_name:如果要在非向量列上创建索引,请提供。
metric:提供用于向量索引的度量标准。默认为'L2'
度量标准选择:'L2','dot','cosine'
返回:
无
"""
tbl = self.get_table()
if vector_col:
tbl.create_index(
metric=metric,
vector_column_name=vector_col,
num_partitions=num_partitions,
num_sub_vectors=num_sub_vectors,
index_cache_size=index_cache_size,
)
elif col_name:
tbl.create_scalar_index(col_name)
else:
raise ValueError("Provide either vector_col or col_name")
[docs] def similarity_search(
self, query: str, k: int = 4, name: Optional[str] = None, **kwargs: Any
) -> List[Document]:
"""返回与查询最相似的文档
参数:
query: 用于查询向量存储的字符串。
k: 要返回的文档数量。
返回:
与查询最相似的文档列表。
"""
embedding = self._embedding.embed_query(query) # type: ignore
tbl = self.get_table(name)
docs = (
tbl.search(embedding, vector_column_name=self._vector_key)
.limit(k)
.to_arrow()
)
columns = docs.schema.names
return [
Document(
page_content=docs[self._text_key][idx].as_py(),
metadata={
col: docs[col][idx].as_py()
for col in columns
if col != self._text_key
},
)
for idx in range(len(docs))
]
[docs] @classmethod
def from_texts(
cls,
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
connection: Any = None,
vector_key: Optional[str] = "vector",
id_key: Optional[str] = "id",
text_key: Optional[str] = "text",
table_name: Optional[str] = "vectorstore",
**kwargs: Any,
) -> LanceDB:
instance = LanceDB(
connection=connection,
embedding=embedding,
vector_key=vector_key,
id_key=id_key,
text_key=text_key,
table_name=table_name,
)
instance.add_texts(texts, metadatas=metadatas, **kwargs)
return instance
[docs] def delete(
self,
ids: Optional[List[str]] = None,
delete_all: Optional[bool] = None,
filter: Optional[str] = None,
drop_columns: Optional[List[str]] = None,
name: Optional[str] = None,
**kwargs: Any,
) -> None:
"""允许通过过滤、按id删除行或从表中删除列。
参数:
filter: 提供一个字符串SQL表达式 - "{col} {operation} {value}"。
ids: 提供要从表中删除的id列表。
drop_columns: 提供要从表中删除的列列表。
delete_all: 如果为True,则删除表中的所有行。
"""
tbl = self.get_table(name)
if filter:
tbl.delete(filter)
elif ids:
tbl.delete("id in ('{}')".format(",".join(ids)))
elif drop_columns:
if self.api_key is not None:
raise NotImplementedError(
"Column operations currently not supported in LanceDB Cloud."
)
else:
tbl.drop_columns(drop_columns)
elif delete_all:
tbl.delete("true")
else:
raise ValueError("Provide either filter, ids, drop_columns or delete_all")