Skip to content

Redis ingestion pipeline

RedisIngestionPipelinePack #

Bases: BaseLlamaPack

Redis数据导入管道完成包。

Source code in llama_index/packs/redis_ingestion_pipeline/base.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
class RedisIngestionPipelinePack(BaseLlamaPack):
    """Redis数据导入管道完成包。"""

    def __init__(
        self,
        transformations: List[TransformComponent],
        hostname: str = "localhost",
        port: int = 6379,
        cache_collection_name: str = "ingest_cache",
        vector_collection_name: str = "vector_store",
        **kwargs: Any,
    ) -> None:
        """初始化参数。"""
        self.vector_store = RedisVectorStore(
            hostname=hostname,
            port=port,
            collection_name=vector_collection_name,
        )

        self.ingest_cache = IngestionCache(
            cache=RedisCache(
                hostname=hostname,
                port=port,
            ),
            collection_name=cache_collection_name,
        )

        self.pipeline = IngestionPipeline(
            transformations=transformations,
            cache=self.ingest_cache,
            vector_store=self.vector_store,
        )

    def get_modules(self) -> Dict[str, Any]:
        """获取模块。"""
        return {
            "pipeline": self.pipeline,
            "vector_store": self.vector_store,
            "ingest_cache": self.ingest_cache,
        }

    def run(self, inputs: List[BaseNode], **kwargs: Any) -> List[BaseNode]:
        """运行流水线。"""
        return self.pipeline.run(nodes=inputs, **kwargs)

get_modules #

get_modules() -> Dict[str, Any]

获取模块。

Source code in llama_index/packs/redis_ingestion_pipeline/base.py
47
48
49
50
51
52
53
def get_modules(self) -> Dict[str, Any]:
    """获取模块。"""
    return {
        "pipeline": self.pipeline,
        "vector_store": self.vector_store,
        "ingest_cache": self.ingest_cache,
    }

run #

run(
    inputs: List[BaseNode], **kwargs: Any
) -> List[BaseNode]

运行流水线。

Source code in llama_index/packs/redis_ingestion_pipeline/base.py
55
56
57
def run(self, inputs: List[BaseNode], **kwargs: Any) -> List[BaseNode]:
    """运行流水线。"""
    return self.pipeline.run(nodes=inputs, **kwargs)