
Task

Workflows execute tasks. Tasks are callable objects with a number of parameters to control the processing of data at a given step. While similar to pipelines, tasks encapsulate processing and don't perform significant transformations on their own. Tasks perform logic to prepare content for the underlying action(s).

A simple task example is shown below.

Task(lambda x: [y * 2 for y in x])

The task above executes the function for all input elements.

Tasks work well with pipelines, since pipelines are callable objects. The example below summarizes each input element.

summary = Summary()
Task(summary)

Tasks can run independently, but they work best with workflows, as workflows add large-scale stream processing.

summary = Summary()
task = Task(summary)
task(["Very long text here"])

workflow = Workflow([task])
list(workflow(["Very long text here"]))

A task can also be created with configuration as part of a workflow.

workflow:
  tasks:
    - action: summary 

__init__(action=None, select=None, unpack=True, column=None, merge='hstack', initialize=None, finalize=None, concurrency=None, onetomany=True, **kwargs)

Creates a new task. A task defines two things: the type of data it accepts and the action to execute for each data element. An action is a callable function or list of callable functions.

Parameters:

  action (default None): action(s) to execute on each data element
  select (default None): filter(s) used to select data to process
  unpack (default True): if data elements should be unpacked or unwrapped from (id, data, tag) tuples
  column (default None): column index to select if element is a tuple, defaults to all
  merge (default 'hstack'): merge mode for joining multi-action outputs
  initialize (default None): action to execute before processing
  finalize (default None): action to execute after processing
  concurrency (default None): sets the concurrency method when an execute instance is available; valid values: "thread" for thread-based concurrency, "process" for process-based concurrency
  onetomany (default True): if one-to-many data transformations should be enabled
  kwargs: additional keyword arguments
Source code in txtai/workflow/task/base.py
def __init__(
    self,
    action=None,
    select=None,
    unpack=True,
    column=None,
    merge="hstack",
    initialize=None,
    finalize=None,
    concurrency=None,
    onetomany=True,
    **kwargs,
):
    """
    Creates a new task. A task defines two methods, type of data it accepts and the action to execute
    for each data element. Action is a callable function or list of callable functions.

    Args:
        action: action(s) to execute on each data element
        select: filter(s) used to select data to process
        unpack: if data elements should be unpacked or unwrapped from (id, data, tag) tuples
        column: column index to select if element is a tuple, defaults to all
        merge: merge mode for joining multi-action outputs, defaults to hstack
        initialize: action to execute before processing
        finalize: action to execute after processing
        concurrency: sets concurrency method when execute instance available
                     valid values: "thread" for thread-based concurrency, "process" for process-based concurrency
        onetomany: if one-to-many data transformations should be enabled, defaults to True
        kwargs: additional keyword arguments
    """

    # Standardize into list of actions
    if not action:
        action = []
    elif not isinstance(action, list):
        action = [action]

    self.action = action
    self.select = select
    self.unpack = unpack
    self.column = column
    self.merge = merge
    self.initialize = initialize
    self.finalize = finalize
    self.concurrency = concurrency
    self.onetomany = onetomany

    # Check for custom registration. Adds additional instance members and validates required dependencies available.
    if hasattr(self, "register"):
        self.register(**kwargs)
    elif kwargs:
        # Raise error if additional keyword arguments passed in without register method
        kwargs = ", ".join(f"'{kw}'" for kw in kwargs)
        raise TypeError(f"__init__() got unexpected keyword arguments: {kwargs}")

Multi-action task concurrency

The default processing mode is to run actions sequentially. Multiprocessing support is already built in at a number of levels. For example, any GPU model will maximize GPU utilization, and even in CPU mode, concurrency is utilized. But there are still use cases for task action concurrency: for example, when the system has multiple GPUs, when a task runs external sequential code, or when a task performs heavy I/O.

In addition to sequential processing, multi-action tasks can run multithreaded or multiprocess. The advantages of each approach are described below.

  • Multithreading - no overhead of creating separate processes or serializing data. But since Python can only execute a single thread at a time due to the GIL, this approach doesn't help CPU-bound actions. It works well for I/O-bound actions and GPU actions.

  • Multiprocessing - separate child processes are created and data is exchanged via serialization. This method can fully utilize all CPU cores since each process runs independently. It works well for CPU-bound actions.

More information on multiprocessing can be found in the Python documentation.
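As a standalone sketch of the thread-based tradeoff (using plain concurrent.futures, not txtai's internal scheduler): overlapping I/O waits lets a batch of slow actions finish in roughly the time of a single one.

```python
from concurrent.futures import ThreadPoolExecutor
import time

def io_bound_action(x):
    # Simulate an I/O wait (network call, disk read, GPU transfer)
    time.sleep(0.01)
    return x * 2

elements = list(range(10))

# Thread-based concurrency: the 10 waits overlap instead of running back to back,
# so elapsed time is close to one action's wait, not ten
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=10) as pool:
    results = list(pool.map(io_bound_action, elements))
elapsed = time.perf_counter() - start

print(results)  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```

For CPU-bound actions, swapping in ProcessPoolExecutor sidesteps the GIL at the cost of pickling inputs and outputs between processes.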

Multi-action task merges

Multi-action tasks generate parallel outputs for the input data. The task outputs can be merged together in a couple different ways.

hstack(outputs)

Merges outputs column-wise. Returns a list of tuples which will be interpreted as a one to one transformation.

Column-wise merge example (2 actions)

Inputs: [a, b, c]

Outputs => [[a1, b1, c1], [a2, b2, c2]]

Column Merge => [(a1, a2), (b1, b2), (c1, c2)]

Parameters:

  outputs (required): task outputs

Returns:

  list of aggregated/zipped outputs as tuples (column-wise)

Source code in txtai/workflow/task/base.py
def hstack(self, outputs):
    """
    Merges outputs column-wise. Returns a list of tuples which will be interpreted as a one to one transformation.

    Column-wise merge example (2 actions)

      Inputs: [a, b, c]

      Outputs => [[a1, b1, c1], [a2, b2, c2]]

      Column Merge => [(a1, a2), (b1, b2), (c1, c2)]

    Args:
        outputs: task outputs

    Returns:
        list of aggregated/zipped outputs as tuples (column-wise)
    """

    # If all outputs are numpy arrays, use native method
    if all(isinstance(output, np.ndarray) for output in outputs):
        return np.stack(outputs, axis=1)

    # If all outputs are torch tensors, use native method
    # pylint: disable=E1101
    if all(torch.is_tensor(output) for output in outputs):
        return torch.stack(outputs, axis=1)

    return list(zip(*outputs))
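The docstring example can be reproduced with the plain list fallback at the end of hstack (the final zip call), with no txtai dependency:

```python
# Outputs from 2 actions run over inputs [a, b, c]
outputs = [["a1", "b1", "c1"], ["a2", "b2", "c2"]]

# Column-wise merge: one tuple per input element, one entry per action
merged = list(zip(*outputs))
print(merged)  # [('a1', 'a2'), ('b1', 'b2'), ('c1', 'c2')]
```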

vstack(outputs)

Merges outputs row-wise. Returns a list of lists which will be interpreted as a one to many transformation.

Row-wise merge example (2 actions)

Inputs: [a, b, c]

Outputs => [[a1, b1, c1], [a2, b2, c2]]

Row Merge => [[a1, a2], [b1, b2], [c1, c2]] = [a1, a2, b1, b2, c1, c2]

Parameters:

  outputs (required): task outputs

Returns:

  list of aggregated/zipped outputs as one to many transforms (row-wise)

Source code in txtai/workflow/task/base.py
def vstack(self, outputs):
    """
    Merges outputs row-wise. Returns a list of lists which will be interpreted as a one to many transformation.

    Row-wise merge example (2 actions)

      Inputs: [a, b, c]

      Outputs => [[a1, b1, c1], [a2, b2, c2]]

      Row Merge => [[a1, a2], [b1, b2], [c1, c2]] = [a1, a2, b1, b2, c1, c2]

    Args:
        outputs: task outputs

    Returns:
        list of aggregated/zipped outputs as one to many transforms (row-wise)
    """

    # If all outputs are numpy arrays, use native method
    if all(isinstance(output, np.ndarray) for output in outputs):
        return np.concatenate(np.stack(outputs, axis=1))

    # If all outputs are torch tensors, use native method
    # pylint: disable=E1101
    if all(torch.is_tensor(output) for output in outputs):
        return torch.cat(tuple(torch.stack(outputs, axis=1)))

    # Flatten into lists of outputs per input row. Wrap as one to many transformation.
    merge = []
    for x in zip(*outputs):
        combine = []
        for y in x:
            if isinstance(y, list):
                combine.extend(y)
            else:
                combine.append(y)

        merge.append(OneToMany(combine))

    return merge
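Ignoring the OneToMany wrapper, the list fallback in vstack reduces to grouping values per input element and flattening. A minimal sketch:

```python
# Outputs from 2 actions run over inputs [a, b, c]
outputs = [["a1", "b1", "c1"], ["a2", "b2", "c2"]]

# Row-wise merge: group per input element, then flatten into a one to many result
merged = [y for x in zip(*outputs) for y in x]
print(merged)  # ['a1', 'a2', 'b1', 'b2', 'c1', 'c2']
```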

concat(outputs)

Merges outputs column-wise and concats values together into a string. Returns a list of strings.

Concat merge example (2 actions)

Inputs: [a, b, c]

Outputs => [[a1, b1, c1], [a2, b2, c2]]

Concat Merge => [(a1, a2), (b1, b2), (c1, c2)] => ["a1. a2", "b1. b2", "c1. c2"]

Parameters:

  outputs (required): task outputs

Returns:

  list of concat outputs

Source code in txtai/workflow/task/base.py
def concat(self, outputs):
    """
    Merges outputs column-wise and concats values together into a string. Returns a list of strings.

    Concat merge example (2 actions)

      Inputs: [a, b, c]

      Outputs => [[a1, b1, c1], [a2, b2, c2]]

      Concat Merge => [(a1, a2), (b1, b2), (c1, c2)] => ["a1. a2", "b1. b2", "c1. c2"]

    Args:
        outputs: task outputs

    Returns:
        list of concat outputs
    """

    return [". ".join([str(y) for y in x if y]) for x in self.hstack(outputs)]
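The concat merge is just the column-wise merge joined into strings; the core expression can be run standalone:

```python
# Outputs from 2 actions run over inputs [a, b, c]
outputs = [["a1", "b1", "c1"], ["a2", "b2", "c2"]]

# Concat merge: column-wise merge, then join each tuple's non-empty values with ". "
merged = [". ".join(str(y) for y in x if y) for x in zip(*outputs)]
print(merged)  # ['a1. a2', 'b1. b2', 'c1. c2']
```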

Extracting task output columns

With column-wise merging, each output row is a tuple of the output values from each task action. That tuple can be fed as input to a downstream task, and each element can be processed by a different action.

A simple example:

workflow = Workflow([Task(lambda x: [y * 3 for y in x], unpack=False, column=0)])
list(workflow([(2, 8)]))

For the input tuple (2, 8), the workflow above selects only the first element (2) and runs the task against that element.

workflow = Workflow([Task([lambda x: [y * 3 for y in x], 
                           lambda x: [y - 1 for y in x]],
                           unpack=False, column={0:0, 1:1})])
list(workflow([(2, 8)]))

The example above applies a separate action to each input column. This simple construct can help build extremely powerful workflow graphs!
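Tracing the column-mapped example in plain Python (a sketch of the data flow, not the actual Task internals) shows how each action sees only its mapped column and how the default hstack merge joins the results:

```python
inputs = [(2, 8)]
actions = [lambda x: [y * 3 for y in x], lambda x: [y - 1 for y in x]]
column = {0: 0, 1: 1}  # action index -> input column

# Each action runs over only the column mapped to it: [2] and [8]
outputs = [action([row[column[i]] for row in inputs]) for i, action in enumerate(actions)]

# Default hstack merge joins the per-action outputs column-wise
merged = list(zip(*outputs))
print(merged)  # [(6, 7)]
```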