Skip to content

Cassandra

CassandraVectorStore #

Bases: BasePydanticVectorStore

    # Cassandra向量存储。

    # 一个具有向量相似性搜索的Cassandra表的抽象。文档及其嵌入被存储在Cassandra表中,并且用于搜索的是一个支持向量的索引。
    # 表不需要事先存在:如果需要,它将在后台被创建。

    # 所有的Cassandra操作都是通过CassIO库完成的。

    # 注意:在最近的版本中,只能通过位置传递`table`和`embedding_dimension`。如果需要,请修改您的代码。
    # 这是为了适应更简洁的用法,通过`cassio.init(...)`调用全局设置DB连接:然后,在创建向量存储时不再需要指定DB详细信息,除非需要。

    # Args:
    #     table (str): 要使用的表名。如果不存在,将被创建。
    #     embedding_dimension (int): 使用的嵌入向量的长度。
    #     session (可选, cassandra.cluster.Session): 要使用的Cassandra会话。
    #         可以省略,或者等效地设置为None,以使用之前通过cassio.init()全局设置的DB连接。
    #     keyspace (可选. str): 要工作的Cassandra keyspace的名称。
    #         可以省略,或者等效地设置为None,以使用之前通过cassio.init()全局设置的DB连接。
    #     ttl_seconds (可选, int): 插入条目的过期时间。默认为不过期(None)。
    #     insertion_batch_size (可选, int): 并发插入多少向量,用于批量插入。默认为20。

    # 示例:
    #     `pip install llama-index-vector-stores-cassandra`

    #     ```python
    #     from llama_index.vector_stores.cassandra import CassandraVectorStore

    #     vector_store = CassandraVectorStore(
    #         table="cass_v_table", embedding_dimension=1536
    #     )
    #     ```
