Simplest Transform Tutorial
In this example, we implement a noop (no operation) transform that takes no action on the input datum and returns it unmodified. In effect, this copies a directory tree of files to an output directory. This is not functionally powerful, but it lets us focus on the minimum requirements for a transform.
NOTE: What follows is a discussion of a pyarrow Table transform that will run in either the Ray or Python runtimes. Mapping the tutorial to byte arrays would use AbstractBinaryTransform instead of AbstractTableTransform (a sub-class of the former). Mapping the tutorial to a Spark runtime would use AbstractSparkTransform instead of AbstractTableTransform and use DataFrame instead of pyarrow Table as the DATA type. In addition, the SparkTransformLauncher would be used in place of the RayTransformLauncher and PythonTransformLauncher shown below.
That said, we will show the following:
- How to write the noop transform to generate the output table.
- How to define transform-specific metadata that can be associated with each table transformation and aggregated across all transformations in a single run of the transform.
- How to define command line arguments that can be used to configure the operation of our noop transform.
We will not be showing the following:
* The creation of a custom TransformRuntime that would enable more global state and/or coordination among the transforms running in other Ray actors. This will be covered in an advanced tutorial.
The complete task involves the following:
* noop_main.py - an empty file in which to start writing the code described below
* NOOPTransform - class that implements the specific transformation
* NOOPTransformConfiguration - class that provides configuration for the NOOPTransform, specifically the command line arguments used to configure it
* main() - simple creation and use of the TransformLauncher
Currently, the complete code for the noop transform used for this tutorial can be found in the noop transform directory.
Finally, we show how to use the command line to run the transform in a local Ray cluster.
Note: You will need to run the setup commands in the README before running the following examples.
NOOPTransform
First, let's define the transform class. To do this we extend the base abstract/interface class AbstractTableTransform, which requires definition of the following:
- an initializer (i.e. __init__()) that accepts a dictionary of configuration data. For this example, the configuration data will only be defined by command line arguments (defined below).
- the transform() method itself, which takes an input table and produces an output table with any associated metadata for that table transformation.
Other methods such as flush() need not be overridden/redefined for this simple example.
We start with the simple definition of the class, its initializer and the imports required by subsequent code:
import time
from argparse import ArgumentParser, Namespace
from typing import Any

import pyarrow as pa
from data_processing_ray.runtime.ray import RayTransformLauncher
from data_processing_ray.runtime.ray.runtime_configuration import (
    RayTransformRuntimeConfiguration,
)
from data_processing.transform import AbstractTableTransform, TransformConfiguration
from data_processing.utils import CLIArgumentProvider, get_logger


class NOOPTransform(AbstractTableTransform):
    def __init__(self, config: dict[str, Any]):
        # The key must match sleep_key ("sleep_sec") as captured by
        # NOOPTransformConfiguration.apply_input_params() below.
        self.sleep = config.get("sleep_sec", 1)
The NOOPTransform class extends AbstractTableTransform, which defines the required methods.
For purposes of the tutorial and to simulate a more complex processing job, our initializer allows the transform to be configured with a number of seconds to sleep/delay during the call to transform().
Configuration is provided by the framework in a dictionary passed to the initializer.
Below we will cover how this sleep argument is made available to the initializer.
Note that in more complex transforms that might, for example, load a Hugging Face or other model, or perform other deep initializations, these can be done in the initializer.
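As a rough illustration of why heavy setup belongs in the initializer, the following stand-alone sketch uses a hypothetical ExpensiveInitTransform class and a made-up load_resource() helper that stands in for loading a model. Neither is part of the framework, and the class deliberately does not extend AbstractTableTransform so that the sketch runs without any dependencies. It only assumes that a transform instance is created once and that transform() is then called many times.

```python
from typing import Any


def load_resource(name: str) -> dict[str, Any]:
    """Hypothetical stand-in for an expensive load, e.g. reading a model from disk."""
    load_resource.calls += 1
    return {"name": name}


load_resource.calls = 0  # counts how often the expensive load runs


class ExpensiveInitTransform:
    """Illustrative only: mirrors the initializer/transform split described above."""

    def __init__(self, config: dict[str, Any]):
        # Done once per transform instance, not once per table.
        self.resource = load_resource(config.get("model_name", "demo"))

    def transform(self, rows: list[dict[str, Any]]) -> tuple[list[list[dict[str, Any]]], dict[str, Any]]:
        # Reuses the already-loaded resource; the rows pass through unchanged.
        return [rows], {"nrows": len(rows), "resource": self.resource["name"]}


transform = ExpensiveInitTransform({"model_name": "demo"})
for batch in ([{"a": 1}], [{"a": 2}, {"a": 3}]):
    transform.transform(batch)

print(load_resource.calls)  # 1: the load ran once, although transform() ran twice
```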
Next we define the transform() method itself, which includes the addition of some almost trivial metadata.
    def transform(self, table: pa.Table, file_name: str = None) -> tuple[list[pa.Table], dict[str, Any]]:
        if self.sleep is not None:
            time.sleep(self.sleep)
        # Add some sample metadata.
        metadata = {"nfiles": 1, "nrows": len(table)}
        return [table], metadata
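To make the idea of per-table metadata being aggregated across a run more concrete, here is a minimal stand-alone sketch. It does not use the framework or pyarrow: noop_transform() is a hypothetical stand-in that treats a table as a list of row dictionaries, and summing the values per key is an assumed aggregation rule, chosen only for illustration.

```python
from collections import Counter
from typing import Any


def noop_transform(rows: list[dict[str, Any]]) -> tuple[list[list[dict[str, Any]]], dict[str, Any]]:
    """Stand-in for NOOPTransform.transform(): returns the input unchanged plus metadata."""
    metadata = {"nfiles": 1, "nrows": len(rows)}
    return [rows], metadata


# Three hypothetical input "tables", represented here as lists of row dictionaries.
tables = [
    [{"id": 1}, {"id": 2}],
    [{"id": 3}],
    [{"id": 4}, {"id": 5}, {"id": 6}],
]

totals: Counter = Counter()
for table in tables:
    outputs, metadata = noop_transform(table)
    assert outputs == [table]  # noop: the output is the unmodified input
    totals.update(metadata)    # assumed aggregation rule: sum the values per key

print(dict(totals))  # {'nfiles': 3, 'nrows': 6}
```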
NOOPTransformConfiguration
Next we define the NOOPTransformConfiguration class and its initializer, which define the following:
- The short name for the transform
- The class implementing the transform - in our case NOOPTransform
- Command line argument support.
We also define the NOOPRayTransformConfiguration so we can run the transform in the Ray runtime as well. This allows the option of adding a transform-specific Ray runtime class that supports more complex distributed memory and data sharing operations. The NOOP transform does not make use of this, so its Ray configuration is a simple extension.
First we define the pure Python transform configuration class and its initializer:
short_name = "noop"
cli_prefix = f"{short_name}_"
sleep_key = "sleep_sec"
pwd_key = "pwd"
sleep_cli_param = f"{cli_prefix}{sleep_key}"
pwd_cli_param = f"{cli_prefix}{pwd_key}"


class NOOPTransformConfiguration(TransformConfiguration):
    def __init__(self):
        super().__init__(
            name=short_name,
            transform_class=NOOPTransform,
            remove_from_metadata=[pwd_key],
        )
The initializer extends TransformConfiguration, which provides simple capture of our configuration data and allows it to be pickled and sent across the network.
It also adds a params field that will be used below to hold the transform's configuration data (used in NOOPTransform.__init__() above).
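The roles of the params field and the remove_from_metadata list can be sketched without the framework. SketchConfiguration and its reportable_params() method below are hypothetical names invented for this illustration. How TransformConfiguration actually filters metadata is not shown in this tutorial, so the filtering rule here is an assumption.

```python
from typing import Any


class SketchConfiguration:
    """Hypothetical stand-in for a transform configuration; not the framework class."""

    def __init__(self, name: str, remove_from_metadata: list[str]):
        self.name = name
        self.remove_from_metadata = remove_from_metadata
        # Filled in later, once the command line arguments have been captured.
        self.params: dict[str, Any] = {}

    def reportable_params(self) -> dict[str, Any]:
        # Assumed behavior: drop the listed keys before anything is reported.
        return {k: v for k, v in self.params.items() if k not in self.remove_from_metadata}


config = SketchConfiguration(name="noop", remove_from_metadata=["pwd"])
config.params = {"sleep_sec": 2, "pwd": "nothing"}

print(config.params["pwd"])        # nothing (still available to the transform)
print(config.reportable_params())  # {'sleep_sec': 2}
```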
Next, we provide two methods that define and capture the command line configuration that is specific to the NOOPTransform. In this case the parameters are the number of seconds to sleep during transformation and an example option, pwd ("password"), which holds sensitive data that we don't want reported in the job metadata produced by the Ray orchestrator.
The first method establishes the command line arguments.
It is given a global argument parser to which the NOOPTransform arguments are added.
It is good practice to use a common prefix for all transform-specific options (e.g. pii, lang, etc.).
In our case we will use noop_.
    def add_input_params(self, parser: ArgumentParser) -> None:
        parser.add_argument(
            f"--{sleep_cli_param}",
            type=int,
            default=1,
            help="Sleep actor for a number of seconds while processing the data frame, before writing the file to COS",
        )
        parser.add_argument(
            f"--{pwd_cli_param}",
            type=str,
            default="nothing",
            help="A dummy password which should be filtered out of the metadata",
        )
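The benefit of a common prefix can be seen with argparse alone. In this small sketch the unprefixed --sleep_sec option is a made-up example of an option owned by some other component. It is there only to show that the prefixed name does not collide with it on a shared parser.

```python
from argparse import ArgumentParser

short_name = "noop"
cli_prefix = f"{short_name}_"

parser = ArgumentParser()
# A made-up option owned by some other component sharing the same parser.
parser.add_argument("--sleep_sec", type=int, default=0)
# The transform's own option, namespaced with the prefix.
parser.add_argument(f"--{cli_prefix}sleep_sec", type=int, default=1)

# Parse an explicit argument list instead of sys.argv so the sketch is repeatable.
args = parser.parse_args(["--noop_sleep_sec", "2"])
print(args.noop_sleep_sec)  # 2
print(args.sleep_sec)       # 0
```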
The second method captures the NOOPTransform-specific arguments from the parsed command line and returns False if they are invalid.
    def apply_input_params(self, args: Namespace) -> bool:
        captured = CLIArgumentProvider.capture_parameters(args, cli_prefix, False)
        if captured.get(sleep_key) < 0:
            print(f"Parameter noop_sleep_sec should be non-negative. you specified {args.noop_sleep_sec}")
            return False
        self.params = captured
        return True
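For intuition about the capture step, the sketch below shows one way that prefixed arguments could be collected from a parsed Namespace. The capture_prefixed() helper is hypothetical and is not the implementation of CLIArgumentProvider.capture_parameters(). It also assumes that the final False argument in the call above means the prefix is stripped from the captured keys.

```python
from argparse import Namespace
from typing import Any


def capture_prefixed(args: Namespace, prefix: str, keep_prefix: bool) -> dict[str, Any]:
    """Hypothetical helper: collect the arguments whose names start with prefix."""
    captured = {}
    for key, value in vars(args).items():
        if key.startswith(prefix):
            # Optionally strip the prefix so the transform sees short key names.
            captured[key if keep_prefix else key[len(prefix):]] = value
    return captured


# run_locally is included to show that unrelated options are ignored.
args = Namespace(noop_sleep_sec=2, noop_pwd="nothing", run_locally=True)
params = capture_prefixed(args, "noop_", keep_prefix=False)
print(params)  # {'sleep_sec': 2, 'pwd': 'nothing'}
```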
Runtime Launching
To run the transform on a set of input data, we use one of the runtimes, each described below.
Python Runtime
To run in the Python runtime, we create an instance of PythonTransformLauncher using the NOOPTransformConfiguration and launch it as follows:
from data_processing.runtime.pure_python import PythonTransformLauncher

if __name__ == "__main__":
    launcher = PythonTransformLauncher(runtime_config=NOOPTransformConfiguration())
    launcher.launch()
Assuming the above main code is placed in noop_main.py, we can run the transform on some test data. We'll use data in the repo for the noop transform and create a temporary directory to hold the output:
export DPK_REPOROOT=...
export NOOP_INPUT=$DPK_REPOROOT/transforms/universal/noop/python/test-data/input
python noop_main.py --noop_sleep_sec 2 \
--data_local_config "{'input_folder': '"$NOOP_INPUT"', 'output_folder': '/tmp/noop-output'}"
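The nested quoting in the --data_local_config value can be hard to read. After the shell expands $NOOP_INPUT and removes the outer double quotes, the program receives a single string that looks like a Python dictionary literal. The sketch below rebuilds such a string and parses it with ast.literal_eval. Using ast.literal_eval is an assumption made for illustration, not a statement about how the framework parses the option, and the input path is a placeholder.

```python
import ast

# Placeholder for whatever $NOOP_INPUT expands to on your machine.
noop_input = "/path/to/test-data/input"

# What the shell passes as a single argument after expansion and quote removal.
argument = "{'input_folder': '" + noop_input + "', 'output_folder': '/tmp/noop-output'}"

# ast.literal_eval evaluates Python literals only, so it is safe for this kind of input.
config = ast.literal_eval(argument)
print(config["input_folder"])   # /path/to/test-data/input
print(config["output_folder"])  # /tmp/noop-output
```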
Ray Runtime
To run in the Ray runtime, we define the NOOPRayTransformConfiguration and, instead of creating the PythonTransformLauncher, use the RayTransformLauncher as follows:
class NOOPRayTransformConfiguration(RayTransformRuntimeConfiguration):
    def __init__(self):
        super().__init__(transform_config=NOOPTransformConfiguration())


from data_processing_ray.runtime.ray import RayTransformLauncher

if __name__ == "__main__":
    launcher = RayTransformLauncher(runtime_config=NOOPRayTransformConfiguration())
    launcher.launch()
To run the transform in a local Ray cluster, add the --run_locally True option:
python noop_main.py --noop_sleep_sec 2 \
--data_local_config "{'input_folder': '"$NOOP_INPUT"', 'output_folder': '/tmp/noop-output'}" --run_locally True