Service Task

The Service Task extracts content from an HTTP service.

Example

The following shows a simple example using this task as part of a workflow.

from txtai.workflow import ServiceTask, Workflow

workflow = Workflow([ServiceTask(url="https://service.url/action")])
workflow(["parameter"])

Configuration-driven example

This task can also be created with workflow configuration.

workflow:
  tasks:
    - task: service
      url: https://service.url/action

Methods

Python documentation for the task.

__init__(action=None, select=None, unpack=True, column=None, merge='hstack', initialize=None, finalize=None, concurrency=None, onetomany=True, **kwargs)

Creates a new task. A task defines two methods, type of data it accepts and the action to execute for each data element. Action is a callable function or list of callable functions.

Parameters:

| Name | Description | Default |
| --- | --- | --- |
| action | action(s) to execute on each data element | None |
| select | filter(s) used to select data to process | None |
| unpack | if data elements should be unpacked or unwrapped from (id, data, tag) tuples | True |
| column | column index to select if element is a tuple, defaults to all | None |
| merge | merge mode for joining multi-action outputs, defaults to hstack | 'hstack' |
| initialize | action to execute before processing | None |
| finalize | action to execute after processing | None |
| concurrency | sets concurrency method when execute instance available; valid values: "thread" for thread-based concurrency, "process" for process-based concurrency | None |
| onetomany | if one-to-many data transformations should be enabled, defaults to True | True |
| kwargs | additional keyword arguments | {} |
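
The unpack and column parameters can be hard to picture from the descriptions alone. The following is a minimal sketch of one plausible reading, assuming elements arrive as (id, data, tag) tuples and that column indexes into a tuple-valued data field. The helper name prepare is hypothetical and is not part of txtai; the library's actual handling may differ.

```python
def prepare(element, unpack=True, column=None):
    """Hypothetical helper illustrating unpack and column (not txtai's code)."""

    # Unwrap (id, data, tag) tuples down to the data field
    if unpack and isinstance(element, tuple) and len(element) == 3:
        element = element[1]

    # Select a single column when the remaining element is itself a tuple
    if column is not None and isinstance(element, tuple):
        element = element[column]

    return element


unwrapped = prepare((0, "text", None))
selected = prepare((0, ("a", "b"), None), column=1)
untouched = prepare((0, "text", None), unpack=False)
```

With unpack disabled, the full tuple is passed through unchanged, which is useful when an action needs the id or tag as well as the data.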
Source code in txtai/workflow/task/base.py
def __init__(
    self,
    action=None,
    select=None,
    unpack=True,
    column=None,
    merge="hstack",
    initialize=None,
    finalize=None,
    concurrency=None,
    onetomany=True,
    **kwargs,
):
    """
    Creates a new task. A task defines two methods, type of data it accepts and the action to execute
    for each data element. Action is a callable function or list of callable functions.

    Args:
        action: action(s) to execute on each data element
        select: filter(s) used to select data to process
        unpack: if data elements should be unpacked or unwrapped from (id, data, tag) tuples
        column: column index to select if element is a tuple, defaults to all
        merge: merge mode for joining multi-action outputs, defaults to hstack
        initialize: action to execute before processing
        finalize: action to execute after processing
        concurrency: sets concurrency method when execute instance available
                     valid values: "thread" for thread-based concurrency, "process" for process-based concurrency
        onetomany: if one-to-many data transformations should be enabled, defaults to True
        kwargs: additional keyword arguments
    """

    # Standardize into list of actions
    if not action:
        action = []
    elif not isinstance(action, list):
        action = [action]

    self.action = action
    self.select = select
    self.unpack = unpack
    self.column = column
    self.merge = merge
    self.initialize = initialize
    self.finalize = finalize
    self.concurrency = concurrency
    self.onetomany = onetomany

    # Check for custom registration. Adds additional instance members and validates required dependencies available.
    if hasattr(self, "register"):
        self.register(**kwargs)
    elif kwargs:
        # Raise error if additional keyword arguments passed in without register method
        kwargs = ", ".join(f"'{kw}'" for kw in kwargs)
        raise TypeError(f"__init__() got unexpected keyword arguments: {kwargs}")
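
To make the constructor's two branches concrete, here is a simplified stand-in that reproduces only the action normalization and the register hook from the source above. SketchTask and SketchServiceTask are hypothetical names used for illustration; they are not txtai classes and omit every other parameter.

```python
class SketchTask:
    """Simplified stand-in for the constructor logic shown above."""

    def __init__(self, action=None, **kwargs):
        # Standardize into a list of actions
        if not action:
            action = []
        elif not isinstance(action, list):
            action = [action]
        self.action = action

        # Subclasses that define register() receive the extra keyword arguments
        if hasattr(self, "register"):
            self.register(**kwargs)
        elif kwargs:
            # No register method, so extra keyword arguments are an error
            names = ", ".join(f"'{kw}'" for kw in kwargs)
            raise TypeError(f"__init__() got unexpected keyword arguments: {names}")


class SketchServiceTask(SketchTask):
    def register(self, url=None):
        # Extra keyword arguments become instance members
        self.url = url


single = SketchTask(action=str.upper)
service = SketchServiceTask(url="https://service.url/action")

try:
    SketchTask(url="https://service.url/action")
    error = None
except TypeError as e:
    error = str(e)
```

This pattern lets the base constructor stay generic: task-specific options such as url are accepted only by subclasses that declare them in register.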

register(url=None, method=None, params=None, batch=True, extract=None)

Adds service parameters to task. Checks if required dependencies are installed.

Parameters:

| Name | Description | Default |
| --- | --- | --- |
| url | url to connect to | None |
| method | http method, GET or POST | None |
| params | default query parameters | None |
| batch | if True, all elements are passed in a single batch request, otherwise a service call is executed per element | True |
| extract | list of sections to extract from response | None |
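
The difference between batch and per-element calls can be sketched as follows. This is an illustration only: call_service is a hypothetical helper, and send is a stand-in callable for the HTTP request so the example runs without a network or txtai installed.

```python
def call_service(elements, send, batch=True):
    """Hypothetical dispatcher: one request for all elements, or one per element."""
    if batch:
        # Single request carrying every element
        return send(elements)

    # One request per element
    return [send(element) for element in elements]


calls = []


def fake_send(payload):
    # Stand-in for an HTTP call; records each request and uppercases the payload
    calls.append(payload)
    if isinstance(payload, list):
        return [item.upper() for item in payload]
    return payload.upper()


batched = call_service(["a", "b"], fake_send, batch=True)
batch_calls = len(calls)

calls.clear()
per_element = call_service(["a", "b"], fake_send, batch=False)
single_calls = len(calls)
```

Both modes return the same results here; the batched mode issues one request while the per-element mode issues one request for each input.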
Source code in txtai/workflow/task/service.py
def register(self, url=None, method=None, params=None, batch=True, extract=None):
    """
    Adds service parameters to task. Checks if required dependencies are installed.

    Args:
        url: url to connect to
        method: http method, GET or POST
        params: default query parameters
        batch: if True, all elements are passed in a single batch request, otherwise a service call is executed per element
        extract: list of sections to extract from response
    """

    if not XML_TO_DICT:
        raise ImportError('ServiceTask is not available - install "workflow" extra to enable')

    # pylint: disable=W0201
    # Save URL, method and parameter defaults
    self.url = url
    self.method = method
    self.params = params

    # If True, all elements are passed in a single batch request, otherwise a service call is executed per element
    self.batch = batch

    # Save sections to extract. Supports both a single string and a hierarchical list of sections.
    self.extract = extract
    if self.extract:
        self.extract = [self.extract] if isinstance(self.extract, str) else self.extract
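
The normalization at the end of register means extract can be given as a single string or as a list. The sketch below repeats that normalization and then shows one way a hierarchical list of sections could be applied to a parsed response, assuming the response is a nested dict and each section is a key one level deeper. The names normalize and walk are hypothetical, and the traversal is an assumption rather than txtai's implementation.

```python
def normalize(extract):
    # Same rule as register(): wrap a single string in a list
    if extract:
        extract = [extract] if isinstance(extract, str) else extract
    return extract


def walk(response, extract):
    # Assumption: each section names a key one level deeper in the response
    for section in normalize(extract) or []:
        response = response[section]
    return response


response = {"result": {"items": ["a", "b"]}}

top = walk(response, "result")
nested = walk(response, ["result", "items"])
```

When extract is None, the response is returned unchanged.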