Source code in llama_index/vector_stores/cassandra/base.py
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
class CassandraVectorStore(BasePydanticVectorStore):
    """```python
    # Cassandra向量存储。

    # 一个具有向量相似性搜索的Cassandra表的抽象。文档及其嵌入被存储在Cassandra表中,并且用于搜索的是一个支持向量的索引。
    # 表不需要事先存在:如果需要,它将在后台被创建。

    # 所有的Cassandra操作都是通过CassIO库完成的。

    # 注意:在最近的版本中,只能通过位置传递`table`和`embedding_dimension`。如果需要,请修改您的代码。
    # 这是为了适应更简洁的用法,通过`cassio.init(...)`调用全局设置DB连接:然后,在创建向量存储时不再需要指定DB详细信息,除非需要。

    # Args:
    #     table (str): 要使用的表名。如果不存在,将被创建。
    #     embedding_dimension (int): 使用的嵌入向量的长度。
    #     session (可选, cassandra.cluster.Session): 要使用的Cassandra会话。
    #         可以省略,或者等效地设置为None,以使用之前通过cassio.init()全局设置的DB连接。
    #     keyspace (可选. str): 要工作的Cassandra keyspace的名称。
    #         可以省略,或者等效地设置为None,以使用之前通过cassio.init()全局设置的DB连接。
    #     ttl_seconds (可选, int): 插入条目的过期时间。默认为不过期(None)。
    #     insertion_batch_size (可选, int): 并发插入多少向量,用于批量插入。默认为20。

    # 示例:
    #     `pip install llama-index-vector-stores-cassandra`

    #     ```python
    #     from llama_index.vector_stores.cassandra import CassandraVectorStore

    #     vector_store = CassandraVectorStore(
    #         table="cass_v_table", embedding_dimension=1536
    #     )
    #     ```
```"""

    stores_text: bool = True
    flat_metadata: bool = True

    _session: Optional[Any] = PrivateAttr()
    _keyspace: Optional[Any] = PrivateAttr()
    _table: str = PrivateAttr()
    _embedding_dimension: int = PrivateAttr()
    _ttl_seconds: Optional[int] = PrivateAttr()
    _insertion_batch_size: int = PrivateAttr()
    _vector_table: ClusteredMetadataVectorCassandraTable = PrivateAttr()

    def __init__(
        self,
        table: str,
        embedding_dimension: int,
        *,
        session: Optional[Any] = None,
        keyspace: Optional[str] = None,
        ttl_seconds: Optional[int] = None,
        insertion_batch_size: int = DEFAULT_INSERTION_BATCH_SIZE,
    ) -> None:
        super().__init__()

        self._session = session
        self._keyspace = keyspace
        self._table = table
        self._embedding_dimension = embedding_dimension
        self._ttl_seconds = ttl_seconds
        self._insertion_batch_size = insertion_batch_size

        _logger.debug("Creating the Cassandra table")
        self._vector_table = ClusteredMetadataVectorCassandraTable(
            session=self._session,
            keyspace=self._keyspace,
            table=self._table,
            vector_dimension=self._embedding_dimension,
            primary_key_type=["TEXT", "TEXT"],
            # a conservative choice here, to make everything searchable
            # except the bulky "_node_content" key (it'd make little sense to):
            metadata_indexing=("default_to_searchable", ["_node_content"]),
        )

    def add(
        self,
        nodes: List[BaseNode],
        **add_kwargs: Any,
    ) -> List[str]:
        """将节点添加到索引中。

Args:
    节点:List[BaseNode]:带有嵌入的节点列表
"""
        node_ids = []
        node_contents = []
        node_metadatas = []
        node_embeddings = []
        for node in nodes:
            metadata = node_to_metadata_dict(
                node,
                remove_text=True,
                flat_metadata=self.flat_metadata,
            )
            node_ids.append(node.node_id)
            node_contents.append(node.get_content(metadata_mode=MetadataMode.NONE))
            node_metadatas.append(metadata)
            node_embeddings.append(node.get_embedding())

        _logger.debug(f"Adding {len(node_ids)} rows to table")
        # Concurrent batching of inserts:
        insertion_tuples = zip(node_ids, node_contents, node_metadatas, node_embeddings)
        for insertion_batch in _batch_iterable(
            insertion_tuples, batch_size=self._insertion_batch_size
        ):
            futures = []
            for (
                node_id,
                node_content,
                node_metadata,
                node_embedding,
            ) in insertion_batch:
                node_ref_doc_id = node_metadata["ref_doc_id"]
                futures.append(
                    self._vector_table.put_async(
                        row_id=node_id,
                        body_blob=node_content,
                        vector=node_embedding,
                        metadata=node_metadata,
                        partition_id=node_ref_doc_id,
                        ttl_seconds=self._ttl_seconds,
                    )
                )
            for future in futures:
                _ = future.result()

        return node_ids

    def delete(self, ref_doc_id: str, **delete_kwargs: Any) -> None:
        """使用ref_doc_id删除节点。

Args:
    ref_doc_id(str):要删除的文档的doc_id。
"""
        _logger.debug("Deleting a document from the Cassandra table")
        self._vector_table.delete_partition(
            partition_id=ref_doc_id,
        )

    @property
    def client(self) -> Any:
        """返回底层的cassIO向量表对象。"""
        return self._vector_table

    @staticmethod
    def _query_filters_to_dict(query_filters: MetadataFilters) -> Dict[str, Any]:
        if any(
            not isinstance(f, ExactMatchFilter) for f in query_filters.legacy_filters()
        ):
            raise NotImplementedError("Only `ExactMatchFilter` filters are supported")
        return {f.key: f.value for f in query_filters.filters}

    def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResult:
        """查询索引以获取前k个最相似的节点。

支持的查询模式:'default'(最相似的向量)和'mmr'。

Args:
    query(VectorStoreQuery):基本查询定义。定义:
        mode(VectorStoreQueryMode):支持的模式之一
        query_embedding(List[float]):要搜索的查询嵌入
        similarity_top_k(int):前k个最相似的节点
        mmr_threshold(Optional[float]):这是0到1的MMR lambda。
            如果存在,则优先于kwargs参数。
            除非用于MMR查询,否则将被忽略。

用于query.mode == 'mmr'的参数(否则将被忽略):
    mmr_threshold(Optional[float]):这是MMR的0到1的lambda。
        请注意,原则上mmr_threshold可以出现在查询中
    mmr_prefetch_factor(Optional[float]):应用于prefetch池大小的因子。默认为4.0
    mmr_prefetch_k(Optional[int]):预取池大小。不能与mmr_prefetch_factor一起传递。
"""
        _available_query_modes = [
            VectorStoreQueryMode.DEFAULT,
            VectorStoreQueryMode.MMR,
        ]
        if query.mode not in _available_query_modes:
            raise NotImplementedError(f"Query mode {query.mode} not available.")
        #
        query_embedding = cast(List[float], query.query_embedding)

        # metadata filtering
        if query.filters is not None:
            # raise NotImplementedError("No metadata filtering yet")
            query_metadata = self._query_filters_to_dict(query.filters)
        else:
            query_metadata = {}

        _logger.debug(
            f"Running ANN search on the Cassandra table (query mode: {query.mode})"
        )
        if query.mode == VectorStoreQueryMode.DEFAULT:
            matches = list(
                self._vector_table.metric_ann_search(
                    vector=query_embedding,
                    n=query.similarity_top_k,
                    metric="cos",
                    metric_threshold=None,
                    metadata=query_metadata,
                )
            )
            top_k_scores = [match["distance"] for match in matches]
        elif query.mode == VectorStoreQueryMode.MMR:
            # Querying a larger number of vectors and then doing MMR on them.
            if (
                kwargs.get("mmr_prefetch_factor") is not None
                and kwargs.get("mmr_prefetch_k") is not None
            ):
                raise ValueError(
                    "'mmr_prefetch_factor' and 'mmr_prefetch_k' "
                    "cannot coexist in a call to query()"
                )
            else:
                if kwargs.get("mmr_prefetch_k") is not None:
                    prefetch_k0 = int(kwargs["mmr_prefetch_k"])
                else:
                    prefetch_k0 = int(
                        query.similarity_top_k
                        * kwargs.get("mmr_prefetch_factor", DEFAULT_MMR_PREFETCH_FACTOR)
                    )
            prefetch_k = max(prefetch_k0, query.similarity_top_k)
            #
            prefetch_matches = list(
                self._vector_table.metric_ann_search(
                    vector=query_embedding,
                    n=prefetch_k,
                    metric="cos",
                    metric_threshold=None,  # this is not `mmr_threshold`
                    metadata=query_metadata,
                )
            )
            #
            mmr_threshold = query.mmr_threshold or kwargs.get("mmr_threshold")
            if prefetch_matches:
                pf_match_indices, pf_match_embeddings = zip(
                    *enumerate(match["vector"] for match in prefetch_matches)
                )
            else:
                pf_match_indices, pf_match_embeddings = [], []
            pf_match_indices = list(pf_match_indices)
            pf_match_embeddings = list(pf_match_embeddings)
            mmr_similarities, mmr_indices = get_top_k_mmr_embeddings(
                query_embedding,
                pf_match_embeddings,
                similarity_top_k=query.similarity_top_k,
                embedding_ids=pf_match_indices,
                mmr_threshold=mmr_threshold,
            )
            #
            matches = [prefetch_matches[mmr_index] for mmr_index in mmr_indices]
            top_k_scores = mmr_similarities

        top_k_nodes = []
        top_k_ids = []
        for match in matches:
            node = metadata_dict_to_node(match["metadata"])
            node.set_content(match["body_blob"])
            top_k_nodes.append(node)
            top_k_ids.append(match["row_id"])

        return VectorStoreQueryResult(
            nodes=top_k_nodes,
            similarities=top_k_scores,
            ids=top_k_ids,
        )

