
Ray Runtime

The Ray runtime provides the ability to run in either a local or Kubernetes cluster, and includes the following set of components:

  • RayTransformLauncher - this is a class generally used to implement main() that makes use of a TransformConfiguration to start the Ray runtime and execute the transform over the specified set of input files. The RayTransformLauncher is created using a RayTransformConfiguration instance.
  • RayTransformConfiguration - this class extends the transform's base TransformConfiguration implementation to add an optional TransformRuntime (see next) class to be used by the transform implementation.
  • TransformRuntime - this provides the ability for the transform implementor to create additional Ray resources and include them in the configuration used to create a transform (see, for example, fuzzy dedup). Many transforms will not need additional resources and can use the DefaultRayTransformRuntime. TransformRuntime also provides the ability to supplement the statistics collected by Statistics (see below).

Roughly speaking, the following steps are completed to establish transforms in the RayWorkers:

  1. Launcher parses the CLI parameters using an ArgumentParser configured with its own CLI parameters along with those of the Transform Configuration.
  2. Launcher passes the Transform Configuration and CLI parameters to the RayOrchestrator.
  3. RayOrchestrator creates the Transform Runtime using the Transform Configuration and its CLI parameter values.
  4. Transform Runtime creates the transform initialization/configuration, including the CLI parameters
    and any Ray components needed by the transform.
  5. RayWorker is started with configuration from the Transform Runtime.
  6. RayWorker creates the Transform using the configuration provided by the Transform Runtime.
  7. Statistics collects the statistics submitted by the individual transforms, which are then used to build the execution metadata.
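
The steps above can be sketched in plain Python. This is a simplified illustration only: every class below (SketchTransform, SketchConfiguration, SketchRuntime, SketchStatistics) is a hypothetical stand-in written for this example, not one of the library's classes, the --num_workers and --suffix options are invented, and workers are modelled as ordinary objects rather than Ray actors.

```python
from argparse import ArgumentParser, Namespace
from typing import Any


class SketchTransform:
    """Stand-in transform: upper-cases text and reports a simple counter."""

    def __init__(self, config: dict[str, Any]):
        self.suffix = config.get("suffix", "")

    def transform(self, text: str) -> tuple[str, dict[str, int]]:
        return text.upper() + self.suffix, {"chars": len(text)}


class SketchConfiguration:
    """Stand-in Transform Configuration: owns the transform's CLI parameters."""

    transform_class = SketchTransform

    def __init__(self):
        self.params: dict[str, Any] = {}

    def add_input_params(self, parser: ArgumentParser) -> None:
        parser.add_argument("--suffix", type=str, default="")  # hypothetical option

    def apply_input_params(self, args: Namespace) -> bool:
        self.params = {"suffix": args.suffix}
        return True


class SketchRuntime:
    """Stand-in Transform Runtime: builds the configuration given to each transform."""

    def __init__(self, params: dict[str, Any]):
        self.params = params

    def get_transform_config(self) -> dict[str, Any]:
        return dict(self.params)


class SketchStatistics:
    """Stand-in Statistics: sums the counters submitted by the transforms."""

    def __init__(self):
        self.totals: dict[str, int] = {}

    def add_stats(self, stats: dict[str, int]) -> None:
        for key, value in stats.items():
            self.totals[key] = self.totals.get(key, 0) + value


def sketch_launch(argv: list[str], inputs: list[str]) -> tuple[list[str], dict[str, int]]:
    config = SketchConfiguration()
    # Step 1: one parser holds both launcher and transform CLI parameters.
    parser = ArgumentParser()
    parser.add_argument("--num_workers", type=int, default=1)  # hypothetical launcher option
    config.add_input_params(parser)
    args = parser.parse_args(argv)
    config.apply_input_params(args)
    # Steps 2-4: the orchestrator creates the runtime, which builds the transform config.
    runtime = SketchRuntime(config.params)
    transform_config = runtime.get_transform_config()
    # Steps 5-6: each worker creates its own transform from that configuration.
    workers = [config.transform_class(transform_config) for _ in range(args.num_workers)]
    # Step 7: statistics submitted by the transforms are aggregated.
    statistics = SketchStatistics()
    outputs = []
    for index, text in enumerate(inputs):
        result, stats = workers[index % len(workers)].transform(text)
        outputs.append(result)
        statistics.add_stats(stats)
    return outputs, statistics.totals
```

Calling sketch_launch(["--suffix", "!", "--num_workers", "2"], ["ab", "cde"]) distributes the two inputs across two stand-in workers and returns the transformed strings together with the aggregated counters.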

Processing Architecture

Ray Transform Launcher

The RayTransformLauncher uses the Transform Configuration and provides a single method, launch(), that kicks off the Ray environment and the transform execution coordinated by the orchestrator. For example,

launcher = RayTransformLauncher(YourTransformConfiguration())
launcher.launch()
Note that the launcher defines some additional CLI parameters that control the operation of the orchestrator and workers, as well as data access: for example, data access configuration, number of workers, and worker resources. Discussion of these options is beyond the scope of this document (see Launcher Options for a list of available options).

Transform Configuration

In general, a transform should be able to run in both the Python and Ray runtimes. As such, we first define the Python-only transform configuration, which is then used by the Ray-runtime-specific transform configuration. The Python transform configuration implements TransformConfiguration and defines the transform-specific name and implementation class. In addition, it is responsible for providing transform-specific methods to define and capture optional command line arguments.

class YourTransformConfiguration(TransformConfiguration):

    def __init__(self):
        super().__init__(name="YourTransform", transform_class=YourTransform)
        self.params = {}

    def add_input_params(self, parser: ArgumentParser) -> None:
        ...
    def apply_input_params(self, args: Namespace) -> bool:
        ...
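
As a rough illustration of what the two elided methods might contain, the following self-contained sketch uses only the standard argparse module. The class is a stand-in that does not extend the library's TransformConfiguration, the option name --your_transform_batch_size is hypothetical, and treating a False return value as "invalid arguments" is an assumption based on the bool return type shown above.

```python
from argparse import ArgumentParser, Namespace
from typing import Any


class SketchTransformConfiguration:
    """Stand-in for a TransformConfiguration subclass (no library imports)."""

    def __init__(self):
        self.name = "YourTransform"
        self.params: dict[str, Any] = {}

    def add_input_params(self, parser: ArgumentParser) -> None:
        # Register the transform's optional arguments on the shared parser.
        parser.add_argument(
            "--your_transform_batch_size",  # hypothetical option name
            type=int,
            default=100,
            help="number of rows processed at a time",
        )

    def apply_input_params(self, args: Namespace) -> bool:
        # Validate and capture the parsed values.
        # Assumption: returning False signals that the arguments are invalid.
        batch_size = args.your_transform_batch_size
        if batch_size <= 0:
            return False
        self.params = {"batch_size": batch_size}
        return True
```

A caller would create an ArgumentParser, pass it to add_input_params(), parse the command line, and then hand the resulting Namespace to apply_input_params(); the captured values end up in params.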
Next, we define the Ray-runtime-specific transform configuration as an extension of RayTransformConfiguration that wraps the YourTransformConfiguration above. It is given a distinct name here so that it does not shadow the Python configuration it wraps.
class YourRayTransformConfiguration(RayTransformConfiguration):
    def __init__(self):
        super().__init__(YourTransformConfiguration(),
                         runtime_class=YourTransformRuntime)
This class provides the ability to create an instance of the YourTransformRuntime class (see below) as needed by the Ray runtime. Note that not all transforms require a runtime_class; those that do not can omit this parameter and fall back to a default runtime class. Details are covered in the advanced transform tutorial.

Transform Runtime

The DefaultRayTransformRuntime class is provided and will be sufficient for many use cases, especially 1:1 table transformation. However, some transforms will require use of the Ray environment, for example, to create additional workers, establish a shared memory object, etc. Of course, these transforms will generally not run outside of a Ray environment.

class DefaultRayTransformRuntime:

    def __init__(self, params: dict[str, Any]):
        ...

    def get_transform_config(
        self, data_access_factory: DataAccessFactory, statistics: ActorHandle, files: list[str]
    ) -> dict[str, Any]:
        ...

    def compute_execution_stats(self, stats: dict[str, Any]) -> dict[str, Any]:
        ...

The RayOrchestrator initializes the instance with the CLI parameters provided by the Transform Configuration's get_input_params() method.

The get_transform_config() method is used by the RayOrchestrator to create the parameters used to initialize the Transform in the RayWorker. This is where additional Ray components would be added to the environment, with references to them added, as needed, to the returned dictionary of configuration data that will initialize the transform. For those transforms that don't need this support, the default implementation simply returns the CLI parameters used to initialize the runtime instance.
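
A minimal sketch of this pattern follows. Both classes are hypothetical stand-ins that run without Ray or the library: SketchDefaultRuntime mirrors the default behaviour described above, and SketchSharedCacheRuntime shows where a custom runtime might add a resource. A plain dictionary takes the place of what would, in a real Ray runtime, more likely be an actor handle or object reference.

```python
from typing import Any


class SketchDefaultRuntime:
    """Stand-in for the default runtime behaviour described above."""

    def __init__(self, params: dict[str, Any]):
        self.params = params

    def get_transform_config(
        self, data_access_factory: Any, statistics: Any, files: list[str]
    ) -> dict[str, Any]:
        # Default behaviour: hand the CLI parameters straight to the transform.
        return self.params


class SketchSharedCacheRuntime(SketchDefaultRuntime):
    """Hypothetical runtime that adds one shared resource to the configuration."""

    def get_transform_config(
        self, data_access_factory: Any, statistics: Any, files: list[str]
    ) -> dict[str, Any]:
        # A plain dict stands in for a Ray resource so the sketch runs without Ray.
        shared_cache = {"files_expected": len(files)}
        # Return a new dict: the CLI parameters plus a reference to the resource.
        return {**self.params, "shared_cache": shared_cache}
```

Building a new dictionary rather than mutating params keeps the runtime's own copy of the CLI parameters unchanged, which is a design choice of this sketch rather than a documented requirement.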

The compute_execution_stats() method provides an opportunity to augment the statistics collected and aggregated by the TransformStatistics actor. It is called by the RayOrchestrator after all files have been processed.
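
As an illustration, a runtime could derive an extra figure from the aggregated counters. The sketch below is a stand-in class, not the library's; the counter names source_rows and result_rows and the derived removed_pct key are hypothetical, and it assumes that the returned dictionary is what ends up in the execution metadata.

```python
from typing import Any


class SketchStatsRuntime:
    """Hypothetical runtime that augments the aggregated statistics."""

    def __init__(self, params: dict[str, Any]):
        self.params = params

    def compute_execution_stats(self, stats: dict[str, Any]) -> dict[str, Any]:
        # "source_rows" and "result_rows" are hypothetical counter names.
        source = stats.get("source_rows", 0)
        result = stats.get("result_rows", 0)
        # Guard against division by zero when no rows were processed.
        removed_pct = 100.0 * (source - result) / source if source else 0.0
        # Keep the collected statistics and add the derived value.
        return {**stats, "removed_pct": removed_pct}
```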