Skip to content

Elasticsearch

ElasticsearchKVStore #

Bases: BaseKVStore

Elasticsearch键值存储。

Parameters:

Name Type Description Default
index_name str

Elasticsearch索引的名称。

required
es_client Optional[Any]

可选。预先存在的AsyncElasticsearch客户端。

required
es_url Optional[str]

可选。Elasticsearch URL。

None
es_cloud_id Optional[str]

可选。Elasticsearch云ID。

None
es_api_key Optional[str]

可选。Elasticsearch API密钥。

None
es_user Optional[str]

可选。Elasticsearch用户名。

None
es_password Optional[str]

可选。Elasticsearch密码。

None

Raises:

Type Description
ConnectionError

如果AsyncElasticsearch客户端无法连接到Elasticsearch。

ValueError

如果未提供es_client、es_url或es_cloud_id。

Source code in llama_index/storage/kvstore/elasticsearch/base.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
class ElasticsearchKVStore(BaseKVStore):
    """Elasticsearch key-value store.

    Each ``collection`` is an Elasticsearch index. A key is stored as the
    document ``_id`` and its value dict as that document's ``_source``.
    Synchronous methods run the corresponding async method on the event loop
    returned by ``asyncio.get_event_loop()``; ``nest_asyncio.apply()`` is
    called in ``__init__`` so nested ``run_until_complete`` calls are allowed.

    Args:
        index_name: Name of the Elasticsearch index. NOTE(review): not used by
            any method of this class; every operation targets the index named
            by its ``collection`` argument.
        es_client: Optional. Pre-existing AsyncElasticsearch client.
        es_url: Optional. Elasticsearch URL.
        es_cloud_id: Optional. Elasticsearch cloud ID.
        es_api_key: Optional. Elasticsearch API key.
        es_user: Optional. Elasticsearch username.
        es_password: Optional. Elasticsearch password.

    Raises:
        ConnectionError: If the AsyncElasticsearch client cannot connect to
            Elasticsearch (presumably raised while building the client in
            ``_get_elasticsearch_client`` -- not visible here).
        ValueError: If none of es_client, es_url or es_cloud_id is provided.
    """

    es_client: Optional[Any]
    es_url: Optional[str]
    es_cloud_id: Optional[str]
    es_api_key: Optional[str]
    es_user: Optional[str]
    es_password: Optional[str]

    def __init__(
        self,
        index_name: str,
        es_client: Optional[Any] = None,
        es_url: Optional[str] = None,
        es_cloud_id: Optional[str] = None,
        es_api_key: Optional[str] = None,
        es_user: Optional[str] = None,
        es_password: Optional[str] = None,
    ) -> None:
        """Init a ElasticsearchKVStore."""
        nest_asyncio.apply()

        try:
            from elasticsearch import AsyncElasticsearch
        except ImportError as err:
            raise ImportError(IMPORT_ERROR_MSG) from err

        if es_client is not None:
            # Reuse the caller's client, tagging requests with our user agent.
            self._client = es_client.options(
                headers={"user-agent": self.get_user_agent()}
            )
        elif es_url is not None or es_cloud_id is not None:
            self._client: AsyncElasticsearch = _get_elasticsearch_client(
                es_url=es_url,
                username=es_user,
                password=es_password,
                cloud_id=es_cloud_id,
                api_key=es_api_key,
            )
        else:
            raise ValueError(
                """Either provide a pre-existing AsyncElasticsearch or valid \
                credentials for creating a new connection."""
            )

    @property
    def client(self) -> Any:
        """Return the async Elasticsearch client."""
        return self._client

    @staticmethod
    def get_user_agent() -> str:
        """Return the user-agent string used for the Elasticsearch client."""
        return "llama_index-py-vs"

    async def _create_index_if_not_exists(self, index_name: str) -> None:
        """Create the Elasticsearch index if it does not exist yet.

        Args:
            index_name: Name of the index to create.
        """
        if await self.client.indices.exists(index=index_name):
            logger.debug(f"Index {index_name} already exists. Skipping creation.")

        else:
            index_settings = {"mappings": {"_source": {"enabled": True}}}

            logger.debug(
                f"Creating index {index_name} with mappings {index_settings['mappings']}"
            )
            await self.client.indices.create(index=index_name, **index_settings)

    def put(
        self,
        key: str,
        val: dict,
        collection: str = DEFAULT_COLLECTION,
    ) -> None:
        """Put a single key-value pair into the store.

        Args:
            key (str): Key.
            val (dict): Value.
            collection (str): Collection (index) name.
        """
        self.put_all([(key, val)], collection=collection)

    async def aput(
        self,
        key: str,
        val: dict,
        collection: str = DEFAULT_COLLECTION,
    ) -> None:
        """Asynchronously put a single key-value pair into the store.

        Args:
            key (str): Key.
            val (dict): Value.
            collection (str): Collection (index) name.
        """
        await self.aput_all([(key, val)], collection=collection)

    def put_all(
        self,
        kv_pairs: List[Tuple[str, dict]],
        collection: str = DEFAULT_COLLECTION,
        batch_size: int = DEFAULT_BATCH_SIZE,
    ) -> None:
        """Put several key-value pairs into the store (blocking).

        Args:
            kv_pairs: (key, value) pairs to store.
            collection: Collection (index) name.
            batch_size: Number of pairs sent per bulk request.
        """
        return asyncio.get_event_loop().run_until_complete(
            self.aput_all(kv_pairs, collection, batch_size)
        )

    async def aput_all(
        self,
        kv_pairs: List[Tuple[str, dict]],
        collection: str = DEFAULT_COLLECTION,
        batch_size: int = DEFAULT_BATCH_SIZE,
    ) -> None:
        """Asynchronously put several key-value pairs into the store.

        Documents are indexed in batches of ``batch_size`` with
        ``refresh=True``, so they are searchable once this returns.

        Args:
            kv_pairs: (key, value) pairs to store; any iterable is accepted.
            collection: Collection (index) name.
            batch_size: Number of pairs sent per bulk request.
        """
        await self._create_index_if_not_exists(collection)

        pairs = list(kv_pairs)
        for start in range(0, len(pairs), batch_size):
            actions = []
            for key, value in pairs[start : start + batch_size]:
                logger.debug(value)
                # The value goes under "_source" rather than being spread into
                # the action. Otherwise the bulk helper would treat value
                # fields such as "_id", "_index", "routing", "version" or
                # "pipeline" as action metadata and write to the wrong
                # id or index.
                actions.append(
                    {
                        "_op_type": "index",
                        "_index": collection,
                        "_id": key,
                        "_source": value,
                    }
                )
            await async_bulk(self.client, actions, chunk_size=batch_size, refresh=True)

    def get(self, key: str, collection: str = DEFAULT_COLLECTION) -> Optional[dict]:
        """Get a value from the store.

        Args:
            key (str): Key.
            collection (str): Collection (index) name.

        Returns:
            The stored value, or None if the key is not present.
        """
        return asyncio.get_event_loop().run_until_complete(self.aget(key, collection))

    async def aget(
        self, key: str, collection: str = DEFAULT_COLLECTION
    ) -> Optional[dict]:
        """Asynchronously get a value from the store.

        Args:
            key (str): Key.
            collection (str): Collection (index) name.

        Returns:
            The stored value, or None if the key is not present.
        """
        await self._create_index_if_not_exists(collection)

        try:
            response = await self._client.get(index=collection, id=key, source=True)
            return response.body["_source"]
        except elasticsearch.NotFoundError:
            return None

    def get_all(self, collection: str = DEFAULT_COLLECTION) -> Dict[str, dict]:
        """Get all key-value pairs in a collection.

        Args:
            collection (str): Collection (index) name.
        """
        return asyncio.get_event_loop().run_until_complete(self.aget_all(collection))

    async def aget_all(self, collection: str = DEFAULT_COLLECTION) -> Dict[str, dict]:
        """Asynchronously get all key-value pairs in a collection.

        Uses the scroll-based ``async_scan`` helper so every document is
        returned. A plain ``search`` only returns the first page of hits
        (10 by default).

        Args:
            collection (str): Collection (index) name.

        Returns:
            Mapping from document ``_id`` (the key) to its ``_source`` (the value).
        """
        from elasticsearch.helpers import async_scan

        await self._create_index_if_not_exists(collection)

        result: Dict[str, dict] = {}
        async for hit in async_scan(
            self._client,
            index=collection,
            query={"query": {"match_all": {}}},
        ):
            result[hit["_id"]] = hit["_source"]
        return result

    def delete(self, key: str, collection: str = DEFAULT_COLLECTION) -> bool:
        """Delete a value from the store.

        Args:
            key (str): Key.
            collection (str): Collection (index) name.

        Returns:
            True if a document was deleted, False otherwise.
        """
        return asyncio.get_event_loop().run_until_complete(
            self.adelete(key, collection)
        )

    async def adelete(self, key: str, collection: str = DEFAULT_COLLECTION) -> bool:
        """Asynchronously delete a value from the store.

        Args:
            key (str): Key.
            collection (str): Collection (index) name.

        Returns:
            True if Elasticsearch reports the document as deleted; False if
            it was not found.
        """
        await self._create_index_if_not_exists(collection)

        try:
            response = await self._client.delete(index=collection, id=key)
            return response.body["result"] == "deleted"
        except elasticsearch.NotFoundError:
            return False

