Prefect Workflow

The Prefect Workflow is a backend for configuring a FunctionFuse workflow graph as a Prefect Flow, with each FunctionFuse Node’s main function run as a Prefect Task (see the Prefect documentation).

After declaring a frontend-defined workflow, e.g.:

from functionfuse import workflow

@workflow
def sum(a, b):
    return a + b

a, b = 1, 1

a_plus_b = sum(a, b).set_name("node1")
a_plus_two_b = sum(a_plus_b, b).set_name("node2")

a PrefectWorkflow can be instantiated and the Prefect Flow created with its generate_flow() method. Calling run() then starts a Flow Run of the created Flow.

from functionfuse.backends.addons.prefectback import PrefectWorkflow

prefect_workflow = PrefectWorkflow(a_plus_two_b, workflow_name="sums")
prefect_workflow.generate_flow()
c = prefect_workflow.run()
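To make the frontend/backend split concrete, here is a minimal sketch of the idea using hypothetical stand-ins (Node, workflow, run_graph), not the FunctionFuse API: calling a decorated function only records a graph node, and a backend later evaluates the graph, running each node's function once after its upstream nodes. In PrefectWorkflow each such call would be a Prefect Task inside a Flow. The function is named add here to avoid shadowing Python's built-in sum.

```python
class Node:
    """Hypothetical stand-in for a FunctionFuse graph node."""

    def __init__(self, func, args):
        self.func = func
        self.args = args
        self.name = None

    def set_name(self, name):
        # Returns self so calls can be chained, as in the example above.
        self.name = name
        return self


def workflow(func):
    # Hypothetical stand-in for @workflow: calling the decorated function
    # records a Node instead of computing a value.
    def build(*args):
        return Node(func, args)
    return build


def run_graph(node, results):
    # Evaluate upstream Nodes first (each one only once), then this node.
    # In PrefectWorkflow, each such call would run as a Prefect Task.
    if node in results:
        return results[node]
    args = [run_graph(a, results) if isinstance(a, Node) else a for a in node.args]
    results[node] = node.func(*args)
    return results[node]


@workflow
def add(a, b):
    return a + b


a, b = 1, 1
a_plus_b = add(a, b).set_name("node1")
a_plus_two_b = add(a_plus_b, b).set_name("node2")

results = {}
c = run_graph(a_plus_two_b, results)
print(c)                          # 3
print([n.name for n in results])  # ['node1', 'node2']
```

The results dict doubles as a memo, so a node shared by several downstream nodes is evaluated only once, and its insertion order records the execution order.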

Note

PrefectWorkflow.run() calls generate_flow() if the Flow object has not been created yet, so an explicit generate_flow() call before run() is optional.
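The lazy-generation behavior in the note above can be sketched with a hypothetical class (LazyFlowWorkflow is illustrative, not PrefectWorkflow's implementation): run() builds the flow only when none exists, so the flow is generated exactly once whether or not generate_flow() was called first.

```python
class LazyFlowWorkflow:
    """Hypothetical stand-in: run() builds the flow on demand."""

    def __init__(self):
        self.flow = None
        self.generate_calls = 0

    def generate_flow(self):
        # Stands in for building the Prefect Flow from the node graph.
        self.generate_calls += 1
        self.flow = lambda: "flow result"

    def run(self):
        # Generate the flow only if it has not been created yet.
        if self.flow is None:
            self.generate_flow()
        return self.flow()


implicit = LazyFlowWorkflow()
print(implicit.run(), implicit.generate_calls)  # flow result 1

explicit = LazyFlowWorkflow()
explicit.generate_flow()
print(explicit.run(), explicit.generate_calls)  # flow result 1
```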

Note

While Prefect Flows can be run multiple times, with a new Flow Run created on each run of the Flow, FunctionFuse Workflows do not currently support multiple runs. The correct approach in FunctionFuse is to create a new Workflow each time the graph should be run. Of course, stored results will be loaded from storage if present on subsequent runs.
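The "new workflow per run, results reused from storage" pattern can be sketched as follows. This is a hypothetical illustration: OneRunWorkflow and the plain dict standing in for a Storage backend are assumptions, not FunctionFuse classes. Each run uses a fresh object, and the second run reads the stored result instead of recomputing it.

```python
class OneRunWorkflow:
    """Hypothetical stand-in: one object per run, sharing a storage dict."""

    def __init__(self, storage):
        self.storage = storage  # dict standing in for a Storage backend
        self.computed = []      # node names actually computed in this run

    def run_node(self, node_name, func, *args):
        # Reuse a stored result if one exists; otherwise compute and save it.
        if node_name in self.storage:
            return self.storage[node_name]
        result = func(*args)
        self.storage[node_name] = result
        self.computed.append(node_name)
        return result


def double(x):
    return 2 * x


storage = {}
first = OneRunWorkflow(storage)   # first run
second = OneRunWorkflow(storage)  # a new workflow for the second run
r1 = first.run_node("node1", double, 21)
r2 = second.run_node("node1", double, 21)
print(r1, r2, first.computed, second.computed)  # 42 42 ['node1'] []
```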

Prefect Flow options

PrefectWorkflow can pass options to the Prefect Flow when it is created; a common example is the Task Runner. These options are passed to the @flow decorator and are set in the PrefectWorkflow with set_prefect_flow_options():

from prefect.task_runners import ConcurrentTaskRunner
prefect_flow_options = {'task_runner': ConcurrentTaskRunner()}

prefect_workflow.set_prefect_flow_options(prefect_flow_options)
prefect_workflow.generate_flow()
c = prefect_workflow.run()

See the prefect.flows API for all available options.
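Passing an options dict "to the @flow decorator" amounts to calling a decorator factory with keyword arguments. The sketch below uses a hypothetical flow() stand-in that only records its options, so it runs without Prefect; the "concurrent" value is a placeholder string, not a real Task Runner object.

```python
import functools


def flow(**options):
    # Hypothetical stand-in for a decorator factory like Prefect's @flow(...);
    # it only records the options it was given.
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            return fn(*args, **kwargs)
        wrapper.options = dict(options)
        return wrapper
    return decorator


def build_flow(fn, flow_options):
    # Same as writing @flow(**flow_options) above the definition of fn.
    return flow(**flow_options)(fn)


def graph_body():
    return 3


# "concurrent" is a placeholder string, not a real Task Runner object.
my_flow = build_flow(graph_body, {"name": "sums", "task_runner": "concurrent"})
print(my_flow())        # 3
print(my_flow.options)  # {'name': 'sums', 'task_runner': 'concurrent'}
```

Building the decorator from a dict at generation time is what lets the options be changed after the workflow graph is declared.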

Note

Supporting Prefect options that change the Task Runner requires that all tasks are executed with task.submit() in the backend (see the Task runner docs). Function calls within the backend therefore return PrefectFuture objects, which makes the backend's execution more similar to RayWorkflow than to LocalWorkflow.
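The future-based execution style described in the note can be illustrated with Python's standard concurrent.futures module as an analogy (this is not Prefect code): submitting a node returns a future immediately, and a downstream node that receives an upstream future waits for its result. The helpers resolve() and submit_node() are hypothetical.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def add(a, b):
    return a + b


def resolve(value):
    # Block on an upstream future; pass plain values through unchanged.
    return value.result() if isinstance(value, Future) else value


def submit_node(executor, func, *args):
    # Submitting returns a Future immediately; inputs are resolved in the worker.
    return executor.submit(lambda: func(*[resolve(a) for a in args]))


with ThreadPoolExecutor(max_workers=2) as executor:
    f1 = submit_node(executor, add, 1, 1)   # like node1
    f2 = submit_node(executor, add, f1, 1)  # like node2, depends on node1
    print(type(f2).__name__)  # Future
    print(f2.result())        # 3
```

Because dependencies are always submitted before the nodes that use them, no worker can end up waiting on a task that is still queued behind it.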

Prefect Task options

The functions assigned to nodes in the workflow graph are executed as Prefect Tasks, which can be given individual task-specific options. Options are assigned to the .backend_info field of each node using Queries (see the Queries section of the Introduction), through the Query.set_task_args() method. For example, with Prefect's caching features, nodes that use identical functions and inputs but have different node names can retrieve results from the cache instead of repeating unnecessary computation:

from prefect.tasks import task_input_hash
query_task_args = {
    "^.*sum.*$": {"cache_key_fn": task_input_hash},
    "^.*minus.*$": {"cache_key_fn": task_input_hash},
}
for query, task_args in query_task_args.items():
    prefect_workflow.query(query).set_task_args(task_args)

prefect_workflow.generate_flow()
c = prefect_workflow.run()
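A rough sketch of how regex queries select nodes and attach task options, assuming each pattern is matched against a node's name with re.match. NodeStub, set_task_args() as a free function, and the "task_args" key inside backend_info are all hypothetical; the real Query class may match and store differently. The cache function is replaced by a placeholder string so the example runs without Prefect.

```python
import re


class NodeStub:
    """Hypothetical node with a name and a backend_info dict."""

    def __init__(self, name):
        self.name = name
        self.backend_info = {}


def set_task_args(nodes, pattern, task_args):
    # Attach task_args to every node whose name matches the pattern.
    regex = re.compile(pattern)
    matched = []
    for node in nodes:
        if regex.match(node.name):
            node.backend_info.setdefault("task_args", {}).update(task_args)
            matched.append(node.name)
    return matched


nodes = [NodeStub("sum_1"), NodeStub("sum_2"), NodeStub("minus_1"), NodeStub("other")]
query_task_args = {
    "^.*sum.*$": {"cache_key_fn": "placeholder"},
    "^.*minus.*$": {"cache_key_fn": "placeholder"},
}
for pattern, args in query_task_args.items():
    set_task_args(nodes, pattern, args)

print([n.name for n in nodes if n.backend_info])  # ['sum_1', 'sum_2', 'minus_1']
```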

See the prefect.tasks API for all available options.
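Why input-based cache keys let differently named nodes share results can be shown with a small sketch. input_cache_key() is a hypothetical stand-in, not Prefect's task_input_hash: it keys only on the function's identity and its arguments, so the node name never affects the key and the second node is served from the cache.

```python
import hashlib
import pickle


def input_cache_key(func, args):
    # Hypothetical stand-in for an input-hash cache key: depends on the
    # function identity and its inputs, not on the node name.
    payload = pickle.dumps((func.__module__, func.__qualname__, args))
    return hashlib.sha256(payload).hexdigest()


cache = {}
calls = []


def run_node(node_name, func, *args):
    key = input_cache_key(func, args)
    if key not in cache:
        calls.append(node_name)  # record nodes that actually computed
        cache[key] = func(*args)
    return cache[key]


def add(a, b):
    return a + b


r1 = run_node("nodeA", add, 1, 1)
r2 = run_node("nodeB", add, 1, 1)  # different name, same function and inputs
print(r1, r2, calls)  # 2 2 ['nodeA']
```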

Storage

Any Storage class can be added to a PrefectWorkflow using set_storage():

from functionfuse.storage import storage_factory

opt = {
    "kind": "file",
    "options": {
        "path": "storage"
    }
}
storage = storage_factory(opt)
prefect_workflow.set_storage(storage)

PrefectWorkflow.run() uses the save() and read_task() methods and the always_read property of Storage, so any Storage class that implements these members can be attached.
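Because only these members are used, a duck-typed object can serve as storage. The sketch below is hypothetical: the member names follow the text, but the method signatures and the None-when-missing convention are guesses for illustration, so check the FunctionFuse Storage classes for the real interface before writing your own.

```python
class InMemoryStorage:
    """Hypothetical storage exposing the member names listed in the text.

    Method signatures are guesses, not the FunctionFuse Storage interface.
    """

    def __init__(self, always_read=False):
        self.always_read = always_read
        self._data = {}

    def save(self, workflow_name, node_name, obj):
        self._data[(workflow_name, node_name)] = obj

    def read_task(self, workflow_name, node_name):
        # Hypothetical convention: None means nothing is stored for this node.
        return self._data.get((workflow_name, node_name))


def looks_like_storage(obj):
    # Duck-typed check for the members the text says PrefectWorkflow.run() uses.
    return (
        callable(getattr(obj, "save", None))
        and callable(getattr(obj, "read_task", None))
        and hasattr(obj, "always_read")
    )


store = InMemoryStorage()
store.save("sums", "node1", 2)
print(looks_like_storage(store))         # True
print(store.read_task("sums", "node1"))  # 2
print(store.read_task("sums", "node2"))  # None
```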

Prefect Server

The Prefect Server can be started to view the logs created by Prefect Flows and Tasks; it also provides a UI and access to other Prefect features. To start a simple local server, use:

$ prefect server start

Example of a run logged in the Prefect Server:

[Figure: Prefect Flow Run]