Backends

Built-in

Local Workflow

The Local Workflow is implemented with a class and a function that creates local storage (see below). During graph execution, all node results are saved in the storage. On a subsequent run, saved nodes are not re-executed; their results are loaded from the storage instead.

class functionfuse.backends.builtin.localback.LocalWorkflow(*nodes, workflow_name)[source]

A Backend to run workflows locally. To store node results, use local storage. The storage for this class can be created by functionfuse.storage.storage_factory.

Parameters:
  • nodes – A list of DAG nodes. The backend finds all DAG roots that are ancestors of the nodes and executes the graph starting from those roots, traversing all descendant nodes.

  • workflow_name – A name of the workflow that is used by storage classes.

run()[source]

Start execution of the workflow

Returns:

A list of results for input nodes or a single result if a single node is used in initialization of the class object.

set_storage(object_storage)[source]

Set storage for the workflow.

Parameters:

object_storage – Storage object.

functionfuse.storage.storage_factory(opt)[source]

Factory that creates a storage class.

Parameters:

opt – Dictionary with parameters of the storage class

Options for local storage are:

opt = {
    "kind": "file",
    "options": {
        "path": "path to folder for storage"
    }
}

The name of the storage folder is path/workflow_name. With these options, the factory returns an object of the following class, which is passed to functionfuse.backends.builtin.localback.LocalWorkflow.set_storage():

class functionfuse.storage.filestorage.FileStorage(path)[source]

Local file storage. Pickle is used to save results of the graph nodes.

list_tasks(workflow_name, pattern)[source]

List saved results of all saved nodes in a workflow, filtered by a glob pattern.

Parameters:
  • workflow_name (str) – A name of the workflow to list saved results.

  • pattern – A glob pattern to filter out names of saved results.

read_task(workflow_name, task_name)[source]

Read saved result of node execution from workflow storage.

Parameters:
  • workflow_name (str) – A name of the workflow to read saved results.

  • task_name – A task name to load results for.

remove_task(workflow_name, task_name=None, pattern=None)[source]

Remove results of selected nodes from the storage.

Parameters:
  • workflow_name (str) – A name of the workflow to remove node results from.

  • task_name – A name of the node to remove. The parameter is ignored if pattern is defined.

  • pattern – A glob pattern of node names to be removed.

remove_workflow(workflow_name)[source]

Remove the workflow from the storage.

Parameters:

workflow_name (str) – A name of the workflow to remove from the storage.

Example:

local_workflow = LocalWorkflow(dataset, workflow_name="classifier")
opt = {
    "kind": "file",
    "options": {
        "path": "storage"
    }
}
storage = storage_factory(opt)
local_workflow.set_storage(storage)
_ = local_workflow.run()
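
On a second run with the same workflow_name and storage path, the saved nodes are not re-executed: their results are loaded from the file storage. The storage object can also be used directly to inspect and clean up saved results. A minimal sketch, assuming the graph contains a node named "dataset" (hypothetical name):

# Re-running the workflow loads previously saved node results instead of recomputing them.
_ = local_workflow.run()

# List saved node results of the workflow using a glob pattern.
print(storage.list_tasks("classifier", "*"))

# Read the saved result of a single node by name.
dataset_result = storage.read_task("classifier", "dataset")

# Remove a single saved result, or the whole workflow from the storage.
storage.remove_task("classifier", task_name="dataset")
storage.remove_workflow("classifier")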

Ray Workflow

The Ray Workflow interface is similar to the Local Workflow interface, with additional methods to set Ray resources.

class functionfuse.backends.builtin.rayback.RayWorkflow(*nodes, workflow_name, ray_init_args={})[source]

A Backend to run workflows on the Ray engine. The storage for this class can be created by functionfuse.storage.storage_factory.

Parameters:
  • nodes – A list of DAG nodes. The backend finds all DAG roots that are ancestors of the nodes and executes the graph starting from those roots, traversing all descendant nodes.

  • workflow_name – A name of the workflow that is used by storage classes.

  • ray_init_args (dict) – A dictionary with parameters for Ray init

query(pattern=None)[source]

Query nodes of the graph by regexp pattern.

Parameters:

pattern (Optional[str]) – A regexp pattern to match node names. If None, all nodes are returned.

Returns:

Query object

run(return_results=False, max_pending_tasks=0)[source]

Start execution of the workflow.

Parameters:

return_results – A flag indicating whether results for input nodes are returned.

Returns:

A list of results for input nodes or a single result if a single node is used in initialization of the class object.
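
A minimal sketch of retrieving results directly from run, assuming a RayWorkflow that has been constructed and given a storage as in the example at the end of this subsection:

# return_results=True makes run return the computed results for the input nodes.
results = ray_workflow.run(return_results=True)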

set_storage(object_storage)[source]

Set storage for the workflow.

Parameters:

object_storage – Storage object.

class functionfuse.backends.builtin.rayback.Query(nodes, workflow)[source]

The class allows setting attributes on sets of nodes. It contains the list of nodes returned by the query.

set_remote_args(args)[source]

Set arguments for the ‘remote’ function in Ray calls. Used to assign resources to remote function calls.

Parameters:

args (dict) – Dictionary with arguments of a remote call

class functionfuse.storage.rayfilestorage.FileStorage(opt)[source]

Remote file storage on a single designated Ray cluster node. Create a single node with a unique Ray resource (e.g., _disk: 1) and route all storage function calls to this node by requesting the same resource in remoteArgs (see below), e.g., _disk: 0.001. Pickle is used to save results of the graph nodes.

To create a file storage object, use functionfuse.storage.storage_factory() with

opt = {
    "kind": "ray",
    "options": {
        "rayInitArgs": {...},
        "remoteArgs": {...},
        "path": path,
    }
}

Parameters:
  • rayInitArgs (dict, optional) – Use this field only in read mode: the file storage then calls Ray init with the rayInitArgs parameters. Otherwise, Ray init is called in the workflow class.

  • remoteArgs (dict) – Dictionary with parameters for the remote function (see ray.remote)

  • path – A relative or absolute path to the storage folder. All results are saved in the path/workflow_name folder.

list_tasks(workflow_name, pattern)[source]

List saved results of all saved nodes in a workflow, filtered by a glob pattern.

Parameters:
  • workflow_name (str) – A name of the workflow to list saved results.

  • pattern – A glob pattern to filter out names of saved results.

read_task(workflow_name, task_name)[source]

Read saved result of node execution from workflow storage.

Parameters:
  • workflow_name (str) – A name of the workflow to read saved results.

  • task_name – A task name to load results for.

remove_task(workflow_name, task_name=None, pattern=None)[source]

Remove results of selected nodes from the storage.

Parameters:
  • workflow_name (str) – A name of the workflow to remove node results from.

  • task_name – A name of the node to remove. The parameter is ignored if pattern is defined.

  • pattern – A glob pattern of node names to be removed.

remove_workflow(workflow_name)[source]

Remove the workflow from the storage.

Parameters:

workflow_name (str) – A name of the workflow to remove from the storage.

Example:

ray_init_args = {
    "resources": {"_disk": 1.0, "_model": 1}
}

ray_storage_remote_args = {
    "resources": {"_disk": 0.001}
}

ray_workflow = RayWorkflow(dataset, workflow_name="classifier", ray_init_args=ray_init_args)

# Ray init is called in the RayWorkflow constructor!!! Storage should be created AFTER RayWorkflow is created.
storage_path = os.path.join(os.getcwd(), "storage")
opt = {
    "kind": "ray",
    "options": {
        "remoteArgs": ray_storage_remote_args,
        "path": storage_path,
    }
}

storage = storage_factory(opt)
ray_workflow.set_storage(storage)
ray_workflow.query(pattern="^model$").set_remote_args({"num_cpus": 1, "resources": {"_model": 1}})

_ = ray_workflow.run()

Add-ons

Prefect Workflow

The Prefect Workflow interface is similar to the Ray Workflow interface: nodes can be queried to set Prefect Task-specific options for each node, and Prefect Flow options can be passed when creating the Prefect Flow from the workflow graph. The Prefect Flow can also be generated independently of the run function and stored as PrefectWorkflow.flow, which could be used with Prefect Deployment options (not yet implemented).

class functionfuse.backends.addons.prefectback.PrefectWorkflow(*nodes, workflow_name)[source]

A Backend to run workflows through Prefect. To store node results, use local storage. The storage for this class can be created by functionfuse.storage.storage_factory.

Parameters:
  • nodes – A list of DAG nodes. The backend finds all DAG roots that are ancestors of the nodes and executes the graph starting from those roots, traversing all descendant nodes.

  • workflow_name – A name of the workflow that is used by storage classes.

generate_flow()[source]

Create the flow from the nodes in the graph. Separating this from run allows the deploy() option within the Prefect server.

query(pattern=None)[source]

Query nodes of the graph by regexp pattern.

Parameters:

pattern (Optional[str]) – A regexp pattern to match node names. If None, all nodes are returned.

Returns:

Query object

run()[source]

Start execution of the workflow

Returns:

A list of results for input nodes or a single result if a single node is used in initialization of the class object.

set_storage(object_storage)[source]

Set storage for the workflow.

Parameters:

object_storage – Storage object.

class functionfuse.backends.addons.prefectback.Query(nodes, workflow)[source]

The class allows setting attributes on sets of nodes. It contains the list of nodes returned by the query.

set_task_args(args)[source]

Set arguments for task definition in Prefect.

Parameters:

args (dict) – Dictionary with arguments of a prefect.task call
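
A minimal sketch analogous to the LocalWorkflow example above; dataset is assumed to be a previously constructed graph node, "model" is a hypothetical node name, and retries is a standard prefect.task keyword argument:

prefect_workflow = PrefectWorkflow(dataset, workflow_name="classifier")

# Local file storage, as for LocalWorkflow.
opt = {
    "kind": "file",
    "options": {
        "path": "storage"
    }
}
storage = storage_factory(opt)
prefect_workflow.set_storage(storage)

# Assign Prefect Task options to selected nodes by name.
prefect_workflow.query(pattern="^model$").set_task_args({"retries": 2})

# Optionally build the Prefect Flow without running it; it is stored as prefect_workflow.flow.
prefect_workflow.generate_flow()

_ = prefect_workflow.run()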

KFP Workflow

The KFP Workflow interface is currently in an alpha state. The main extension over the LocalWorkflow interface is the requirement to pass information about where to push a container image that can then be pulled by the Kubernetes cluster.

class functionfuse.backends.addons.kfpback.KFPWorkflow(*nodes, workflow_name, baseimage='python', registry_credentials={}, kfp_host='http://localhost:3000')[source]

A Backend to run workflows on Kubeflow Pipelines.

Parameters:
  • nodes – A list of DAG nodes. The backend finds all DAG roots that are ancestors of the nodes and executes graph starting from that roots traversing all descendend nodes.

  • workflow_name – A name of the workflow that is used by storage classes.

  • baseimage – The container image to use as the base that the KFP component execution image will be built on top of.

  • registry_credentials – A dictionary of credential information for accessing a private container registry, if necessary.

Available fields:

server: address to the registry server

username: as would be used for docker login

password: as would be used for docker login

  • kfp_host – Address to the Kubeflow Pipelines host API, used to connect the KFP Client.

run()[source]

Start execution of the workflow

set_baseimage(baseimage)[source]

Set container image used as the base for the execution image. The execution image will be built by placing the current directory on to the base image. The base image should therefore contain a python environment with any necessary prerequisites for the current code that aren’t passed to KFP using ‘packages_to_install’ or other methods.

Parameters:

baseimage – Container image that can be pulled with docker pull.

set_kfp_host(kfp_host)[source]

Set address to KFP Host.

Parameters:

kfp_host – Address of KFP Host.

set_registry_credentials(registry_credentials)[source]

Set credentials to access a private container registry.

Parameters:

registry_credentials – A dictionary of credential information for accessing a private container registry, if necessary.

Available fields:

server: address to the registry server

username: as would be used for docker login

password: as would be used for docker login
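
Example (a minimal sketch; dataset is assumed to be a previously constructed graph node, and the registry address and credentials are placeholders):

kfp_workflow = KFPWorkflow(
    dataset,
    workflow_name="classifier",
    baseimage="python:3.10",
    registry_credentials={
        "server": "registry.example.com",   # placeholder registry server
        "username": "user",                 # as would be used for docker login
        "password": "password",             # as would be used for docker login
    },
    kfp_host="http://localhost:3000",
)

# The same settings can be changed after construction.
kfp_workflow.set_baseimage("python:3.10")
kfp_workflow.set_kfp_host("http://localhost:3000")

kfp_workflow.run()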