client property #

client: Any

获取异步的elasticsearch客户端。

get_user_agent staticmethod #

get_user_agent() -> str

获取用于elasticsearch客户端的用户代理。

Source code in llama_index/storage/kvstore/elasticsearch/base.py
143
144
145
146
@staticmethod
def get_user_agent() -> str:
    """Return the user-agent string attached to Elasticsearch client requests."""
    user_agent = "llama_index-py-vs"
    return user_agent

put #

put(
    key: str,
    val: dict,
    collection: str = DEFAULT_COLLECTION,
) -> None

将一个键值对放入存储中。

Source code in llama_index/storage/kvstore/elasticsearch/base.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
    def put(
        self,
        key: str,
        val: dict,
        collection: str = DEFAULT_COLLECTION,
    ) -> None:
        """Store one key-value pair by delegating to ``put_all``.

        Args:
            key (str): Key.
            val (dict): Value.
            collection (str): Collection name.
        """
        single_pair = (key, val)
        self.put_all([single_pair], collection=collection)

aput async #

aput(
    key: str,
    val: dict,
    collection: str = DEFAULT_COLLECTION,
) -> None

将一个键值对放入存储中。

Source code in llama_index/storage/kvstore/elasticsearch/base.py
180
181
182
183
184
185
186
187
188
189
190
191
192
193
    async def aput(
        self,
        key: str,
        val: dict,
        collection: str = DEFAULT_COLLECTION,
    ) -> None:
        """Asynchronously store one key-value pair via ``aput_all``.

        Args:
            key (str): Key.
            val (dict): Value.
            collection (str): Collection name.
        """
        single_pair = (key, val)
        await self.aput_all([single_pair], collection=collection)

