RayWorkflow
The RayWorkflow is a backend for executing the functions within a workflow graph on a Ray cluster. Ray is a framework for scaling and parallelizing Python applications (see the Ray documentation).
After declaring a frontend-defined workflow, e.g.:
from functionfuse import workflow

@workflow
def sum(a, b):
    return a + b

a, b = 1, 1
a_plus_b = sum(a, b).set_name("node1")
a_plus_two_b = sum(a_plus_b, b).set_name("node2")
a RayWorkflow can be instantiated, and calling run() will initialize a local Ray cluster with default settings:
from functionfuse.backends.builtin.rayback import RayWorkflow
ray_workflow = RayWorkflow(a_plus_two_b, workflow_name="sums")
c = ray_workflow.run()
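For this graph, run() executes the nodes on the cluster and returns the result of the final node, so c evaluates to 3 for these inputs.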
Ray cluster options
Usually the RayWorkflow will target an existing Ray cluster and/or define a set of initialization options to use with ray.init(). These are passed to the RayWorkflow using ray_init_args:
from functionfuse.backends.builtin.rayback import RayWorkflow
ray_options = {
    "address": "ray://localhost:10001",
    "ignore_reinit_error": True,
    "runtime_env": {"working_dir": "."}
}
ray_workflow = RayWorkflow(a_plus_two_b, workflow_name="sums", ray_init_args=ray_options)
c = ray_workflow.run()
See the ray.init API for all available options.
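As another sketch, when no external cluster is targeted, standard ray.init() options such as num_cpus and num_gpus can cap the resources of the locally initialized cluster (the values below are placeholders to adapt to your environment):

from functionfuse.backends.builtin.rayback import RayWorkflow

# Limit the local Ray instance to 4 CPUs and no GPUs.
local_ray_options = {
    "num_cpus": 4,
    "num_gpus": 0,
    "ignore_reinit_error": True
}

ray_workflow = RayWorkflow(a_plus_two_b, workflow_name="sums", ray_init_args=local_ray_options)
c = ray_workflow.run()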
Ray remote task options
The functions assigned to nodes in the workflow graph are executed as Ray remote tasks, which can be assigned individual task-specific options and resources within Ray. Specific options are assigned to the .backend_info field of each node using Queries, through the Query.set_remote_args() function:
node1_remote_args = {"num_cpus": 1}
node2_remote_args = {"resources": {"custom_resource": 0.1}}
ray_workflow.query("^.*node1.*$").set_remote_args(node1_remote_args)
ray_workflow.query("^.*node2.*$").set_remote_args(node2_remote_args)
c = ray_workflow.run()
See the ray.remote API for all available options.
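For example, options such as num_gpus or max_retries (both standard ray.remote arguments) can be attached the same way; the node name pattern here is hypothetical:

# Request one GPU and up to three retries for a hypothetical "train" node.
gpu_remote_args = {
    "num_gpus": 1,
    "max_retries": 3
}
ray_workflow.query("^.*train.*$").set_remote_args(gpu_remote_args)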
Plugins
Ray tasks are usually run in a distributed environment across multiple machines in a Ray cluster. The state of the process on a remote machine may need some configuration prior to execution of a node. Common scenarios for this include globally setting package features, or synchronizing states like random number generators. Plugins (and collections of plugins) can be assigned to nodes through the Query.set_plugins() function:
from functionfuse.backends.plugins import PluginCollection, InitializerPlugin, RandomStatePlugin

def init_float():
    import torch
    torch.set_default_dtype(torch.float64)

def set_seed(seed):
    import torch
    torch.random.manual_seed(seed)
    print(torch.rand(1))

random_state_plugin = RandomStatePlugin(min=-0x8000_0000_0000_0000, max=0xffff_ffff_ffff_ffff, seed=-0x8000_0000_0000_0000, seed_func=set_seed)
initializer_plugin = InitializerPlugin(init_float)
plugin_collection = PluginCollection([initializer_plugin, random_state_plugin])

ray_workflow.query("^.*node1.*$").set_plugins(initializer_plugin)
ray_workflow.query("^.*node2.*$").set_plugins(plugin_collection)
c = ray_workflow.run()
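The plugins are not tied to PyTorch; as a sketch, the same constructors shown above could configure NumPy instead (assuming only the InitializerPlugin and RandomStatePlugin signatures used in this example):

from functionfuse.backends.plugins import PluginCollection, InitializerPlugin, RandomStatePlugin

def init_numpy():
    # Runs on the remote process before the node executes.
    import numpy as np
    np.seterr(all="raise")

def seed_numpy(seed):
    # Receives a seed drawn by RandomStatePlugin from its [min, max] range.
    import numpy as np
    np.random.seed(seed)

numpy_plugins = PluginCollection([
    InitializerPlugin(init_numpy),
    RandomStatePlugin(min=0, max=0xffff_ffff, seed=0, seed_func=seed_numpy)
])
ray_workflow.query("^.*node1.*$").set_plugins(numpy_plugins)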
Storage
The main storage class for a RayWorkflow is in rayfilestorage, and is attached using set_storage(). The typical way to use this Ray version of FileStorage is to specify a custom resource type in the Ray remote arguments, to ensure that save and load functions are processed on a machine with appropriate attached storage within a distributed cluster.
from functionfuse.storage import storage_factory

ray_storage_remote_args = {
    "resources": {"_disk": 0.001}
}
store_config = {
    "kind": "ray",
    "options": {
        "remoteArgs": ray_storage_remote_args,
        "path": "storage"
    }
}
storage = storage_factory(store_config)
ray_workflow.set_storage(storage)
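Note that the custom _disk resource must actually be registered on a cluster node for the save and load tasks to be schedulable. When the RayWorkflow initializes a local cluster itself, this can be done through ray_init_args (a sketch; on a multi-node cluster the resource would instead be declared when starting the node that has the attached storage):

ray_options = {
    "resources": {"_disk": 1}  # register the custom resource on the local node
}
ray_workflow = RayWorkflow(a_plus_two_b, workflow_name="sums", ray_init_args=ray_options)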
RayWorkflow.run() uses the Storage class's get_writer_funcs() to assign storage functions to a save_func, which queues up Ray save commands, and uses the read_task() property of the read_object returned by get_writer_funcs(), so any attached Storage class needs to implement those functions.
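A minimal sketch of the interface implied by this description follows; the names mirror the prose above rather than a verified API, so consult the Storage classes in functionfuse.storage before implementing a custom backend:

class CustomRayStorage:
    def get_writer_funcs(self):
        # Hypothetical sketch: returns the save function and read object
        # that RayWorkflow.run() expects.
        def save_func(name, obj):
            ...  # queue a Ray task that persists obj under name

        class ReadObject:
            @property
            def read_task(self):
                ...  # task used to load a saved object

        return save_func, ReadObject()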