任务
工作流执行任务。任务是可调用对象,具有多个参数以控制在给定步骤中对数据的处理。虽然与管道类似,但任务封装了处理过程,并不自行执行显著的转换。任务执行逻辑以准备内容,供底层操作使用。
以下是一个简单的任务示例。
Task(lambda x: [y * 2 for y in x])
上述任务对所有输入元素执行上述函数。
任务与管道配合良好,因为管道是可调用对象。下面的示例将对每个输入元素进行总结。
summary = Summary()
Task(summary)
任务可以独立运行,但与工作流配合最佳,因为工作流增加了大规模流处理。
summary = Summary()
task = Task(summary)
task(["非常长的文本在这里"])
workflow = Workflow([task])
list(workflow(["非常长的文本在这里"]))
任务也可以作为工作流的一部分创建,并带有配置。
workflow:
tasks:
- action: summary
__init__(action=None, select=None, unpack=True, column=None, merge='hstack', initialize=None, finalize=None, concurrency=None, onetomany=True, **kwargs)
Creates a new task. A task defines two methods, type of data it accepts and the action to execute for each data element. Action is a callable function or list of callable functions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
action
|
action(s) to execute on each data element |
None
|
|
select
|
filter(s) used to select data to process |
None
|
|
unpack
|
if data elements should be unpacked or unwrapped from (id, data, tag) tuples |
True
|
|
column
|
column index to select if element is a tuple, defaults to all |
None
|
|
merge
|
merge mode for joining multi-action outputs, defaults to hstack |
'hstack'
|
|
initialize
|
action to execute before processing |
None
|
|
finalize
|
action to execute after processing |
None
|
|
concurrency
|
sets concurrency method when execute instance available valid values: "thread" for thread-based concurrency, "process" for process-based concurrency |
None
|
|
onetomany
|
if one-to-many data transformations should be enabled, defaults to True |
True
|
|
kwargs
|
additional keyword arguments |
{}
|
Source code in txtai/workflow/task/base.py
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
|
多操作任务并发
默认处理模式是按顺序运行操作。多处理支持已经在多个层级内置。例如,任何GPU模型都将最大化GPU利用率,甚至在CPU模式下,也利用了并发性。但仍有一些任务操作并发的使用案例。例如,如果系统有多个GPU,任务运行外部顺序代码,或者任务有大量的I/O任务。
除了顺序处理外,多操作任务可以多线程或多进程运行。每种方法的优点如下所述。
-
多线程 - 无需创建单独的进程或序列化数据的开销。但由于Python只能由于GIL执行单个线程,因此这种方法对CPU绑定操作没有帮助。此方法适用于I/O绑定操作和GPU操作。
-
多进程 - 创建单独的子进程,并通过序列化交换数据。此方法可以充分利用所有CPU核心,因为每个进程独立运行。此方法适用于CPU绑定操作。
有关多处理的更多信息,请参阅Python文档。
多操作任务合并
多操作任务将为输入数据生成并行输出。任务输出可以通过几种不同的方式合并在一起。
hstack(outputs)
Merges outputs column-wise. Returns a list of tuples which will be interpreted as a one to one transformation.
Column-wise merge example (2 actions)
Inputs: [a, b, c]
Outputs => [[a1, b1, c1], [a2, b2, c2]]
Column Merge => [(a1, a2), (b1, b2), (c1, c2)]
Parameters:
Name | Type | Description | Default |
---|---|---|---|
outputs
|
task outputs |
required |
Returns:
Type | Description |
---|---|
list of aggregated/zipped outputs as tuples (column-wise) |
Source code in txtai/workflow/task/base.py
422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 |
|
vstack(outputs)
Merges outputs row-wise. Returns a list of lists which will be interpreted as a one to many transformation.
Row-wise merge example (2 actions)
Inputs: [a, b, c]
Outputs => [[a1, b1, c1], [a2, b2, c2]]
Row Merge => [[a1, a2], [b1, b2], [c1, c2]] = [a1, a2, b1, b2, c1, c2]
Parameters:
Name | Type | Description | Default |
---|---|---|---|
outputs
|
task outputs |
required |
Returns:
Type | Description |
---|---|
list of aggregated/zipped outputs as one to many transforms (row-wise) |
Source code in txtai/workflow/task/base.py
380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 |
|
concat(outputs)
Merges outputs column-wise and concats values together into a string. Returns a list of strings.
Concat merge example (2 actions)
Inputs: [a, b, c]
Outputs => [[a1, b1, c1], [a2, b2, c2]]
Concat Merge => [(a1, a2), (b1, b2), (c1, c2)] => ["a1. a2", "b1. b2", "c1. c2"]
Parameters:
Name | Type | Description | Default |
---|---|---|---|
outputs
|
task outputs |
required |
Returns:
Type | Description |
---|---|
list of concat outputs |
Source code in txtai/workflow/task/base.py
452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 |
|
提取任务输出列
通过列合并,每个输出行将是每个任务操作的输出值的元组。这可以作为下游任务的输入,并且该任务可以让不同的任务处理每个元素。
一个简单的示例:
workflow = Workflow([Task(lambda x: [y * 3 for y in x], unpack=False, column=0)])
list(workflow([(2, 8)]))
对于输入元组(2, 2),工作流将仅选择第一个元素(2),并对该元素运行任务。
workflow = Workflow([Task([lambda x: [y * 3 for y in x],
lambda x: [y - 1 for y in x]],
unpack=False, column={0:0, 1:1})])
list(workflow([(2, 8)]))
上述示例对每个输入列应用单独的操作。这种简单的结构可以帮助构建极其强大的工作流图!