get #

get(
    key: str, collection: str = DEFAULT_COLLECTION
) -> Optional[dict]

从存储中获取一个值。

Source code in llama_index/storage/kvstore/elasticsearch/base.py
234
235
236
237
238
239
240
241
    def get(self, key: str, collection: str = DEFAULT_COLLECTION) -> Optional[dict]:
        """Fetch a value from the store, blocking until ``aget`` completes.

        Args:
            key (str): Key.
            collection (str): Collection name.
        """
        loop = asyncio.get_event_loop()
        pending = self.aget(key, collection)
        return loop.run_until_complete(pending)

aget async #

aget(
    key: str, collection: str = DEFAULT_COLLECTION
) -> Optional[dict]

Get a value from the store.

Parameters:

Name Type Description Default
key str

key

required
collection str

collection name

DEFAULT_COLLECTION
Source code in llama_index/storage/kvstore/elasticsearch/base.py
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
async def aget(
    self, key: str, collection: str = DEFAULT_COLLECTION
) -> Optional[dict]:
    """Asynchronously fetch a value from the store.

    Args:
        key (str): Key (document id).
        collection (str): Collection (index) name.

    Returns:
        The document's ``_source``, or None when the document is not found.
    """
    await self._create_index_if_not_exists(collection)

    try:
        response = await self._client.get(index=collection, id=key, source=True)
    except elasticsearch.NotFoundError:
        return None
    return response.body["_source"]

