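API Comparisons

Comparison between Ray Core APIs and Workflows

Ray Workflows is built on top of Ray and offers a mostly consistent subset of its API while adding durability. This section highlights some of the differences:

func.remote vs func.bind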

With Ray tasks, func.remote submits a remote task that starts running right away; func.bind generates a node in a DAG, which runs only when the DAG is executed.
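
The timing difference can be pictured with a toy model in plain Python. This is only a sketch and does not use Ray: `LazyNode` and its `execute` method are hypothetical names that imitate a `func.bind` node, and a direct function call stands in for `func.remote` (which in Ray returns a future and runs asynchronously).

```python
from typing import Any, Callable, List

calls: List[str] = []  # records the order in which function bodies actually run


class LazyNode:
    """Toy stand-in for a DAG node: stores the call, runs it only on execute()."""

    def __init__(self, func: Callable[..., Any], *args: Any) -> None:
        self.func = func
        self.args = args

    def execute(self) -> Any:
        # Resolve upstream nodes first, then run this node's function.
        resolved = [a.execute() if isinstance(a, LazyNode) else a for a in self.args]
        return self.func(*resolved)


def add(x: int, y: int) -> int:
    calls.append(f"add({x}, {y})")
    return x + y


# Eager, in the spirit of func.remote: the body runs at submission time.
eager_result = add(1, 2)
calls_after_eager = list(calls)

# Lazy, in the spirit of func.bind: building the graph runs nothing yet.
node = LazyNode(add, LazyNode(add, 1, 2), 10)
calls_after_bind = list(calls)

# The function bodies run only when the graph is executed.
lazy_result = node.execute()
```

Building `node` records two pending calls but appends nothing to `calls`; only `node.execute()` runs them, with the upstream node first.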

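In the context of Ray Workflows, execution of the DAG is deferred until workflow.run(dag, workflow_id=...) or workflow.run_async(dag, workflow_id=...) is called on the DAG. Specifying a workflow ID allows the workflow to be resumed by that ID after a cluster failure.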
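
To illustrate why a stable workflow ID matters, here is a minimal plain-Python sketch of checkpointing keyed by an ID. It is not how Ray Workflows stores state: `run_durable`, the JSON file layout, and the task names are assumptions made for this example.

```python
import json
import os
import tempfile
from typing import Callable, Dict, List, Tuple

executed: List[str] = []  # names of tasks whose bodies actually ran
attempts = {"count": 0}


def run_durable(
    workflow_id: str, tasks: List[Tuple[str, Callable[[], int]]], storage_dir: str
) -> Dict[str, int]:
    """Run tasks in order, checkpointing each result under the workflow ID."""
    path = os.path.join(storage_dir, f"{workflow_id}.json")
    results: Dict[str, int] = {}
    if os.path.exists(path):
        with open(path) as f:
            results = json.load(f)  # reload checkpoints left by an earlier run
    for name, func in tasks:
        if name in results:
            continue  # already checkpointed, so skip re-execution
        results[name] = func()
        executed.append(name)
        with open(path, "w") as f:
            json.dump(results, f)  # persist after every finished task
    return results


def step_one() -> int:
    return 1


def flaky() -> int:
    attempts["count"] += 1
    if attempts["count"] == 1:
        raise RuntimeError("simulated cluster failure")
    return 2


tasks = [("step_one", step_one), ("flaky", flaky)]
storage = tempfile.mkdtemp()

try:
    run_durable("demo", tasks, storage)
except RuntimeError:
    pass  # the first run dies after step_one was checkpointed

# Running again with the same ID picks up from the checkpoint.
final = run_durable("demo", tasks, storage)
```

Because the second call reuses the ID `"demo"`, `step_one` is read back from storage rather than executed again.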

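Other Workflow Engines

Note: these comparisons are inspired by the Serverless workflows comparisons repo.

Argo API Comparison

The original source of these comparisons can be found here.

Conditionals

Argo version: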
# https://github.com/argoproj/argo-workflows/tree/master/examples#conditionals
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: coinflip-
spec:
  entrypoint: coinflip
  templates:
  - name: coinflip
    steps:
    # flip a coin
    - - name: flip-coin
        template: flip-coin
    # evaluate the result in parallel
    - - name: heads
        template: heads                       # call heads template if "heads"
        when: "{{steps.flip-coin.outputs.result}} == heads"
      - name: tails
        template: tails                       # call tails template if "tails"
        when: "{{steps.flip-coin.outputs.result}} == tails"
    - - name: flip-again
        template: flip-coin
    - - name: complex-condition
        template: heads-tails-or-twice-tails
        # call heads template if first flip was "heads" and second was "tails" OR both were "tails"
        when: >-
            ( {{steps.flip-coin.outputs.result}} == heads &&
              {{steps.flip-again.outputs.result}} == tails
            ) ||
            ( {{steps.flip-coin.outputs.result}} == tails &&
              {{steps.flip-again.outputs.result}} == tails )
      - name: heads-regex
        template: heads                       # call heads template if ~ "hea"
        when: "{{steps.flip-again.outputs.result}} =~ hea"
      - name: tails-regex
        template: tails                       # call tails template if ~ "tai"
        when: "{{steps.flip-again.outputs.result}} =~ tai"

  # Return heads or tails based on a random number
  - name: flip-coin
    script:
      image: python:alpine3.6
      command: [python]
      source: |
        import random
        result = "heads" if random.randint(0,1) == 0 else "tails"
        print(result)

  - name: heads
    container:
      image: alpine:3.6
      command: [sh, -c]
      args: ["echo \"it was heads\""]

  - name: tails
    container:
      image: alpine:3.6
      command: [sh, -c]
      args: ["echo \"it was tails\""]
  
  - name: heads-tails-or-twice-tails
    container:
      image: alpine:3.6
      command: [sh, -c]
      args: ["echo \"it was heads the first flip and tails the second. Or it was two times tails.\""]
Workflow version:
import ray
from ray import workflow


@ray.remote
def handle_heads() -> str:
    return "It was heads"


@ray.remote
def handle_tails() -> str:
    return "It was tails"


@ray.remote
def flip_coin() -> str:
    import random

    @ray.remote
    def decide(heads: bool) -> str:
        return workflow.continuation(
            handle_heads.bind() if heads else handle_tails.bind()
        )

    return workflow.continuation(decide.bind(random.random() > 0.5))


if __name__ == "__main__":
    print(workflow.run(flip_coin.bind()))

DAG

Argo version:
# https://github.com/argoproj/argo-workflows/tree/master/examples#dag
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: dag-diamond-
spec:
  entrypoint: diamond
  templates:
  - name: echo
    inputs:
      parameters:
      - name: message
    container:
      image: alpine:3.7
      command: [echo, "{{inputs.parameters.message}}"]
  - name: diamond
    dag:
      tasks:
      - name: A
        template: echo
        arguments:
          parameters: [{name: message, value: A}]
      - name: B
        dependencies: [A]
        template: echo
        arguments:
          parameters: [{name: message, value: B}]
      - name: C
        dependencies: [A]
        template: echo
        arguments:
          parameters: [{name: message, value: C}]
      - name: D
        dependencies: [B, C]
        template: echo
        arguments:
          parameters: [{name: message, value: D}]
Workflow version:
import ray
from ray import workflow


@ray.remote
def echo(msg: str, *deps) -> None:
    print(msg)


if __name__ == "__main__":
    A = echo.options(**workflow.options(task_id="A")).bind("A")
    B = echo.options(**workflow.options(task_id="B")).bind("B", A)
    C = echo.options(**workflow.options(task_id="C")).bind("C", A)
    D = echo.options(**workflow.options(task_id="D")).bind("D", B, C)
    workflow.run(D)

Multi-step Workflow

Argo version:
# https://github.com/argoproj/argo-workflows/tree/master/examples#steps
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: steps-
spec:
  entrypoint: hello-hello-hello

  # This spec contains two templates: hello-hello-hello and whalesay
  templates:
  - name: hello-hello-hello
    # Instead of just running a container
    # This template has a sequence of steps
    steps:
    - - name: hello1            # hello1 is run before the following steps
        template: whalesay
        arguments:
          parameters:
          - name: message
            value: "hello1"
    - - name: hello2a           # double dash => run after previous step
        template: whalesay
        arguments:
          parameters:
          - name: message
            value: "hello2a"
      - name: hello2b           # single dash => run in parallel with previous step
        template: whalesay
        arguments:
          parameters:
          - name: message
            value: "hello2b"

  # This is the same template as from the previous example
  - name: whalesay
    inputs:
      parameters:
      - name: message
    container:
      image: docker/whalesay
      command: [cowsay]
      args: ["{{inputs.parameters.message}}"]
Workflow version:
import ray
from ray import workflow


@ray.remote
def hello(msg: str, *deps) -> None:
    print(msg)


@ray.remote
def wait_all(*args) -> None:
    pass


if __name__ == "__main__":
    h1 = hello.options(**workflow.options(task_id="hello1")).bind("hello1")
    # hello2a and hello2b both depend on hello1 and can run in parallel.
    h2a = hello.options(**workflow.options(task_id="hello2a")).bind("hello2a", h1)
    h2b = hello.options(**workflow.options(task_id="hello2b")).bind("hello2b", h1)
    workflow.run(wait_all.bind(h2a, h2b))

Exit Handler

Argo version:
# https://github.com/argoproj/argo-workflows/tree/master/examples#exit-handlers
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: exit-handlers-
spec:
  entrypoint: intentional-fail
  onExit: exit-handler                  # invoke exit-handler template at end of the workflow
  templates:
  # primary workflow template
  - name: intentional-fail
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo intentional failure; exit 1"]

  # Exit handler templates
  # After the completion of the entrypoint template, the status of the
  # workflow is made available in the global variable {{workflow.status}}.
  # {{workflow.status}} will be one of: Succeeded, Failed, Error
  - name: exit-handler
    steps:
    - - name: notify
        template: send-email
      - name: celebrate
        template: celebrate
        when: "{{workflow.status}} == Succeeded"
      - name: cry
        template: cry
        when: "{{workflow.status}} != Succeeded"
  - name: send-email
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo send e-mail: {{workflow.name}} {{workflow.status}} {{workflow.duration}}"]
  - name: celebrate
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo hooray!"]
  - name: cry
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo boohoo!"]
Workflow version:
from typing import Tuple, Optional

import ray
from ray import workflow


@ray.remote
def intentional_fail() -> str:
    raise RuntimeError("oops")


@ray.remote
def cry(error: Exception) -> None:
    print("Sadly", error)


@ray.remote
def celebrate(result: str) -> None:
    print("Success!", result)


@ray.remote
def send_email(result: str) -> None:
    print("Sending email", result)


@ray.remote
def exit_handler(res: Tuple[Optional[str], Optional[Exception]]) -> None:
    result, error = res
    email = send_email.bind(f"Raw result: {result}, {error}")
    if error:
        handler = cry.bind(error)
    else:
        handler = celebrate.bind(result)
    return workflow.continuation(wait_all.bind(handler, email))


@ray.remote
def wait_all(*deps):
    return "done"


if __name__ == "__main__":
    res = intentional_fail.options(**workflow.options(catch_exceptions=True)).bind()
    print(workflow.run(exit_handler.bind(res)))

Loops

Argo version:
# https://github.com/argoproj/argo-workflows/tree/master/examples#loops
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: loops-
spec:
  entrypoint: loop-example
  templates:
  - name: loop-example
    steps:
    - - name: print-message
        template: whalesay
        arguments:
          parameters:
          - name: message
            value: "{{item}}"
        withItems:              # invoke whalesay once for each item in parallel
        - hello world           # item 1
        - goodbye world         # item 2

  - name: whalesay
    inputs:
      parameters:
      - name: message
    container:
      image: docker/whalesay:latest
      command: [cowsay]
      args: ["{{inputs.parameters.message}}"]
Workflow version:
import ray
from ray import workflow


@ray.remote
def hello(msg: str) -> None:
    print(msg)


@ray.remote
def wait_all(*args) -> None:
    pass


if __name__ == "__main__":
    children = []
    for msg in ["hello world", "goodbye world"]:
        children.append(hello.bind(msg))
    workflow.run(wait_all.bind(*children))

Recursion

Argo version:
# https://github.com/argoproj/argo-workflows/tree/master/examples#recursion
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: coinflip-recursive-
spec:
  entrypoint: coinflip
  templates:
  - name: coinflip
    steps:
    # flip a coin
    - - name: flip-coin
        template: flip-coin
    # evaluate the result in parallel
    - - name: heads
        template: heads                 # call heads template if "heads"
        when: "{{steps.flip-coin.outputs.result}} == heads"
      - name: tails                     # keep flipping coins if "tails"
        template: coinflip
        when: "{{steps.flip-coin.outputs.result}} == tails"

  - name: flip-coin
    script:
      image: python:alpine3.6
      command: [python]
      source: |
        import random
        result = "heads" if random.randint(0,1) == 0 else "tails"
        print(result)

  - name: heads
    container:
      image: alpine:3.6
      command: [sh, -c]
      args: ["echo \"it was heads\""]
Workflow version:
import ray
from ray import workflow


@ray.remote
def handle_heads() -> str:
    return "It was heads"


@ray.remote
def handle_tails() -> str:
    print("It was tails, retrying")
    return workflow.continuation(flip_coin.bind())


@ray.remote
def flip_coin() -> str:
    import random

    @ray.remote
    def decide(heads: bool) -> str:
        if heads:
            return workflow.continuation(handle_heads.bind())
        else:
            return workflow.continuation(handle_tails.bind())

    return workflow.continuation(decide.bind(random.random() > 0.5))


if __name__ == "__main__":
    print(workflow.run(flip_coin.bind()))
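
The pattern used by `flip_coin` and `handle_tails`, where a task returns `workflow.continuation(...)` instead of calling the next task directly, resembles a trampoline. The sketch below is a toy model in plain Python, not Ray's implementation; `Continuation` and `run` are hypothetical names, and the coin flips are supplied as a fixed list so the outcome is deterministic.

```python
from typing import Any, Callable, List


class Continuation:
    """Toy marker meaning: run this next step and use its result as mine."""

    def __init__(self, func: Callable[..., Any], *args: Any) -> None:
        self.func = func
        self.args = args


def run(func: Callable[..., Any], *args: Any) -> Any:
    """Follow continuations in a loop until a plain value comes back."""
    result = func(*args)
    while isinstance(result, Continuation):
        result = result.func(*result.args)
    return result


def flip_until_heads(flips: List[str], i: int = 0) -> Any:
    if flips[i] == "heads":
        return f"heads after {i + 1} flips"
    # Instead of calling itself, hand the next step back to the runner.
    return Continuation(flip_until_heads, flips, i + 1)


outcome = run(flip_until_heads, ["tails", "tails", "heads"])
long_outcome = run(flip_until_heads, ["tails"] * 5000 + ["heads"])
```

Because each step returns to the runner before the next one starts, the call stack stays flat even for thousands of retries, which a directly recursive version would not manage under Python's default recursion limit.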

Retries

Argo version:
# https://github.com/argoproj/argo-workflows/tree/master/examples#retrying-failed-or-errored-steps
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: retry-backoff-
spec:
  entrypoint: retry-backoff
  templates:
  - name: retry-backoff
    retryStrategy:
      limit: 10
      retryPolicy: "Always"
      backoff:
        duration: "1"      # Must be a string. Default unit is seconds. Could also be a Duration, e.g.: "2m", "6h", "1d"
        factor: 2
        maxDuration: "1m"  # Must be a string. Default unit is seconds. Could also be a Duration, e.g.: "2m", "6h", "1d"
      affinity:
        nodeAntiAffinity: {}
    container:
      image: python:alpine3.6
      command: ["python", -c]
      # fail with a 66% probability
      args: ["import random; import sys; exit_code = random.choice([0, 1, 1]); sys.exit(exit_code)"]
Workflow version:
from typing import Any, Tuple, Optional

import ray
from ray import workflow


@ray.remote
def flaky_step() -> str:
    import random

    if random.choice([0, 1, 1]) != 0:
        raise ValueError("oops")

    return "ok"


@ray.remote
def custom_retry_strategy(func: Any, num_retries: int, delay_s: int) -> str:
    import time

    @ray.remote
    def handle_result(res: Tuple[Optional[str], Optional[Exception]]) -> str:
        result, error = res
        if result:
            return result
        elif num_retries <= 0:
            raise error
        else:
            print("Retrying exception after delay", error)
            time.sleep(delay_s)
            return workflow.continuation(
                custom_retry_strategy.bind(func, num_retries - 1, delay_s)
            )

    res = func.options(**workflow.options(catch_exceptions=True)).bind()
    return workflow.continuation(handle_result.bind(res))


if __name__ == "__main__":
    # Default retry strategy.
    print(
        workflow.run(flaky_step.options(max_retries=10, retry_exceptions=True).bind())
    )
    # Custom strategy.
    print(workflow.run(custom_retry_strategy.bind(flaky_step, 10, 1)))
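
The Argo spec above uses exponential backoff, whereas `custom_retry_strategy` waits a fixed `delay_s` between attempts. As a rough sketch of how a comparable delay schedule could be computed, here is a plain-Python version without Ray. The helpers `backoff_delays` and `retry_with_backoff` are hypothetical, and treating `maxDuration` as a cap on the total time spent waiting is an assumption about Argo's semantics.

```python
import time
from typing import Callable, List


def backoff_delays(
    initial_s: float, factor: float, limit: int, max_total_s: float
) -> List[float]:
    """Return at most `limit` delays that grow by `factor` and fit in the budget."""
    delays: List[float] = []
    delay = initial_s
    total = 0.0
    for _ in range(limit):
        if total + delay > max_total_s:
            break  # the next wait would exceed the total budget
        delays.append(delay)
        total += delay
        delay *= factor
    return delays


def retry_with_backoff(
    func: Callable[[], str],
    delays: List[float],
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Call func, sleeping for the next scheduled delay after each failure."""
    for delay in delays:
        try:
            return func()
        except Exception:
            sleep(delay)
    return func()  # final attempt: let any exception propagate


slept: List[float] = []  # delays recorded instead of actually sleeping
attempts = {"count": 0}


def flaky_step() -> str:
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ValueError("oops")
    return "ok"


schedule = backoff_delays(1, 2, 10, 60)
result = retry_with_backoff(flaky_step, schedule, sleep=slept.append)
```

Passing `sleep=slept.append` keeps the example fast and makes the waits observable; in real use the default `time.sleep` would apply.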

Metaflow API Comparison

The original source of these comparisons can be found here.

Foreach

Metaflow version:
# https://docs.metaflow.org/metaflow/basics#foreach
from metaflow import FlowSpec, step


class ForeachFlow(FlowSpec):
    @step
    def start(self):
        self.titles = ["Stranger Things", "House of Cards", "Narcos"]
        self.next(self.a, foreach="titles")

    @step
    def a(self):
        self.title = "%s processed" % self.input
        self.next(self.join)

    @step
    def join(self, inputs):
        self.results = [input.title for input in inputs]
        self.next(self.end)

    @step
    def end(self):
        print("\n".join(self.results))


if __name__ == "__main__":
    ForeachFlow()
Workflow version:
from typing import List

import ray
from ray import workflow


@ray.remote
def start():
    titles = ["Stranger Things", "House of Cards", "Narcos"]
    children = [a.bind(t) for t in titles]
    return workflow.continuation(end.bind(children))


@ray.remote
def a(title: str) -> str:
    return f"{title} processed"


@ray.remote
def end(results: "List[ray.ObjectRef[str]]") -> str:
    return "\n".join(ray.get(results))


if __name__ == "__main__":
    workflow.run(start.bind())

Cadence API Comparison

The original source of these comparisons can be found here.

Sub Workflows

Cadence version:
// https://github.com/uber/cadence-java-samples/blob/master/src/main/java/com/uber/cadence/samples/hello/HelloChild.java
public static class GreetingWorkflowImpl implements GreetingWorkflow {

  @Override
  public String getGreeting(String name) {
    // Workflows are stateful. So a new stub must be created for each new child.
    GreetingChild child = Workflow.newChildWorkflowStub(GreetingChild.class);

    // This is a non blocking call that returns immediately.
    // Use child.composeGreeting("Hello", name) to call synchronously.
    Promise<String> greeting = Async.function(child::composeGreeting, "Hello", name);
    // Do something else here.
    return greeting.get(); // blocks waiting for the child to complete.
  }

  // This example shows how parent workflow return right after starting a child workflow,
  // and let the child run itself.
  private String demoAsyncChildRun(String name) {
    GreetingChild child = Workflow.newChildWorkflowStub(GreetingChild.class);
    // non blocking call that initiated child workflow
    Async.function(child::composeGreeting, "Hello", name);
    // instead of using greeting.get() to block till child complete,
    // sometimes we just want to return parent immediately and keep child running
    Promise<WorkflowExecution> childPromise = Workflow.getWorkflowExecution(child);
    childPromise.get(); // block until child started,
    // otherwise child may not start because parent complete first.
    return "let child run, parent just return";
  }

  public static void main(String[] args) {
    // Start a worker that hosts both parent and child workflow implementations.
    Worker.Factory factory = new Worker.Factory(DOMAIN);
    Worker worker = factory.newWorker(TASK_LIST);
    worker.registerWorkflowImplementationTypes(GreetingWorkflowImpl.class, GreetingChildImpl.class);
    // Start listening to the workflow task list.
    factory.start();

    // Start a workflow execution. Usually this is done from another program.
    WorkflowClient workflowClient = WorkflowClient.newInstance(DOMAIN);
    // Get a workflow stub using the same task list the worker uses.
    GreetingWorkflow workflow = workflowClient.newWorkflowStub(GreetingWorkflow.class);
    // Execute a workflow waiting for it to complete.
    String greeting = workflow.getGreeting("World");
    System.out.println(greeting);
    System.exit(0);
  }
}
Workflow version:
import ray
from ray import workflow


@ray.remote
def compose_greeting(greeting: str, name: str) -> str:
    return greeting + ": " + name


@ray.remote
def main_workflow(name: str) -> str:
    return workflow.continuation(compose_greeting.bind("Hello", name))


if __name__ == "__main__":
    print(workflow.run(main_workflow.bind("Alice")))

File Processing

Cadence version:
// https://github.com/uber/cadence-java-samples/tree/master/src/main/java/com/uber/cadence/samples/fileprocessing
public class FileProcessingWorkflowImpl implements FileProcessingWorkflow {

  // Uses the default task list shared by the pool of workers.
  private final StoreActivities defaultTaskListStore;

  public FileProcessingWorkflowImpl() {
    // Create activity clients.
    ActivityOptions ao =
        new ActivityOptions.Builder()
            .setScheduleToCloseTimeout(Duration.ofSeconds(10))
            .setTaskList(FileProcessingWorker.TASK_LIST)
            .build();
    this.defaultTaskListStore = Workflow.newActivityStub(StoreActivities.class, ao);
  }

  @Override
  public void processFile(URL source, URL destination) {
    RetryOptions retryOptions =
        new RetryOptions.Builder()
            .setExpiration(Duration.ofSeconds(10))
            .setInitialInterval(Duration.ofSeconds(1))
            .build();
    // Retries the whole sequence on any failure, potentially on a different host.
    Workflow.retry(retryOptions, () -> processFileImpl(source, destination));
  }

  private void processFileImpl(URL source, URL destination) {
    StoreActivities.TaskListFileNamePair downloaded = defaultTaskListStore.download(source);

    // Now initialize stubs that are specific to the returned task list.
    ActivityOptions hostActivityOptions =
        new ActivityOptions.Builder()
            .setTaskList(downloaded.getHostTaskList())
            .setScheduleToCloseTimeout(Duration.ofSeconds(10))
            .build();
    StoreActivities hostSpecificStore =
        Workflow.newActivityStub(StoreActivities.class, hostActivityOptions);

    // Call processFile activity to zip the file.
    // Call the activity to process the file using worker-specific task list.
    String processed = hostSpecificStore.process(downloaded.getFileName());
    // Call upload activity to upload the zipped file.
    hostSpecificStore.upload(processed, destination);
  }
}
Workflow version:
from typing import List

import ray
from ray import workflow

FILES_TO_PROCESS = ["file-{}".format(i) for i in range(100)]


# Mock method to download a file.
def download(url: str) -> str:
    return "contents" * 10000


# Mock method to process a file.
def process(contents: str) -> str:
    return "processed: " + contents


# Mock method to upload a file.
def upload(contents: str) -> None:
    pass


@ray.remote
def upload_all(file_contents: List[ray.ObjectRef]) -> None:
    @ray.remote
    def upload_one(contents: str) -> None:
        upload(contents)

    children = [upload_one.bind(f) for f in file_contents]

    @ray.remote
    def wait_all(*deps) -> None:
        pass

    return workflow.continuation(wait_all.bind(*children))


@ray.remote
def process_all(file_contents: List[ray.ObjectRef]) -> None:
    @ray.remote
    def process_one(contents: str) -> str:
        return process(contents)

    children = [process_one.bind(f) for f in file_contents]
    return workflow.continuation(upload_all.bind(children))


@ray.remote
def download_all(urls: List[str]) -> None:
    @ray.remote
    def download_one(url: str) -> str:
        return download(url)

    children = [download_one.bind(u) for u in urls]
    return workflow.continuation(process_all.bind(children))


if __name__ == "__main__":
    res = download_all.bind(FILES_TO_PROCESS)
    workflow.run(res)

Trip Booking

Cadence version:
// https://github.com/uber/cadence-java-samples/tree/master/src/main/java/com/uber/cadence/samples/bookingsaga
public class TripBookingWorkflowImpl implements TripBookingWorkflow {

  private final ActivityOptions options =
      new ActivityOptions.Builder().setScheduleToCloseTimeout(Duration.ofHours(1)).build();
  private final TripBookingActivities activities =
      Workflow.newActivityStub(TripBookingActivities.class, options);

  @Override
  public void bookTrip(String name) {
    Saga.Options sagaOptions = new Saga.Options.Builder().setParallelCompensation(true).build();
    Saga saga = new Saga(sagaOptions);
    try {
      String carReservationID = activities.reserveCar(name);
      saga.addCompensation(activities::cancelCar, carReservationID, name);

      String hotelReservationID = activities.bookHotel(name);
      saga.addCompensation(activities::cancelHotel, hotelReservationID, name);

      String flightReservationID = activities.bookFlight(name);
      saga.addCompensation(activities::cancelFlight, flightReservationID, name);
    } catch (ActivityException e) {
      saga.compensate();
      throw e;
    }
  }
}
Workflow version:
from typing import List, Tuple, Optional

import ray
from ray import workflow


# Mock method to make requests to an external service.
def make_request(*args) -> str:
    return "-".join(args)


# Generate an idempotency token (this is an extension to the cadence example).
@ray.remote
def generate_request_id():
    import uuid

    return uuid.uuid4().hex


@ray.remote
def book_car(request_id: str) -> str:
    car_reservation_id = make_request("book_car", request_id)
    return car_reservation_id


@ray.remote
def book_hotel(request_id: str, *deps) -> str:
    hotel_reservation_id = make_request("book_hotel", request_id)
    return hotel_reservation_id


@ray.remote
def book_flight(request_id: str, *deps) -> str:
    flight_reservation_id = make_request("book_flight", request_id)
    return flight_reservation_id


@ray.remote
def book_all(car_req_id: str, hotel_req_id: str, flight_req_id: str) -> str:
    car_res_id = book_car.bind(car_req_id)
    hotel_res_id = book_hotel.bind(hotel_req_id, car_res_id)
    flight_res_id = book_flight.bind(flight_req_id, hotel_res_id)

    @ray.remote
    def concat(*ids: str) -> str:
        return ", ".join(ids)

    return workflow.continuation(concat.bind(car_res_id, hotel_res_id, flight_res_id))


@ray.remote
def handle_errors(
    car_req_id: str,
    hotel_req_id: str,
    flight_req_id: str,
    final_result: Tuple[Optional[str], Optional[Exception]],
) -> str:
    result, error = final_result

    @ray.remote
    def wait_all(*deps) -> None:
        pass

    @ray.remote
    def cancel(request_id: str) -> None:
        make_request("cancel", request_id)

    if error:
        return workflow.continuation(
            wait_all.bind(
                cancel.bind(car_req_id),
                cancel.bind(hotel_req_id),
                cancel.bind(flight_req_id),
            )
        )
    else:
        return result


if __name__ == "__main__":
    car_req_id = generate_request_id.bind()
    hotel_req_id = generate_request_id.bind()
    flight_req_id = generate_request_id.bind()
    # TODO(ekl) we could create a Saga helper function that automates this
    # pattern of compensation workflows.
    saga_result = book_all.options(**workflow.options(catch_exceptions=True)).bind(
        car_req_id, hotel_req_id, flight_req_id
    )
    final_result = handle_errors.bind(
        car_req_id, hotel_req_id, flight_req_id, saga_result
    )
    print(workflow.run(final_result))
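
The TODO in the example above mentions a Saga helper. As a rough idea of what such a helper does, here is a toy version in plain Python. It is not part of Ray or Cadence: the `Saga` class, `book`, and `book_trip` are hypothetical, and compensations run sequentially in reverse order rather than in parallel as in the Cadence sample.

```python
from typing import Callable, List


class Saga:
    """Toy saga: remember an undo action for every step that succeeded."""

    def __init__(self) -> None:
        self._compensations: List[Callable[[], None]] = []

    def add_compensation(self, undo: Callable[[], None]) -> None:
        self._compensations.append(undo)

    def compensate(self) -> None:
        # Undo completed steps, most recent first.
        for undo in reversed(self._compensations):
            undo()


log: List[str] = []  # records bookings and cancellations in order


def book(name: str, fail: bool = False) -> str:
    if fail:
        raise RuntimeError(f"{name} booking failed")
    log.append(f"book {name}")
    return f"{name}-id"


def book_trip() -> str:
    saga = Saga()
    try:
        car = book("car")
        saga.add_compensation(lambda: log.append(f"cancel {car}"))
        hotel = book("hotel")
        saga.add_compensation(lambda: log.append(f"cancel {hotel}"))
        book("flight", fail=True)  # simulated failure on the last step
    except RuntimeError:
        saga.compensate()
        return "rolled back"
    return "booked"


status = book_trip()
```

Unlike `handle_errors` above, which cancels all three request IDs on any error, this sketch only undoes the steps that actually completed.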

Google Cloud Workflows API Comparison

The original source of these comparisons can be found here.

Data Conditional

Google Cloud version:
# https://github.com/GoogleCloudPlatform/workflows-samples/blob/main/src/step_conditional_jump.workflows.json
[
  {
    "firstStep": {
      "call": "http.get",
      "args": {
        "url": "https://www.example.com/callA"
      },
      "result": "firstResult"
    }
  },
  {
    "whereToJump": {
      "switch": [
        {
          "condition": "${firstResult.body.SomeField < 10}",
          "next": "small"
        },
        {
          "condition": "${firstResult.body.SomeField < 100}",
          "next": "medium"
        }
      ],
      "next": "large"
    }
  },
  {
    "small": {
      "call": "http.get",
      "args": {
        "url": "https://www.example.com/SmallFunc"
      },
      "next": "end"
    }
  },
  {
    "medium": {
      "call": "http.get",
      "args": {
        "url": "https://www.example.com/MediumFunc"
      },
      "next": "end"
    }
  },
  {
    "large": {
      "call": "http.get",
      "args": {
        "url": "https://www.example.com/LargeFunc"
      },
      "next": "end"
    }
  }
]
Workflow version:
import ray
from ray import workflow


# Mock method to make a request.
def make_request(url: str) -> str:
    return "42"


@ray.remote
def get_size() -> int:
    return int(make_request("https://www.example.com/callA"))


@ray.remote
def small(result: int) -> str:
    return make_request("https://www.example.com/SmallFunc")


@ray.remote
def medium(result: int) -> str:
    return make_request("https://www.example.com/MediumFunc")


@ray.remote
def large(result: int) -> str:
    return make_request("https://www.example.com/LargeFunc")


@ray.remote
def decide(result: int) -> str:
    if result < 10:
        return workflow.continuation(small.bind(result))
    elif result < 100:
        return workflow.continuation(medium.bind(result))
    else:
        return workflow.continuation(large.bind(result))


if __name__ == "__main__":
    print(workflow.run(decide.bind(get_size.bind())))

Concat Array

Google Cloud version:
# https://github.com/GoogleCloudPlatform/workflows-samples/blob/main/src/array.workflows.json
[
  {
    "define": {
      "assign": [
        {
          "array": [
            "foo",
            "ba",
            "r"
          ]
        },
        {
          "result": ""
        },
        {
          "i": 0
        }
      ]
    }
  },
  {
    "check_condition": {
      "switch": [
        {
          "condition": "${len(array) > i}",
          "next": "iterate"
        }
      ],
      "next": "exit_loop"
    }
  },
  {
    "iterate": {
      "assign": [
        {
          "result": "${result + array[i]}"
        },
        {
          "i": "${i+1}"
        }
      ],
      "next": "check_condition"
    }
  },
  {
    "exit_loop": {
      "return": {
        "concat_result": "${result}"
      }
    }
  }
]
Workflow version:
from typing import List

import ray
from ray import workflow


@ray.remote
def iterate(array: List[str], result: str, i: int) -> str:
    if i >= len(array):
        return result
    return workflow.continuation(iterate.bind(array, result + array[i], i + 1))


if __name__ == "__main__":
    print(workflow.run(iterate.bind(["foo", "ba", "r"], "", 0)))

Sub Workflows

Google Cloud version:
# https://github.com/GoogleCloudPlatform/workflows-samples/blob/main/src/subworkflow.workflows.json
{
  "main": {
    "steps": [
      {
        "first": {
          "call": "hello",
          "args": {
            "input": "Kristof"
          },
          "result": "someOutput"
        }
      },
      {
        "second": {
          "return": "${someOutput}"
        }
      }
    ]
  },
  "hello": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "first": {
          "return": "${\"Hello \"+input}"
        }
      }
    ]
  }
}
Workflow version:
import ray
from ray import workflow


@ray.remote
def hello(name: str) -> str:
    return workflow.continuation(format_name.bind(name))


@ray.remote
def format_name(name: str) -> str:
    return f"hello, {name}"


@ray.remote
def report(msg: str) -> None:
    print(msg)


if __name__ == "__main__":
    r1 = hello.bind("Kristof")
    r2 = report.bind(r1)
    workflow.run(r2)

Prefect API Comparison

The original source of these comparisons can be found here.

Looping

Prefect version:
# https://docs.prefect.io/core/advanced_tutorials/task-looping.html

import requests
from datetime import timedelta

import prefect
from prefect import task
from prefect import Flow, Parameter
from prefect.engine.signals import LOOP


@task(max_retries=5, retry_delay=timedelta(seconds=2))
def compute_large_fibonacci(M):
    # we extract the accumulated task loop result from context
    loop_payload = prefect.context.get("task_loop_result", {})

    n = loop_payload.get("n", 1)
    fib = loop_payload.get("fib", 1)

    next_fib = requests.post(
        "https://nemo.api.stdlib.com/fibonacci@0.0.1/", data={"nth": n}
    ).json()

    if next_fib > M:
        return fib  # return statements end the loop

    raise LOOP(message=f"Fib {n}={next_fib}", result=dict(n=n + 1, fib=next_fib))


if __name__ == "__main__":
    with Flow("fibonacci") as flow:
        M = Parameter("M")
        fib_num = compute_large_fibonacci(M)

    flow_state = flow.run(M=100)
    print(flow_state.result[fib_num].result) # 89
Workflow version:
import tempfile

import ray
from ray import workflow
from ray.actor import ActorHandle


@ray.remote
class FibonacciActor:
    def __init__(self):
        self.cache = {}

    def compute(self, n):
        if n not in self.cache:
            assert n > 0
            a, b = 0, 1
            for _ in range(n - 1):
                a, b = b, a + b
            self.cache[n] = b
        return self.cache[n]


@ray.remote
def compute_large_fib(fibonacci_actor: ActorHandle, M: int, n: int = 1, fib: int = 1):
    next_fib = ray.get(fibonacci_actor.compute.remote(n))
    if next_fib > M:
        return fib
    else:
        return workflow.continuation(
            compute_large_fib.bind(fibonacci_actor, M, n + 1, next_fib)
        )


if __name__ == "__main__":
    ray.init(storage=f"file://{tempfile.TemporaryDirectory().name}")
    assert workflow.run(compute_large_fib.bind(FibonacciActor.remote(), 100)) == 89

AirFlow API Comparison

The original source of these comparisons can be found here.

ETL Workflow

AirFlow version:
# https://airflow.apache.org/docs/apache-airflow/stable/_modules/airflow/example_dags/tutorial_taskflow_api_etl.html

import json

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
}


@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
def tutorial_taskflow_api_etl():
    """
    ### TaskFlow API Tutorial Documentation
    This is a simple ETL data pipeline example which demonstrates the use of
    the TaskFlow API using three simple tasks for Extract, Transform, and Load.
    Documentation that goes along with the Airflow TaskFlow API tutorial is
    located
    [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html)
    """

    @task()
    def extract():
        """
        #### Extract task
        A simple Extract task to get data ready for the rest of the data
        pipeline. In this case, getting data is simulated by reading from a
        hardcoded JSON string.
        """
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'

        order_data_dict = json.loads(data_string)
        return order_data_dict

    @task(multiple_outputs=True)
    def transform(order_data_dict: dict):
        """
        #### Transform task
        A simple Transform task which takes in the collection of order data and
        computes the total order value.
        """
        total_order_value = 0

        for value in order_data_dict.values():
            total_order_value += value

        return {"total_order_value": total_order_value}

    @task()
    def load(total_order_value: float):
        """
        #### Load task
        A simple Load task which takes in the result of the Transform task and
        instead of saving it to end user review, just prints it out.
        """

        print(f"Total order value is: {total_order_value:.2f}")

    order_data = extract()
    order_summary = transform(order_data)
    load(order_summary["total_order_value"])


tutorial_etl_dag = tutorial_taskflow_api_etl()
Workflow version:
import json

import ray
from ray import workflow


@ray.remote
def extract() -> dict:
    data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
    order_data_dict = json.loads(data_string)
    return order_data_dict


@ray.remote
def transform(order_data_dict: dict) -> dict:
    total_order_value = 0
    for value in order_data_dict.values():
        total_order_value += value
    return {"total_order_value": ray.put(total_order_value)}


@ray.remote
def load(data_dict: dict) -> str:
    total_order_value = ray.get(data_dict["total_order_value"])
    return f"Total order value is: {total_order_value:.2f}"


if __name__ == "__main__":
    order_data = extract.bind()
    order_summary = transform.bind(order_data)
    etl = load.bind(order_summary)
    print(workflow.run(etl))