client property #

client: Any

返回底层的cassIO向量表对象。

add #

add(nodes: List[BaseNode], **add_kwargs: Any) -> List[str]

将节点添加到索引中。

Source code in llama_index/vector_stores/cassandra/base.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
    def add(
        self,
        nodes: List[BaseNode],
        **add_kwargs: Any,
    ) -> List[str]:
        """将节点添加到索引中。

Args:
    节点:List[BaseNode]:带有嵌入的节点列表
"""
        node_ids = []
        node_contents = []
        node_metadatas = []
        node_embeddings = []
        for node in nodes:
            metadata = node_to_metadata_dict(
                node,
                remove_text=True,
                flat_metadata=self.flat_metadata,
            )
            node_ids.append(node.node_id)
            node_contents.append(node.get_content(metadata_mode=MetadataMode.NONE))
            node_metadatas.append(metadata)
            node_embeddings.append(node.get_embedding())

        _logger.debug(f"Adding {len(node_ids)} rows to table")
        # Concurrent batching of inserts:
        insertion_tuples = zip(node_ids, node_contents, node_metadatas, node_embeddings)
        for insertion_batch in _batch_iterable(
            insertion_tuples, batch_size=self._insertion_batch_size
        ):
            futures = []
            for (
                node_id,
                node_content,
                node_metadata,
                node_embedding,
            ) in insertion_batch:
                node_ref_doc_id = node_metadata["ref_doc_id"]
                futures.append(
                    self._vector_table.put_async(
                        row_id=node_id,
                        body_blob=node_content,
                        vector=node_embedding,
                        metadata=node_metadata,
                        partition_id=node_ref_doc_id,
                        ttl_seconds=self._ttl_seconds,
                    )
                )
            for future in futures:
                _ = future.result()

        return node_ids

delete #

delete(ref_doc_id: str, **delete_kwargs: Any) -> None

使用ref_doc_id删除节点。

Source code in llama_index/vector_stores/cassandra/base.py
178
179
180
181
182
183
184
185
186
187
    def delete(self, ref_doc_id: str, **delete_kwargs: Any) -> None:
        """使用ref_doc_id删除节点。

Args:
    ref_doc_id(str):要删除的文档的doc_id。
"""
        _logger.debug("Deleting a document from the Cassandra table")
        self._vector_table.delete_partition(
            partition_id=ref_doc_id,
        )

query #

query(
    query: VectorStoreQuery, **kwargs: Any
) -> VectorStoreQueryResult

查询索引以获取前k个最相似的节点。

支持的查询模式:'default'(最相似的向量)和'mmr'。

