Stages

A stage is a component of a flow that processes the flow's data in one simple, well-defined way. By connecting multiple stages, you can build flows that perform complex tasks.

The SDK provides several ways to work with the stages in a flow:
  • Listing and retrieving stages

  • Adding a stage to a flow

  • Removing a stage from a flow

  • Connecting stages

  • Editing a stage's configuration

Listing and Retrieving Stages in a Flow

To list all the stages in a flow, you can use the BatchFlow.stages property. This returns a list of StageNode instances.

>>> batch_flow.stages
[RowGeneratorStage(label='Row_Generator'), PeekStage(label='Peek'), RowGeneratorStage(label='Row_Generator_2'), PeekStage(label='Peek_2'), GoogleBigqueryStage(label='BigQuery')]

To filter the stages, use the Stages.get_all() method with keyword arguments. Each argument can be any field found in the configuration of the stages you want, such as the stage's type or its output_count.

>>> batch_flow.stages.get_all(type="Row Generator")
[RowGeneratorStage(label='Row_Generator'), RowGeneratorStage(label='Row_Generator_2')]
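To make the filtering rule concrete, the sketch below models it in plain Python: a stage matches only if its configuration contains every field you pass, with an equal value. ToyStage and filter_stages are hypothetical names used for illustration and are not part of the SDK, the output_count values are made up, and the SDK's own Stages.get_all() implementation may differ.

```python
from dataclasses import dataclass, field


@dataclass
class ToyStage:
    """Hypothetical stand-in for a StageNode: a label plus a configuration dict."""
    label: str
    configuration: dict = field(default_factory=dict)


def filter_stages(stages, **fields):
    """Return the stages whose configuration contains every given field/value pair.

    Toy model of keyword filtering; not the SDK's Stages.get_all().
    """
    return [
        stage
        for stage in stages
        if all(key in stage.configuration and stage.configuration[key] == value
               for key, value in fields.items())
    ]


# Made-up example data; real values come from each stage's configuration.
stages = [
    ToyStage("Row_Generator", {"type": "Row Generator", "output_count": 1}),
    ToyStage("Peek", {"type": "Peek", "output_count": 0}),
    ToyStage("Row_Generator_2", {"type": "Row Generator", "output_count": 1}),
]
print([s.label for s in filter_stages(stages, type="Row Generator")])
# ['Row_Generator', 'Row_Generator_2']
```

Passing several keyword arguments narrows the result further, since a stage must satisfy all of them; passing none returns every stage.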

Finally, to retrieve a specific stage from a flow, use the Stages.get() method. Because a stage's label uniquely identifies it within the flow, you can pass the label to find the stage you want.

>>> batch_flow.stages.get(label="Row_Generator")
RowGeneratorStage(label='Row_Generator')

Adding a Stage to a Flow

To add stages to a flow in the UI, you can open the dropdown menu on the left of the flow edit page and select the stage you want to add.

Screenshot for adding a stage.

In the SDK, use the BatchFlow.add_stage() method. It takes the stage's type and a label: the type is the unique identifier of the kind of stage to create, and the label is the name you give this particular stage.

This method returns the newly created stage. As with other changes to a flow, call the Project.update_flow() method afterwards to save it.

>>> amazon_rds = batch_flow.add_stage(type='Amazon RDS for PostgreSQL', label='Amazon RDS')
>>> project.update_flow(batch_flow)
<Response [201]>

For a list of all valid stage types, see the BatchFlow.add_stage() reference.
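If you add several stages at once, you can batch the additions and save the flow a single time. The helper below is a sketch that relies only on the BatchFlow.add_stage() and Project.update_flow() calls shown above; the name add_stages and the (type, label) pair format are assumptions for illustration, not part of the SDK.

```python
def add_stages(project, batch_flow, specs):
    """Hypothetical helper (not part of the SDK).

    Adds one stage per (stage_type, label) pair in specs, then saves the
    flow with a single Project.update_flow() call. Returns the created
    stages in the same order as specs.
    """
    created = [
        batch_flow.add_stage(type=stage_type, label=label)
        for stage_type, label in specs
    ]
    # Persist all additions at once instead of calling update_flow() per stage.
    project.update_flow(batch_flow)
    return created
```

For example, add_stages(project, batch_flow, [("Row Generator", "Source"), ("Peek", "Output")]) returns the two new stages in that order; the labels "Source" and "Output" are placeholders.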

Removing a Stage from a Flow

In the UI, click the three dots at the top right of the stage and select Delete from the menu.

Screenshot for removing a stage.

In the SDK, you can remove a stage from a flow by passing its StageNode instance to the BatchFlow.remove_stage() method.

Removing a stage also disconnects every stage that was connected to it.

This method does not return anything.

After removing stages, call the Project.update_flow() method to save the changes.

>>> amazon_rds = batch_flow.stages.get(label="Amazon RDS")
>>> batch_flow.remove_stage(amazon_rds)
>>> project.update_flow(batch_flow)
<Response [200]>
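To remove several stages by label and save once, you can combine Stages.get(), BatchFlow.remove_stage(), and Project.update_flow() as in the sketch below. remove_stages_by_label is a hypothetical helper, not an SDK method, and it assumes every label exists in the flow; what Stages.get() does for an unknown label is not covered here.

```python
def remove_stages_by_label(project, batch_flow, labels):
    """Hypothetical helper (not part of the SDK).

    Looks up each stage by its unique label, removes it from the flow, and
    saves the flow once. Assumes every label is present in batch_flow.
    Returns whatever Project.update_flow() returns.
    """
    for label in labels:
        stage = batch_flow.stages.get(label=label)
        # remove_stage() also disconnects every stage linked to this one.
        batch_flow.remove_stage(stage)
    return project.update_flow(batch_flow)
```

Looking stages up one at a time inside the loop, rather than iterating over batch_flow.stages while removing from it, avoids modifying the collection you are iterating.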

Connecting Stages

In the UI, to connect stages, you can click on the output of a stage and drag it to another stage.

Screenshot for connecting a stage.

In the SDK, you can connect batch stages with the Stage.connect_output_to() method. It connects the output of the stage it is called on to the stage you pass in, and returns the link created between them.

>>> row_gen = batch_flow.add_stage('Row Generator', 'Row_Generator') # a sample origin stage that generates data
>>> peek = batch_flow.add_stage('Peek', 'Peek') # a sample destination stage that outputs all input to console
>>> link = row_gen.connect_output_to(peek)
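For a linear pipeline, you can connect a sequence of stages by calling connect_output_to() on each adjacent pair. chain_stages is a hypothetical helper sketched under that assumption; it uses no SDK call other than connect_output_to() and returns the created links in order.

```python
def chain_stages(*stages):
    """Hypothetical helper (not part of the SDK).

    Connects each stage's output to the next stage, in the order given, by
    calling connect_output_to() on every adjacent pair. Returns the links,
    one per pair (an empty list for fewer than two stages).
    """
    return [
        upstream.connect_output_to(downstream)
        for upstream, downstream in zip(stages, stages[1:])
    ]
```

With the stages above, chain_stages(row_gen, peek) performs the same single row_gen.connect_output_to(peek) call and returns its link inside a one-element list.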

Editing a Stage’s Configuration

Every stage has configurable properties, which you can view and edit through the stage's configuration property. This property returns an object containing all of that stage's properties, and the object supports both dot and bracket notation for editing.

>>> row_gen.configuration
{
    "buf_mode": "default",
    "max_mem_buf_size": 3145728,
    ...
}
>>> row_gen.configuration.records = 5
>>> row_gen.configuration["runtime_column_propagation"] = False