Skip to content

Advanced Transform Tutorial

In this example, we implement an ededup transform that removes duplicate documents across all files. In this tutorial, we will show the following:

  • How to write the ededup transform to generate the output table.
  • How to define transform-specific metadata that can be associated with each table transformation and aggregated across all transformations in a single run of the transform.
  • How to implement custom TransformRuntime to create supporting Ray objects and supplement transform-specific metadata with the information about this statistics
  • How to define command line arguments that can be used to configure the operation of our noop transform.

The complete task involves the following:

  • EdedupTransform - class that implements the specific transformation
  • EdedupRuntime - class that implements custom TransformRuntime to create supporting Ray objects and enhance job output statistics
  • EdedupTableTransformConfiguration - class that provides configuration for the EdedupTransform and EdedupRuntime, including transform runtime class and the command line arguments used to configure them.
  • main() - simple creation and use of the TransformLauncher.

(Currently, the complete code for the noop transform used for this tutorial can be found in the ededup transform directory.

Finally, we show to use the command line to run the transform in a local ray cluster

HashFilter

One of the basic components of exact dedup implementation is a cache of hashes. That is why we will start from implementing this support actor. The implementation is fairly straight forward and can be found here

EdedupTransform

First, let's define the transform class. To do this we extend the base abstract/interface class AbstractTableTransform, which requires definition of the following:

  • an initializer (i.e. init()) that accepts a dictionary of configuration data. For this example, the configuration data will only be defined by command line arguments (defined below).
  • the transform() method itself that takes an input table and produces an output table and any associated metadata for that table transformation.

Other methods such as flush() need not be overridden/redefined for this example.

We start with the simple definition of the class, its initializer and the imports required by subsequent code:

from argparse import ArgumentParser, Namespace
from typing import Any

import pyarrow as pa
import ray
from data_processing_ray.data_access import DataAccessFactoryBase
from data_processing_ray.runtime.ray import (
  DefaultRayTransformRuntime,
  RayTransformLauncher,
  RayUtils,
)
from data_processing_ray.runtime.ray.runtime_configuration import (
  RayTransformRuntimeConfiguration,
)
from data_processing.transform import AbstractTableTransform, TransformConfiguration
from data_processing.utils import GB, CLIArgumentProvider, TransformUtils, get_logger
from ray.actor import ActorHandle


class EdedupTransform(AbstractTableTransform):

  def __init__(self, config: dict):
    super().__init__(config)
    self.doc_column = config.get("doc_column", "")
    self.hashes = config.get("hashes", [])
The EdedupTransform class extends the AbstractTableTransform, which defines the required methods.

For purposes of the tutorial and to simulate a more complex processing job, our initializer allows our transform to be configurable with document column name and a list of hash actors during the call to transform(). Configuration is provided by the framework in a dictionary provided to the initializer. Below we will cover how doc_column and hashes arguments are made available to the initializer.

Next we define the transform() method itself, which includes the addition of some metadata.

    def transform(self, table: pa.Table) -> tuple[list[pa.Table], dict[str, Any]]:


  if not TransformUtils.validate_columns(table=table, required=[self.doc_column]):
    return [], {}
# Inner variables
hashes = set()
unique = []
hd = {}
# Compute unique hashes for the table
for text in table[self.doc_column]:
  # Compute doc hash
  h = TransformUtils.str_to_hash(TransformUtils.normalize_string(str(text)))
  if h not in hashes:  # Processing this hash for the first time
    hashes.add(h)  # Remember it locally
    hd[h] = str(text)
    if len(hd) >= REQUEST_LEN:  # time to check remotely
      unique = unique + self._process_cached_hashes(hd=hd)
      hd = {}
if len(hd) > 0:  # Process remaining hashes
  unique = unique + self._process_cached_hashes(hd=hd)

# Remove duplicates
unique_set = set(unique)
mask = [False] * table.num_rows
index = 0
for text in table[self.doc_column]:
  str_text = str(text)
  if str_text in unique_set:
    mask[index] = True
    unique_set.remove(str_text)
  index += 1
# Create output table
out_table = table.filter(mask)
# report statistics
stats = {"source_documents": table.num_rows, "result_documents": out_table.num_rows}
return [out_table], stats
The single input to this method is the in-memory pyarrow table to be transformed. The return of this function is a list of tables and optional metadata. In this case of simple 1:1 table conversion the list will contain a single table, result of removing duplicates from input table.

The metadata is a free-form dictionary of keys with numeric values that will be aggregated by the framework and reported as aggregated job statistics metadata. If there is no metadata then simply return an empty dictionary.

EdedupRuntime

First, let's define the transform runtime class. To do this we extend the base abstract/interface class DefaultTableTransformRuntime, which requires definition of the following:

  • an initializer (i.e. init()) that accepts a dictionary of configuration data. For this example, the configuration data will only be defined by command line arguments (defined below).
  • the get_transform_config() method that takes data_access_factory, statistics actor, and list of files to process and produces a dictionary of parameters used by transform.
  • the compute_execution_stats() method that takes take a dictionary of metadata, enhances it and produces an enhanced metadata dictionary.

We start with the simple definition of the class and its initializer

class EdedupRuntime(DefaultTableTransformRuntime):

    def __init__(self, params: dict[str, Any]):
        super().__init__(params)
        self.filters = []
Next we define the get_transform_config() method, which, in this case, creates supporting Ray Actors and adds their handles to the transform parameters

    def get_transform_config(
        self, data_access_factory: DataAccessFactory, statistics: ActorHandle, files: list[str]
) -> dict[str, Any]:


self.aggregators = RayUtils.create_actors(
  clazz=HashFilter,
  params={},
  actor_options={"num_cpus": self.params.get("hash_cpu", 0.5)},
  n_actors=self.params.get("num_hashes", 1),
)
return {"hashes": self.aggregators} | self.params
Inputs to this method includes a set of parameters, that moght not be needed for this transformer, but rather a superset of all parameters that can be used by different implementations of transform runtime ( see for example fuzzy dedup, etc). The return of this function is a dictionary information for transformer initialization. In this implementation we add additional parameters to the input dictionary, but in general, it can be a completely new dictionary build here

Finally we define the compute_execution_stats() method, which which enhances metadata collected by statistics class

    def compute_execution_stats(self, stats: dict[str, Any]) -> dict[str, Any]:


# Get filters stats
sum_hash = 0
sum_hash_mem = 0
remote_replies = [f.get_size.remote() for f in self.aggregators]
while remote_replies:
  # Wait for replies
  ready, not_ready = ray.wait(remote_replies)
  for r in ready:
    h_size, h_memory = ray.get(r)
    sum_hash = sum_hash + h_size
    sum_hash_mem = sum_hash_mem + h_memory
  remote_replies = not_ready
dedup_prst = 100 * (1.0 - stats.get("result_documents", 1) / stats.get("source_documents", 1))
return {"number of hashes": sum_hash, "hash memory, GB": sum_hash_mem, "de duplication %": dedup_prst} | stats
Input to this method is a dictionary of metadata collected by statistics object. It then enhances it by information collected by hash actors and custom computations based on statistics data.

EdedupTableTransformConfiguration

The final class we need to implement is EdedupRayTransformConfiguration class that provides configuration for running our transform. Although we provide only Ray-based implementation, Ray-based configuration relies on Python-based configuration that we need to define first. So we first need to define EdedupTableTransformConfiguration class, defining the following:

  • The short name for the transform
  • The class implementing the transform - in our case EdedupTransform
  • The transform runtime class be used - in our case EdedupRuntime
  • Command line argument support.

First we define the class and its initializer,

short_name = "ededup"
cli_prefix = f"{short_name}_"

class EdedupTableTransformConfiguration(TransformConfiguration):
  def __init__(self):
    super().__init__(
      name=short_name,
      transform_class=EdedupTransform,
    )

The initializer extends the DefaultTableTransformConfiguration which provides simple capture of our configuration data and enables picklability through the network. It also adds a params field that will be used below to hold the transform's configuration data (used in EdedupRuntime.init() above).

Next, we provide two methods that define and capture the command line configuration that is specific to the EdedupTransform, in this case the number of seconds to sleep during transformation. First we define the method establishes the command line arguments. This method is given a global argument parser to which the EdedupTransform arguments are added. It is good practice to include a common prefix to all transform-specific options (i.e. pii, lang, etc). In our case we will use noop_.

    def add_input_params(self, parser: ArgumentParser) -> None:
      parser.add_argument(f"--{cli_prefix}hash_cpu", type=float, default=0.5, help="number of CPUs per hash")
      parser.add_argument(f"--{cli_prefix}num_hashes", type=int, default=0, help="number of hash actors to use")
      parser.add_argument(f"--{cli_prefix}doc_column", type=str, default="contents", help="key for accessing data")
Next we implement a method that is called after the framework has parsed the CLI args and which allows us to capture the EdedupTransform-specific arguments and optionally validate them.

    def apply_input_params(self, args: Namespace) -> bool:
      captured = CLIArgumentProvider.capture_parameters(args, cli_prefix, False)
      self.params = self.params | captured
      if self.params["num_hashes"] <= 0:
        logger.info(f"Number of hashes should be greater then zero, provided {args.num_hashes}")
        return False
      logger.info(f"exact dedup params are {self.params}")
      return True
Now we can implement EdedupRayTransformConfigurationwith the following code
class EdedupRayTransformConfiguration(RayTransformConfiguration):
    def __init__(self):
        super().__init__(transform_config=EdedupTableTransformConfiguration(), runtime_class=EdedupRuntime)

main()

Next, we show how to launch the framework with the EdedupTransform using the framework's TransformLauncher class.

if __name__ == "__main__":
  launcher = RayTransformLauncher(EdedupRayTransformConfiguration())
  launcher.launch()

The launcher requires only an instance of DefaultTableTransformConfiguration (our EdedupRayTransformConfiguration class). A single method launch() is then invoked to run the transform in a Ray cluster.

Running

Note: You will need to run the setup commands in the README before running the following examples.

Assuming the above main() is placed in ededup_transform.py we can run the transform on local data as follows:

make run-cli-sample
See the launcher options for a complete list of transform-independent command line options.