
Workflow


Workflows are a simple yet powerful construct that takes a callable and returns elements. Workflows operate well with pipelines but can work with any callable. Workflows are streaming and work on data in batches, allowing large volumes of data to be processed efficiently.

Given that pipelines are callable objects, workflows enable efficient processing of pipeline data. Large language models typically work with smaller batches of data, so workflows are well suited to feed a series of transformers pipelines.

An example of the most basic workflow:

workflow = Workflow([Task(lambda x: [y * 2 for y in x])])
list(workflow([1, 2, 3]))

This example multiplies each input value by 2 and returns the transformed elements via a generator.
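
Tasks run in sequence, with the output elements of one task feeding the next. Below is a minimal sketch chaining two lambda tasks; this snippet is illustrative and not part of the original example.

from txtai.workflow import Task, Workflow

# First task doubles each value, second task adds one to each value
workflow = Workflow([
    Task(lambda x: [y * 2 for y in x]),
    Task(lambda x: [y + 1 for y in x])
])

# Returns [3, 5, 7]
list(workflow([1, 2, 3]))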

Given that workflows run as generators, output must be consumed for execution to occur. The following snippets show how the output can be consumed.

# Small dataset where output fits in memory
list(workflow(elements))

# Large dataset
for output in workflow(elements):
    function(output)

# Large dataset where output is discarded
for _ in workflow(elements):
    pass
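
Since pipelines are callable objects, a pipeline instance can be passed directly as a task action. The sketch below wraps a Summary pipeline in a workflow; the pipeline choice and input text are illustrative and assume the required model dependencies are installed.

from txtai.pipeline import Summary
from txtai.workflow import Task, Workflow

# Summarization pipeline used directly as the task action
summary = Summary()

# Each batch of input texts is passed to the pipeline
workflow = Workflow([Task(summary)])
list(workflow(["Long text passage to summarize ..."]))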

Workflows can be run with Python or with configuration. Examples of both approaches are shown below.

Example

A full-featured example in Python is shown below. This workflow transcribes a set of audio files, translates the text into French and indexes the data.

from txtai import Embeddings
from txtai.pipeline import Transcription, Translation
from txtai.workflow import FileTask, Task, Workflow

# Embeddings instance
embeddings = Embeddings({
    "path": "sentence-transformers/paraphrase-MiniLM-L3-v2",
    "content": True
})

# Transcription instance
transcribe = Transcription()

# Translation instance
translate = Translation()

tasks = [
    FileTask(transcribe, r"\.wav$"),
    Task(lambda x: translate(x, "fr"))
]

# List of files to process
data = [
  "US_tops_5_million.wav",
  "Canadas_last_fully.wav",
  "Beijing_mobilises.wav",
  "The_National_Park.wav",
  "Maine_man_wins_1_mil.wav",
  "Make_huge_profits.wav"
]

# Workflow that translates text to French
workflow = Workflow(tasks)

# Index the data
embeddings.index((uid, text, None) for uid, text in enumerate(workflow(data)))

# Search
embeddings.search("wildlife", 1)

Configuration-driven example

Workflows can also be defined with YAML configuration.

writable: true
embeddings:
  path: sentence-transformers/paraphrase-MiniLM-L3-v2
  content: true

# Transcribe audio to text
transcription:

# Translate text between languages
translation:

workflow:
  index:
    tasks:
      - action: transcription
        select: "\\.wav$"
        task: file
      - action: translation
        args: ["fr"]
      - action: index

from txtai import Application

# Create and run the workflow
app = Application("workflow.yml")
list(app.workflow("index", [
  "US_tops_5_million.wav",
  "Canadas_last_fully.wav",
  "Beijing_mobilises.wav",
  "The_National_Park.wav",
  "Maine_man_wins_1_mil.wav",
  "Make_huge_profits.wav"
]))

# Search
app.search("wildlife")

The code above runs the workflow defined in the file workflow.yml.

LLM workflow example

Workflows can connect multiple LLM prompting tasks together.

llm:
  path: google/flan-t5-xl

workflow:
  llm:
    tasks:
      - task: template
        template: |
          Extract keywords for the following text.

          {text}
        action: llm
      - task: template
        template: |
          Translate the following text into French.

          {text}
        action: llm

from txtai import Application

app = Application("workflow.yml")
list(app.workflow("llm", [
  """
  txtai is an open-source platform for semantic search
  and workflows powered by language models.
  """
]))

Any txtai pipeline/workflow task can be connected in workflows with LLMs.

llm:
  path: google/flan-t5-xl

translation:

workflow:
  llm:
    tasks:
      - task: template
        template: |
          Extract keywords for the following text.

          {text}
        action: llm
      - action: translation
        args:
          - fr
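
As with the earlier configuration-driven example, this workflow can be run through an Application instance. The snippet below assumes the configuration above is saved as workflow.yml.

from txtai import Application

app = Application("workflow.yml")
list(app.workflow("llm", [
  """
  txtai is an open-source platform for semantic search
  and workflows powered by language models.
  """
]))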

See the links below for more information.

Methods

Workflows are callable objects. Workflows take an iterable of data elements as input and output an iterable of data elements.

__init__(tasks, batch=100, workers=None, name=None, stream=None)

Creates a new workflow. Workflows are lists of tasks to execute.

Parameters:

    tasks: list of workflow tasks (required)
    batch: how many items to process at a time, defaults to 100
    workers: number of concurrent workers (default: None)
    name: workflow name (default: None)
    stream: workflow stream processor (default: None)
Source code in txtai/workflow/base.py
def __init__(self, tasks, batch=100, workers=None, name=None, stream=None):
    """
    Creates a new workflow. Workflows are lists of tasks to execute.

    Args:
        tasks: list of workflow tasks
        batch: how many items to process at a time, defaults to 100
        workers: number of concurrent workers
        name: workflow name
        stream: workflow stream processor
    """

    self.tasks = tasks
    self.batch = batch
    self.workers = workers
    self.name = name
    self.stream = stream

    # Set default number of executor workers to max number of actions in a task
    self.workers = max(len(task.action) for task in self.tasks) if not self.workers else self.workers
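
The batch parameter controls how many elements are passed to each task action per call. A minimal sketch, illustrative only, where each action call receives a list of at most two elements:

from txtai.workflow import Task, Workflow

# Each action call receives a batch (list) of at most 2 elements
workflow = Workflow([Task(lambda x: [len(x)] * len(x))], batch=2)

# Returns [2, 2, 2, 2, 1]
list(workflow([1, 2, 3, 4, 5]))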

__call__(elements)

Executes a workflow for input elements. This method returns a generator that yields transformed data elements.

Parameters:

    elements: iterable data elements (required)

Returns:

    generator that yields transformed data elements

Source code in txtai/workflow/base.py
def __call__(self, elements):
    """
    Executes a workflow for input elements. This method returns a generator that yields transformed
    data elements.

    Args:
        elements: iterable data elements

    Returns:
        generator that yields transformed data elements
    """

    # Create execute instance for this run
    with Execute(self.workers) as executor:
        # Run task initializers
        self.initialize()

        # Process elements with stream processor, if available
        elements = self.stream(elements) if self.stream else elements

        # Process elements in batches
        for batch in self.chunk(elements):
            yield from self.process(batch, executor)

        # Run task finalizers
        self.finalize()

schedule(cron, elements, iterations=None)

Schedules a workflow using a cron expression and elements.

Parameters:

    cron: cron expression (required)
    elements: iterable data elements passed to workflow each call (required)
    iterations: number of times to run workflow, defaults to run indefinitely
Source code in txtai/workflow/base.py
def schedule(self, cron, elements, iterations=None):
    """
    Schedules a workflow using a cron expression and elements.

    Args:
        cron: cron expression
        elements: iterable data elements passed to workflow each call
        iterations: number of times to run workflow, defaults to run indefinitely
    """

    # Check that croniter is installed
    if not CRONITER:
        raise ImportError('Workflow scheduling is not available - install "workflow" extra to enable')

    logger.info("'%s' scheduler started with schedule %s", self.name, cron)

    maxiterations = iterations
    while iterations is None or iterations > 0:
        # Schedule using localtime
        schedule = croniter(cron, datetime.now().astimezone()).get_next(datetime)
        logger.info("'%s' next run scheduled for %s", self.name, schedule.isoformat())
        time.sleep(schedule.timestamp() - time.time())

        # Run workflow
        # pylint: disable=W0703
        try:
            for _ in self(elements):
                pass
        except Exception:
            logger.error(traceback.format_exc())

        # Decrement iterations remaining, if necessary
        if iterations is not None:
            iterations -= 1

    logger.info("'%s' max iterations (%d) reached", self.name, maxiterations)
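
A sketch of calling schedule on a Workflow instance like the ones created above; the cron expression, elements and iteration count are illustrative, and croniter must be installed via the "workflow" extra.

# Run the workflow at the start of every hour, for at most 3 iterations
workflow.schedule("0 * * * *", ["input.wav"], iterations=3)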

More examples

See this link for a full list of workflow examples.