
Cassandra

CassandraVectorStore #

Bases: BasePydanticVectorStore

Cassandra Vector Store.

An abstraction of a Cassandra table with vector-similarity search. Documents, and their embeddings, are stored in a Cassandra table, and a vector-capable index is used for searches. The table does not need to exist beforehand: if necessary, it will be created behind the scenes.

All Cassandra operations are done through the CassIO library.

Note: in recent versions, only `table` and `embedding_dimension` can be passed positionally. Please revise your code if needed. This is to accommodate a leaner usage in which the DB connection is set globally through a `cassio.init(...)` call: the database details then no longer need to be specified when creating the vector store, unless desired.
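For example, a minimal sketch of this global-connection style (the cassio.init() keyword arguments shown here follow common CassIO usage for Astra DB and are assumptions; adapt them to your deployment):

import cassio

# Set the database connection once, globally (assumed Astra DB-style arguments;
# the credential and database-id values are placeholders):
cassio.init(
    token="AstraCS:...",
    database_id="<database-id>",
    keyspace="my_keyspace",
)

# Afterwards, the vector store can be created without session/keyspace arguments:
# vector_store = CassandraVectorStore(table="cass_v_table", embedding_dimension=1536)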

Parameters:

table (str, required)
    Table name to use. If it does not exist, it will be created.

embedding_dimension (int, required)
    Length of the embedding vectors in use.

session (optional, Session; default None)
    The Cassandra session to use. Can be omitted, or equivalently set to None, to use the DB connection set globally through cassio.init() beforehand.

keyspace (optional, str; default None)
    Name of the Cassandra keyspace to work in. Can be omitted, or equivalently set to None, to use the DB connection set globally through cassio.init() beforehand.

ttl_seconds (optional, int; default None)
    Expiration time for inserted entries. Defaults to no expiration (None).

insertion_batch_size (optional, int; default DEFAULT_INSERTION_BATCH_SIZE)
    How many vectors are inserted concurrently during bulk inserts. Defaults to 20.

Examples:

pip install llama-index-vector-stores-cassandra

from llama_index.vector_stores.cassandra import CassandraVectorStore

vector_store = CassandraVectorStore(
    table="cass_v_table", embedding_dimension=1536
)
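If you prefer to pass the connection explicitly instead of relying on cassio.init(), a sketch using the cassandra-driver package (the contact point, keyspace name, and TTL value are placeholders; the keyspace is assumed to already exist):

from cassandra.cluster import Cluster
from llama_index.vector_stores.cassandra import CassandraVectorStore

cluster = Cluster(["127.0.0.1"])  # placeholder contact point
session = cluster.connect()

vector_store = CassandraVectorStore(
    table="cass_v_table",
    embedding_dimension=1536,
    session=session,
    keyspace="my_keyspace",    # assumed to already exist
    ttl_seconds=60 * 60 * 24,  # optional: rows expire after one day
)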
Source code in llama-index-integrations/vector_stores/llama-index-vector-stores-cassandra/llama_index/vector_stores/cassandra/base.py
class CassandraVectorStore(BasePydanticVectorStore):
    """
    Cassandra Vector Store.

    An abstraction of a Cassandra table with
    vector-similarity-search. Documents, and their embeddings, are stored
    in a Cassandra table and a vector-capable index is used for searches.
    The table does not need to exist beforehand: if necessary it will
    be created behind the scenes.

    All Cassandra operations are done through the CassIO library.

    Note: in recent versions, only `table` and `embedding_dimension` can be
    passed positionally. Please revise your code if needed.
    This is to accommodate for a leaner usage, whereby the DB connection
    is set globally through a `cassio.init(...)` call: then, the DB details
    are not to be specified anymore when creating a vector store, unless
    desired.

    Args:
        table (str): table name to use. If not existing, it will be created.
        embedding_dimension (int): length of the embedding vectors in use.
        session (optional, cassandra.cluster.Session): the Cassandra session
            to use.
            Can be omitted, or equivalently set to None, to use the
            DB connection set globally through cassio.init() beforehand.
        keyspace (optional, str): name of the Cassandra keyspace to work in.
            Can be omitted, or equivalently set to None, to use the
            DB connection set globally through cassio.init() beforehand.
        ttl_seconds (optional, int): expiration time for inserted entries.
            Default is no expiration (None).
        insertion_batch_size (optional, int): how many vectors are inserted
            concurrently, for use by bulk inserts. Defaults to 20.

    Examples:
        `pip install llama-index-vector-stores-cassandra`

        ```python
        from llama_index.vector_stores.cassandra import CassandraVectorStore

        vector_store = CassandraVectorStore(
            table="cass_v_table", embedding_dimension=1536
        )
        ```

    """

    stores_text: bool = True
    flat_metadata: bool = True

    _session: Optional[Any] = PrivateAttr()
    _keyspace: Optional[Any] = PrivateAttr()
    _table: str = PrivateAttr()
    _embedding_dimension: int = PrivateAttr()
    _ttl_seconds: Optional[int] = PrivateAttr()
    _insertion_batch_size: int = PrivateAttr()
    _vector_table: ClusteredMetadataVectorCassandraTable = PrivateAttr()

    def __init__(
        self,
        table: str,
        embedding_dimension: int,
        *,
        session: Optional[Any] = None,
        keyspace: Optional[str] = None,
        ttl_seconds: Optional[int] = None,
        insertion_batch_size: int = DEFAULT_INSERTION_BATCH_SIZE,
    ) -> None:
        super().__init__()

        self._session = session
        self._keyspace = keyspace
        self._table = table
        self._embedding_dimension = embedding_dimension
        self._ttl_seconds = ttl_seconds
        self._insertion_batch_size = insertion_batch_size

        _logger.debug("Creating the Cassandra table")
        self._vector_table = ClusteredMetadataVectorCassandraTable(
            session=self._session,
            keyspace=self._keyspace,
            table=self._table,
            vector_dimension=self._embedding_dimension,
            primary_key_type=["TEXT", "TEXT"],
            # a conservative choice here, to make everything searchable
            # except the bulky "_node_content" key (it'd make little sense to):
            metadata_indexing=("default_to_searchable", ["_node_content"]),
        )

    def add(
        self,
        nodes: List[BaseNode],
        **add_kwargs: Any,
    ) -> List[str]:
        """
        Add nodes to index.

        Args:
            nodes: List[BaseNode]: list of nodes with embeddings

        """
        node_ids = []
        node_contents = []
        node_metadatas = []
        node_embeddings = []
        for node in nodes:
            metadata = node_to_metadata_dict(
                node,
                remove_text=True,
                flat_metadata=self.flat_metadata,
            )
            node_ids.append(node.node_id)
            node_contents.append(node.get_content(metadata_mode=MetadataMode.NONE))
            node_metadatas.append(metadata)
            node_embeddings.append(node.get_embedding())

        _logger.debug(f"Adding {len(node_ids)} rows to table")
        # Concurrent batching of inserts:
        insertion_tuples = zip(node_ids, node_contents, node_metadatas, node_embeddings)
        for insertion_batch in _batch_iterable(
            insertion_tuples, batch_size=self._insertion_batch_size
        ):
            futures = []
            for (
                node_id,
                node_content,
                node_metadata,
                node_embedding,
            ) in insertion_batch:
                node_ref_doc_id = node_metadata["ref_doc_id"]
                futures.append(
                    self._vector_table.put_async(
                        row_id=node_id,
                        body_blob=node_content,
                        vector=node_embedding,
                        metadata=node_metadata,
                        partition_id=node_ref_doc_id,
                        ttl_seconds=self._ttl_seconds,
                    )
                )
            for future in futures:
                _ = future.result()

        return node_ids

    def delete(self, ref_doc_id: str, **delete_kwargs: Any) -> None:
        """
        Delete nodes using ref_doc_id.

        Args:
            ref_doc_id (str): The doc_id of the document to delete.

        """
        _logger.debug("Deleting a document from the Cassandra table")
        self._vector_table.delete_partition(
            partition_id=ref_doc_id,
        )

    @property
    def client(self) -> Any:
        """Return the underlying cassIO vector table object."""
        return self._vector_table

    @staticmethod
    def _query_filters_to_dict(query_filters: MetadataFilters) -> Dict[str, Any]:
        if any(
            not isinstance(f, ExactMatchFilter) for f in query_filters.legacy_filters()
        ):
            raise NotImplementedError("Only `ExactMatchFilter` filters are supported")
        return {f.key: f.value for f in query_filters.filters}

    def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResult:
        """
        Query index for top k most similar nodes.

        Supported query modes: 'default' (most similar vectors) and 'mmr'.

        Args:
            query (VectorStoreQuery): the basic query definition. Defines:
                mode (VectorStoreQueryMode): one of the supported modes
                query_embedding (List[float]): query embedding to search against
                similarity_top_k (int): top k most similar nodes
                mmr_threshold (Optional[float]): this is the 0-to-1 MMR lambda.
                    If present, takes precedence over the kwargs parameter.
                Ignored except for MMR queries.

        Args for query.mode == 'mmr' (ignored otherwise):
            mmr_threshold (Optional[float]): this is the 0-to-1 lambda for MMR.
                Note that in principle mmr_threshold could come in the query
            mmr_prefetch_factor (Optional[float]): factor applied to top_k
                for prefetch pool size. Defaults to 4.0
            mmr_prefetch_k (Optional[int]): prefetch pool size. This cannot be
                passed together with mmr_prefetch_factor

        """
        _available_query_modes = [
            VectorStoreQueryMode.DEFAULT,
            VectorStoreQueryMode.MMR,
        ]
        if query.mode not in _available_query_modes:
            raise NotImplementedError(f"Query mode {query.mode} not available.")
        #
        query_embedding = cast(List[float], query.query_embedding)

        # metadata filtering
        if query.filters is not None:
            # raise NotImplementedError("No metadata filtering yet")
            query_metadata = self._query_filters_to_dict(query.filters)
        else:
            query_metadata = {}

        _logger.debug(
            f"Running ANN search on the Cassandra table (query mode: {query.mode})"
        )
        if query.mode == VectorStoreQueryMode.DEFAULT:
            matches = list(
                self._vector_table.metric_ann_search(
                    vector=query_embedding,
                    n=query.similarity_top_k,
                    metric="cos",
                    metric_threshold=None,
                    metadata=query_metadata,
                )
            )
            top_k_scores = [match["distance"] for match in matches]
        elif query.mode == VectorStoreQueryMode.MMR:
            # Querying a larger number of vectors and then doing MMR on them.
            if (
                kwargs.get("mmr_prefetch_factor") is not None
                and kwargs.get("mmr_prefetch_k") is not None
            ):
                raise ValueError(
                    "'mmr_prefetch_factor' and 'mmr_prefetch_k' "
                    "cannot coexist in a call to query()"
                )
            else:
                if kwargs.get("mmr_prefetch_k") is not None:
                    prefetch_k0 = int(kwargs["mmr_prefetch_k"])
                else:
                    prefetch_k0 = int(
                        query.similarity_top_k
                        * kwargs.get("mmr_prefetch_factor", DEFAULT_MMR_PREFETCH_FACTOR)
                    )
            prefetch_k = max(prefetch_k0, query.similarity_top_k)
            #
            prefetch_matches = list(
                self._vector_table.metric_ann_search(
                    vector=query_embedding,
                    n=prefetch_k,
                    metric="cos",
                    metric_threshold=None,  # this is not `mmr_threshold`
                    metadata=query_metadata,
                )
            )
            #
            mmr_threshold = query.mmr_threshold or kwargs.get("mmr_threshold")
            if prefetch_matches:
                pf_match_indices, pf_match_embeddings = zip(
                    *enumerate(match["vector"] for match in prefetch_matches)
                )
            else:
                pf_match_indices, pf_match_embeddings = [], []
            pf_match_indices = list(pf_match_indices)
            pf_match_embeddings = list(pf_match_embeddings)
            mmr_similarities, mmr_indices = get_top_k_mmr_embeddings(
                query_embedding,
                pf_match_embeddings,
                similarity_top_k=query.similarity_top_k,
                embedding_ids=pf_match_indices,
                mmr_threshold=mmr_threshold,
            )
            #
            matches = [prefetch_matches[mmr_index] for mmr_index in mmr_indices]
            top_k_scores = mmr_similarities

        top_k_nodes = []
        top_k_ids = []
        for match in matches:
            node = metadata_dict_to_node(match["metadata"])
            node.set_content(match["body_blob"])
            top_k_nodes.append(node)
            top_k_ids.append(match["row_id"])

        return VectorStoreQueryResult(
            nodes=top_k_nodes,
            similarities=top_k_scores,
            ids=top_k_ids,
        )

client property #

client: Any

Return the underlying cassIO vector table object.
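For instance (a minimal sketch, assuming a vector_store built as in the examples above), the returned object is the CassIO table the store writes to and can be used for lower-level operations:

# Access the underlying CassIO table object:
vector_table = vector_store.client
print(type(vector_table).__name__)  # ClusteredMetadataVectorCassandraTable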

add #

add(nodes: List[BaseNode], **add_kwargs: Any) -> List[str]

Add nodes to index.

Parameters:

nodes (List[BaseNode], required)
    List of nodes with embeddings.
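A minimal usage sketch (the node classes are from llama_index.core.schema; the SOURCE relationship supplies the ref_doc_id that becomes the row's partition_id, and the embedding length must match embedding_dimension). Internally, rows are written as concurrent put_async calls in batches of insertion_batch_size:

from llama_index.core.schema import TextNode, NodeRelationship, RelatedNodeInfo

nodes = [
    TextNode(
        text="Cassandra is a distributed NoSQL database.",
        embedding=[0.1] * 1536,  # length must match embedding_dimension
        relationships={
            # The source document's id becomes the row's partition_id:
            NodeRelationship.SOURCE: RelatedNodeInfo(node_id="doc-1"),
        },
    )
]
node_ids = vector_store.add(nodes)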
Source code in llama-index-integrations/vector_stores/llama-index-vector-stores-cassandra/llama_index/vector_stores/cassandra/base.py
def add(
    self,
    nodes: List[BaseNode],
    **add_kwargs: Any,
) -> List[str]:
    """
    Add nodes to index.

    Args:
        nodes: List[BaseNode]: list of nodes with embeddings

    """
    node_ids = []
    node_contents = []
    node_metadatas = []
    node_embeddings = []
    for node in nodes:
        metadata = node_to_metadata_dict(
            node,
            remove_text=True,
            flat_metadata=self.flat_metadata,
        )
        node_ids.append(node.node_id)
        node_contents.append(node.get_content(metadata_mode=MetadataMode.NONE))
        node_metadatas.append(metadata)
        node_embeddings.append(node.get_embedding())

    _logger.debug(f"Adding {len(node_ids)} rows to table")
    # Concurrent batching of inserts:
    insertion_tuples = zip(node_ids, node_contents, node_metadatas, node_embeddings)
    for insertion_batch in _batch_iterable(
        insertion_tuples, batch_size=self._insertion_batch_size
    ):
        futures = []
        for (
            node_id,
            node_content,
            node_metadata,
            node_embedding,
        ) in insertion_batch:
            node_ref_doc_id = node_metadata["ref_doc_id"]
            futures.append(
                self._vector_table.put_async(
                    row_id=node_id,
                    body_blob=node_content,
                    vector=node_embedding,
                    metadata=node_metadata,
                    partition_id=node_ref_doc_id,
                    ttl_seconds=self._ttl_seconds,
                )
            )
        for future in futures:
            _ = future.result()

    return node_ids

delete #

delete(ref_doc_id: str, **delete_kwargs: Any) -> None

Delete nodes using ref_doc_id.

Parameters:

ref_doc_id (str, required)
    The doc_id of the document to delete.
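Deletion removes the whole partition for the given document, i.e. every node that was ingested under that ref_doc_id. For example:

# Remove all nodes that were ingested from the document "doc-1":
vector_store.delete(ref_doc_id="doc-1")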
Source code in llama-index-integrations/vector_stores/llama-index-vector-stores-cassandra/llama_index/vector_stores/cassandra/base.py
def delete(self, ref_doc_id: str, **delete_kwargs: Any) -> None:
    """
    Delete nodes using ref_doc_id.

    Args:
        ref_doc_id (str): The doc_id of the document to delete.

    """
    _logger.debug("Deleting a document from the Cassandra table")
    self._vector_table.delete_partition(
        partition_id=ref_doc_id,
    )

query #

query(query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResult

Query index for top k most similar nodes.

Supported query modes: 'default' (most similar vectors) and 'mmr'.

Parameters:

query (VectorStoreQuery, required)
    The basic query definition. Defines:
        mode (VectorStoreQueryMode): one of the supported modes
        query_embedding (List[float]): query embedding to search against
        similarity_top_k (int): top k most similar nodes
        mmr_threshold (Optional[float]): the 0-to-1 MMR lambda. If present,
            it takes precedence over the kwargs parameter. Ignored except
            for MMR queries.

Args for query.mode == 'mmr' (ignored otherwise):
    mmr_threshold (Optional[float]): the 0-to-1 lambda for MMR. Note that
        in principle mmr_threshold can also come in via the query itself.
    mmr_prefetch_factor (Optional[float]): factor applied to similarity_top_k
        to obtain the prefetch pool size. Defaults to 4.0.
    mmr_prefetch_k (Optional[int]): prefetch pool size. Cannot be passed
        together with mmr_prefetch_factor.
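A usage sketch for both modes (the import path llama_index.core.vector_stores.types for the query and filter classes is an assumption; per the source, only ExactMatchFilter metadata filters are supported). With similarity_top_k=3 and the default mmr_prefetch_factor of 4.0, the MMR prefetch pool is max(int(3 * 4.0), 3) = 12 vectors:

from llama_index.core.vector_stores.types import (
    ExactMatchFilter,
    MetadataFilters,
    VectorStoreQuery,
    VectorStoreQueryMode,
)

# Plain top-k cosine-similarity search with an exact-match metadata filter:
result = vector_store.query(
    VectorStoreQuery(
        query_embedding=[0.1] * 1536,
        similarity_top_k=3,
        filters=MetadataFilters(
            filters=[ExactMatchFilter(key="author", value="alice")]
        ),
    )
)

# MMR search: prefetch 20 candidates, rerank with lambda 0.5, return 3:
mmr_result = vector_store.query(
    VectorStoreQuery(
        query_embedding=[0.1] * 1536,
        similarity_top_k=3,
        mode=VectorStoreQueryMode.MMR,
        mmr_threshold=0.5,
    ),
    mmr_prefetch_k=20,
)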

Source code in llama-index-integrations/vector_stores/llama-index-vector-stores-cassandra/llama_index/vector_stores/cassandra/base.py
def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResult:
    """
    Query index for top k most similar nodes.

    Supported query modes: 'default' (most similar vectors) and 'mmr'.

    Args:
        query (VectorStoreQuery): the basic query definition. Defines:
            mode (VectorStoreQueryMode): one of the supported modes
            query_embedding (List[float]): query embedding to search against
            similarity_top_k (int): top k most similar nodes
            mmr_threshold (Optional[float]): this is the 0-to-1 MMR lambda.
                If present, takes precedence over the kwargs parameter.
            Ignored except for MMR queries.

    Args for query.mode == 'mmr' (ignored otherwise):
        mmr_threshold (Optional[float]): this is the 0-to-1 lambda for MMR.
            Note that in principle mmr_threshold could come in the query
        mmr_prefetch_factor (Optional[float]): factor applied to top_k
            for prefetch pool size. Defaults to 4.0
        mmr_prefetch_k (Optional[int]): prefetch pool size. This cannot be
            passed together with mmr_prefetch_factor

    """
    _available_query_modes = [
        VectorStoreQueryMode.DEFAULT,
        VectorStoreQueryMode.MMR,
    ]
    if query.mode not in _available_query_modes:
        raise NotImplementedError(f"Query mode {query.mode} not available.")
    #
    query_embedding = cast(List[float], query.query_embedding)

    # metadata filtering
    if query.filters is not None:
        # raise NotImplementedError("No metadata filtering yet")
        query_metadata = self._query_filters_to_dict(query.filters)
    else:
        query_metadata = {}

    _logger.debug(
        f"Running ANN search on the Cassandra table (query mode: {query.mode})"
    )
    if query.mode == VectorStoreQueryMode.DEFAULT:
        matches = list(
            self._vector_table.metric_ann_search(
                vector=query_embedding,
                n=query.similarity_top_k,
                metric="cos",
                metric_threshold=None,
                metadata=query_metadata,
            )
        )
        top_k_scores = [match["distance"] for match in matches]
    elif query.mode == VectorStoreQueryMode.MMR:
        # Querying a larger number of vectors and then doing MMR on them.
        if (
            kwargs.get("mmr_prefetch_factor") is not None
            and kwargs.get("mmr_prefetch_k") is not None
        ):
            raise ValueError(
                "'mmr_prefetch_factor' and 'mmr_prefetch_k' "
                "cannot coexist in a call to query()"
            )
        else:
            if kwargs.get("mmr_prefetch_k") is not None:
                prefetch_k0 = int(kwargs["mmr_prefetch_k"])
            else:
                prefetch_k0 = int(
                    query.similarity_top_k
                    * kwargs.get("mmr_prefetch_factor", DEFAULT_MMR_PREFETCH_FACTOR)
                )
        prefetch_k = max(prefetch_k0, query.similarity_top_k)
        #
        prefetch_matches = list(
            self._vector_table.metric_ann_search(
                vector=query_embedding,
                n=prefetch_k,
                metric="cos",
                metric_threshold=None,  # this is not `mmr_threshold`
                metadata=query_metadata,
            )
        )
        #
        mmr_threshold = query.mmr_threshold or kwargs.get("mmr_threshold")
        if prefetch_matches:
            pf_match_indices, pf_match_embeddings = zip(
                *enumerate(match["vector"] for match in prefetch_matches)
            )
        else:
            pf_match_indices, pf_match_embeddings = [], []
        pf_match_indices = list(pf_match_indices)
        pf_match_embeddings = list(pf_match_embeddings)
        mmr_similarities, mmr_indices = get_top_k_mmr_embeddings(
            query_embedding,
            pf_match_embeddings,
            similarity_top_k=query.similarity_top_k,
            embedding_ids=pf_match_indices,
            mmr_threshold=mmr_threshold,
        )
        #
        matches = [prefetch_matches[mmr_index] for mmr_index in mmr_indices]
        top_k_scores = mmr_similarities

    top_k_nodes = []
    top_k_ids = []
    for match in matches:
        node = metadata_dict_to_node(match["metadata"])
        node.set_content(match["body_blob"])
        top_k_nodes.append(node)
        top_k_ids.append(match["row_id"])

    return VectorStoreQueryResult(
        nodes=top_k_nodes,
        similarities=top_k_scores,
        ids=top_k_ids,
    )