工作流
工作流是一种简单而强大的结构,它接受一个可调用对象并返回元素。工作流与管道配合良好,但可以与任何可调用对象一起工作。工作流是流式的,能够批量处理数据,使得可以高效处理大量数据。
由于管道是可调用对象,工作流能够高效处理管道数据。大型语言模型通常处理较小批次的数据,工作流非常适合用于一系列转换器管道的输入。
以下是最基本的工作流示例:
workflow = Workflow([Task(lambda x: [y * 2 for y in x])])
list(workflow([1, 2, 3]))
此示例将每个输入值乘以2,并通过生成器返回转换后的元素。
由于工作流以生成器的形式运行,必须消费输出才能执行。以下代码段展示了如何消费输出。
# 小型数据集,输出适合在内存中
list(workflow(elements))
# 大型数据集
for output in workflow(elements):
function(output)
# 丢弃输出的大型数据集
for _ in workflow(elements):
pass
工作流可以使用Python或配置运行。下面展示了这两种方法的示例。
示例
以下是一个完整的Python示例。此工作流转录一组音频文件,将文本翻译成法语并索引数据。
from txtai import Embeddings
from txtai.pipeline import Transcription, Translation
from txtai.workflow import FileTask, Task, Workflow
# Embeddings实例
embeddings = Embeddings({
"path": "sentence-transformers/paraphrase-MiniLM-L3-v2",
"content": True
})
# 转录实例
transcribe = Transcription()
# 翻译实例
translate = Translation()
tasks = [
FileTask(transcribe, r"\.wav$"),
Task(lambda x: translate(x, "fr"))
]
# 要处理的文件列表
data = [
"US_tops_5_million.wav",
"Canadas_last_fully.wav",
"Beijing_mobilises.wav",
"The_National_Park.wav",
"Maine_man_wins_1_mil.wav",
"Make_huge_profits.wav"
]
# 将文本翻译成法语的工作流
workflow = Workflow(tasks)
# 索引数据
embeddings.index((uid, text, None) for uid, text in enumerate(workflow(data)))
# 搜索
embeddings.search("wildlife", 1)
配置驱动示例
工作流也可以使用YAML配置定义。
writable: true
embeddings:
path: sentence-transformers/paraphrase-MiniLM-L3-v2
content: true
# 将音频转录为文本
transcription:
# 在语言间翻译文本
translation:
workflow:
index:
tasks:
- action: transcription
select: "\\.wav$"
task: file
- action: translation
args: ["fr"]
- action: index
# 创建并运行工作流
from txtai import Application
# 创建并运行工作流
app = Application("workflow.yml")
list(app.workflow("index", [
"US_tops_5_million.wav",
"Canadas_last_fully.wav",
"Beijing_mobilises.wav",
"The_National_Park.wav",
"Maine_man_wins_1_mil.wav",
"Make_huge_profits.wav"
]))
# 搜索
app.search("wildlife")
上述代码执行了文件workflow.yml
中定义的工作流。
LLM工作流示例
工作流可以将多个LLM提示任务连接在一起。
llm:
path: google/flan-t5-xl
workflow:
llm:
tasks:
- task: template
template: |
为以下文本提取关键词。
{text}
action: llm
- task: template
template: |
将以下文本翻译成法语。
{text}
action: llm
from txtai import Application
app = Application("workflow.yml")
list(app.workflow("llm", [
"""
txtai 是一个开源平台,用于语义搜索
和由语言模型驱动的工作流。
"""
]))
任何txtai管道/工作流任务都可以在包含LLM的工作流中连接。
llm:
path: google/flan-t5-xl
translation:
workflow:
llm:
tasks:
- task: template
template: |
为以下文本提取关键词。
{text}
action: llm
- action: translation
args:
- fr
有关更多信息,请参见以下链接。
方法
工作流是可调用对象。工作流接受可迭代数据元素的输入,并输出可迭代数据元素。
__init__(tasks, batch=100, workers=None, name=None, stream=None)
Creates a new workflow. Workflows are lists of tasks to execute.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tasks
|
list of workflow tasks |
required | |
batch
|
how many items to process at a time, defaults to 100 |
100
|
|
workers
|
number of concurrent workers |
None
|
|
name
|
workflow name |
None
|
|
stream
|
workflow stream processor |
None
|
Source code in txtai/workflow/base.py
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
|
__call__(elements)
Executes a workflow for input elements. This method returns a generator that yields transformed data elements.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
elements
|
iterable data elements |
required |
Returns:
Type | Description |
---|---|
generator that yields transformed data elements |
Source code in txtai/workflow/base.py
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
|
schedule(cron, elements, iterations=None)
Schedules a workflow using a cron expression and elements.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cron
|
cron expression |
required | |
elements
|
iterable data elements passed to workflow each call |
required | |
iterations
|
number of times to run workflow, defaults to run indefinitely |
None
|
Source code in txtai/workflow/base.py
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
|
更多示例
请参阅此链接以获取完整的工作流示例列表。