Prefect Workflow
#################
The :ref:`api/backends:Prefect Workflow` is a backend for configuring a
*FunctionFuse* workflow graph as a *Prefect* Flow, with each *FunctionFuse*
Node's main function run as a *Prefect* Task (see the
`Prefect documentation `_).
After declaring a frontend-defined workflow, e.g.:
.. highlight:: python
.. code-block:: python
from functionfuse import workflow
@workflow
def sum(a, b):
return a + b
a, b = 1, 1
a_plus_b = sum(a, b).set_name("node1")
a_plus_two_b = sum(a_plus_b, b).set_name("node2")
a ``PrefectWorkflow`` can be instantiated, and the Prefect Flow can be created
using the ``generate_flow()`` function. Then ``run()`` will start a Flow Run of
the created Flow.
.. highlight:: python
.. code-block:: python
from functionfuse.backends.addons.prefectback import PrefectWorkflow
prefect_workflow = PrefectWorkflow(a_plus_two_b, workflow_name="sums")
prefect_workflow.generate_flow()
c = prefect_workflow.run()
.. note::
``PrefectWorkflow.run()`` will call ``generate_flow()`` if the Flow object
has not been created yet
.. note::
While Prefect Flows can be run multiple times, with a new Flow Run created
on each run of the Flow, *FunctionFuse* Workflows do not currently support
multiple runs. The correct approach in *FunctionFuse* is to create a new
Workflow each time the graph should be run. Of course, stored results will
be loaded from storage if present on subsequent runs.
Prefect Flow options
---------------------
The ``PrefectWorkflow`` can pass options when initializing the Prefect Flow. A
common example would be the Task Runner. These options are passed to the
``@flow`` decorator. Flow options are set in the ``PrefectWorkflow`` using
``set_prefect_flow_options()``:
.. highlight:: python
.. code-block:: python
from prefect.task_runners import ConcurrentTaskRunner
prefect_flow_options = {'task_runner': ConcurrentTaskRunner()}
prefect_workflow.set_prefect_flow_options(prefect_flow_options)
prefect_workflow.generate_flow()
c = prefect_workflow.run()
See the `prefect.flows API `_
for all available options.
.. note::
Supporting Prefect options for changing the Task Runner requires that all
tasks are executed with ``task.sumbmit()`` in the backend (see
`Task runner docs `_).
Function execution within the backend then return ``PrefectFuture`` objects,
and the backend execution is more similar to ``RayWorkflow`` than
``LocalWorkflow``.
Prefect Task options
------------------------
The functions assigned to nodes in the workflow graph are executed as
`Prefect Tasks `_,
which can be assigned individual task-specific options. Options are
assigned to the ``.backend_info`` field of each node using
`:ref:background/introduction:Queries` through the ``Query.set_task_args()``
function. For example, using Prefect caching features means that nodes that
use identical functions and inputs, but are assigned different node names, can
have results retrieved from the cache instead of completing unnecesary
computation:
.. highlight:: python
.. code-block:: python
from prefect.tasks import task_input_hash
query_task_args = {
"^.*sum.*$": {"cache_key_fn": task_input_hash},
"^.*minus.*$": {"cache_key_fn": task_input_hash},
}
for query, task_args in query_task_args.items():
prefect_workflow.query(query).set_task_args(task_args)
prefect_workflow.generate_flow()
c = prefect_workflow.run()
See the `prefect.tasks API `_
for all available options.
Storage
--------
Any :ref:`storage/storage:Storage` class can be added to a ``PrefectWorkflow``
using ``set_storage()``:
.. highlight:: python
.. code-block:: python
from functionfuse.storage import storage_factory
opt = {
"kind": "file",
"options": {
"path": "storage"
}
}
storage = storage_factory(opt)
prefect_workflow.set_storage(storage)
``PrefectWorkflow.run()`` uses the ``save()``, ``read_task()``, and
``always_read`` properties of :ref:`storage/storage:Storage`, so any
Storage class implementing those functions can be attached.
Prefect Server
---------------
The `Prefect Server `_ can be started
to access the logs created by Prefect Flows and Tasks, and provide a UI and
access to other Prefect options. To start the simple local server use:
.. code-block:: sh
$ prefect server start
Example of run logged in Prefect Server:
.. image:: /images/PrefectBasicRun.png
:alt: Prefect Flow Run