用于query.mode == 'mmr'的参数(否则将被忽略): mmr_threshold(Optional[float]):这是MMR的0到1的lambda。 请注意,原则上mmr_threshold可以出现在查询中 mmr_prefetch_factor(Optional[float]):应用于prefetch池大小的因子。默认为4.0 mmr_prefetch_k(Optional[int]):预取池大小。不能与mmr_prefetch_factor一起传递。

Source code in llama_index/vector_stores/cassandra/base.py
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
    def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResult:
        """查询索引以获取前k个最相似的节点。

支持的查询模式:'default'(最相似的向量)和'mmr'。

Args:
    query(VectorStoreQuery):基本查询定义。定义:
        mode(VectorStoreQueryMode):支持的模式之一
        query_embedding(List[float]):要搜索的查询嵌入
        similarity_top_k(int):前k个最相似的节点
        mmr_threshold(Optional[float]):这是0到1的MMR lambda。
            如果存在,则优先于kwargs参数。
            除非用于MMR查询,否则将被忽略。

用于query.mode == 'mmr'的参数(否则将被忽略):
    mmr_threshold(Optional[float]):这是MMR的0到1的lambda。
        请注意,原则上mmr_threshold可以出现在查询中
    mmr_prefetch_factor(Optional[float]):应用于prefetch池大小的因子。默认为4.0
    mmr_prefetch_k(Optional[int]):预取池大小。不能与mmr_prefetch_factor一起传递。
"""
        _available_query_modes = [
            VectorStoreQueryMode.DEFAULT,
            VectorStoreQueryMode.MMR,
        ]
        if query.mode not in _available_query_modes:
            raise NotImplementedError(f"Query mode {query.mode} not available.")
        #
        query_embedding = cast(List[float], query.query_embedding)

        # metadata filtering
        if query.filters is not None:
            # raise NotImplementedError("No metadata filtering yet")
            query_metadata = self._query_filters_to_dict(query.filters)
        else:
            query_metadata = {}

        _logger.debug(
            f"Running ANN search on the Cassandra table (query mode: {query.mode})"
        )
        if query.mode == VectorStoreQueryMode.DEFAULT:
            matches = list(
                self._vector_table.metric_ann_search(
                    vector=query_embedding,
                    n=query.similarity_top_k,
                    metric="cos",
                    metric_threshold=None,
                    metadata=query_metadata,
                )
            )
            top_k_scores = [match["distance"] for match in matches]
        elif query.mode == VectorStoreQueryMode.MMR:
            # Querying a larger number of vectors and then doing MMR on them.
            if (
                kwargs.get("mmr_prefetch_factor") is not None
                and kwargs.get("mmr_prefetch_k") is not None
            ):
                raise ValueError(
                    "'mmr_prefetch_factor' and 'mmr_prefetch_k' "
                    "cannot coexist in a call to query()"
                )
            else:
                if kwargs.get("mmr_prefetch_k") is not None:
                    prefetch_k0 = int(kwargs["mmr_prefetch_k"])
                else:
                    prefetch_k0 = int(
                        query.similarity_top_k
                        * kwargs.get("mmr_prefetch_factor", DEFAULT_MMR_PREFETCH_FACTOR)
                    )
            prefetch_k = max(prefetch_k0, query.similarity_top_k)
            #
            prefetch_matches = list(
                self._vector_table.metric_ann_search(
                    vector=query_embedding,
                    n=prefetch_k,
                    metric="cos",
                    metric_threshold=None,  # this is not `mmr_threshold`
                    metadata=query_metadata,
                )
            )
            #
            mmr_threshold = query.mmr_threshold or kwargs.get("mmr_threshold")
            if prefetch_matches:
                pf_match_indices, pf_match_embeddings = zip(
                    *enumerate(match["vector"] for match in prefetch_matches)
                )
            else:
                pf_match_indices, pf_match_embeddings = [], []
            pf_match_indices = list(pf_match_indices)
            pf_match_embeddings = list(pf_match_embeddings)
            mmr_similarities, mmr_indices = get_top_k_mmr_embeddings(
                query_embedding,
                pf_match_embeddings,
                similarity_top_k=query.similarity_top_k,
                embedding_ids=pf_match_indices,
                mmr_threshold=mmr_threshold,
            )
            #
            matches = [prefetch_matches[mmr_index] for mmr_index in mmr_indices]
            top_k_scores = mmr_similarities

        top_k_nodes = []
        top_k_ids = []
        for match in matches:
            node = metadata_dict_to_node(match["metadata"])
            node.set_content(match["body_blob"])
            top_k_nodes.append(node)
            top_k_ids.append(match["row_id"])

        return VectorStoreQueryResult(
            nodes=top_k_nodes,
            similarities=top_k_scores,
            ids=top_k_ids,
        )