get_all #

get_all(
    collection: str = DEFAULT_COLLECTION,
) -> Dict[str, dict]

从存储中获取所有的值。

Parameters:

Name Type Description Default
collection str

集合名称

DEFAULT_COLLECTION
Source code in llama_index/storage/kvstore/elasticsearch/base.py
261
262
263
264
265
266
267
    def get_all(self, collection: str = DEFAULT_COLLECTION) -> Dict[str, dict]:
        """Fetch every key-value pair in a collection, blocking on ``aget_all``.

        Args:
            collection (str): Collection name.
        """
        loop = asyncio.get_event_loop()
        pending = self.aget_all(collection)
        return loop.run_until_complete(pending)

aget_all async #

aget_all(
    collection: str = DEFAULT_COLLECTION,
) -> Dict[str, dict]

从存储中获取所有的值。

Parameters:

Name Type Description Default
collection str

集合名称

DEFAULT_COLLECTION
Source code in llama_index/storage/kvstore/elasticsearch/base.py
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
    async def aget_all(self, collection: str = DEFAULT_COLLECTION) -> Dict[str, dict]:
        """Asynchronously get all key-value pairs in a collection.

        Uses the scroll-based ``async_scan`` helper so every document is
        returned. A plain ``search`` only returns the first page of hits
        (10 by default), which silently truncated the result.

        Args:
            collection (str): Collection (index) name.

        Returns:
            Mapping from document ``_id`` (the key) to its ``_source`` (the value).
        """
        from elasticsearch.helpers import async_scan

        await self._create_index_if_not_exists(collection)

        result: Dict[str, dict] = {}
        async for hit in async_scan(
            self._client,
            index=collection,
            query={"query": {"match_all": {}}},
        ):
            result[hit["_id"]] = hit["_source"]
        return result

delete #

delete(
    key: str, collection: str = DEFAULT_COLLECTION
) -> bool

从存储中删除一个值。

Parameters:

Name Type Description Default
key str

键

required
collection str

集合名称

DEFAULT_COLLECTION
Source code in llama_index/storage/kvstore/elasticsearch/base.py
286
287
288
289
290
291
292
293
294
295
    def delete(self, key: str, collection: str = DEFAULT_COLLECTION) -> bool:
        """Remove a value from the store, blocking until ``adelete`` completes.

        Args:
            key (str): Key.
            collection (str): Collection name.
        """
        loop = asyncio.get_event_loop()
        pending = self.adelete(key, collection)
        return loop.run_until_complete(pending)

adelete async #

adelete(
    key: str, collection: str = DEFAULT_COLLECTION
) -> bool

从存储中删除一个值。

Parameters:

Name Type Description Default
key str

键

required
collection str

集合名称

DEFAULT_COLLECTION
Source code in llama_index/storage/kvstore/elasticsearch/base.py
297
298
299
300
301
302
303
304
305
306
307
308
309
310
    async def adelete(self, key: str, collection: str = DEFAULT_COLLECTION) -> bool:
        """Asynchronously remove a value from the store.

        Args:
            key (str): Key (document id).
            collection (str): Collection (index) name.

        Returns:
            True if Elasticsearch reports the document as deleted; False when
            it is not found.
        """
        await self._create_index_if_not_exists(collection)

        try:
            response = await self._client.delete(index=collection, id=key)
        except elasticsearch.NotFoundError:
            return False
        outcome = response.body["result"]
        return outcome == "deleted"