Flows#


A flow is an object that stores the execution flow of a data pipeline. It consists of multiple stages, each of which defines how data is handled in that part of the execution flow.

The SDK provides the following functionality to interact with streaming flows:
  • Creating a flow

  • Retrieving flows

  • Editing a flow

  • Updating a flow

  • Duplicating a flow

  • Deleting a flow

  • Exporting flows

  • Importing flows

  • Validating a flow

  • Previewing a flow

  • Handling error records

Prerequisites#

To create a flow using the SDK, please make sure you have done the following steps:

Note

Currently, the SDK only supports creating a flow with an engine installed.

Creating a Flow#

In the UI, you can create a flow by navigating to Assets -> New asset -> Create a flow.

Warning

Creating a streaming flow in the UI requires setting an environment.

Screenshot of the flow creation page.

In the SDK, you can create a flow from a Project object using the Project.create_flow() method. The name and environment parameters are required; for details on creating an environment, see the documentation on creating an environment. You can also provide an optional description. You don’t need to specify flow_type, because its default value is streaming. This method returns a StreamingFlow instance.

>>> new_flow = project.create_flow(name='My streaming flow', description='optional description', environment=environment)
>>> new_flow
StreamingFlow(name='My streaming flow', description='optional description', flow_id=..., engine_version=...)
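
When a script may run more than once, it can help to avoid creating a second flow with the same name. The following is a minimal sketch, not part of the SDK: create_flow_if_missing is a hypothetical helper, and it assumes that Project.flows can be iterated like a list and that each flow exposes a name attribute, as the examples in this document suggest.

```python
def create_flow_if_missing(project, name, environment, description=None):
    """Return (flow, created): reuse a flow called `name` or create a new one.

    Hypothetical helper. Assumes `project.flows` is iterable and that each
    flow has a `name` attribute.
    """
    for flow in project.flows:
        if flow.name == name:
            return flow, False  # an existing flow was found, nothing created

    # Only pass the description when the caller supplied one.
    kwargs = {"name": name, "environment": environment}
    if description is not None:
        kwargs["description"] = description
    return project.create_flow(**kwargs), True
```

A call such as create_flow_if_missing(project, 'My streaming flow', environment) would then return the existing flow on later runs instead of creating another one.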

Retrieving Flows#

Flows can be retrieved through a Project object using the Project.flows property. You can also retrieve a single flow using the Project.flows.get() method, which requires the flow_id parameter.

>>> project.flows  # a list of all flows
[...StreamingFlow(name='My streaming flow', description='optional description', ...)...BatchFlow(..., name='My batch flow', description='optional description', ...)...]
>>> project.flows.get(name='My streaming flow')
StreamingFlow(name='My streaming flow', description='optional description', ...)
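
Because a project can hold more than one kind of flow, a quick overview grouped by type can be useful. The sketch below is a hypothetical helper rather than an SDK function; it relies only on iterating the flows and reading each flow's name attribute, and it groups by the Python class name of each object.

```python
from collections import defaultdict


def flow_names_by_type(flows):
    """Group flow names by the class name of each flow object.

    Hypothetical helper. Assumes every flow has a `name` attribute.
    """
    grouped = defaultdict(list)
    for flow in flows:
        grouped[type(flow).__name__].append(flow.name)
    return dict(grouped)
```

Passing project.flows to this function would yield a dict keyed by class name, for example one entry for streaming flows and one for batch flows.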

Editing a Flow#

You can edit a flow in multiple ways.

For starters, you can edit a flow’s attributes like name or description.

>>> new_flow.description = 'new description for the flow'
>>> new_flow
StreamingFlow(name='My streaming flow', description='new description for the flow', ...)

You can also edit a flow by editing its stages: adding a stage, removing a stage, updating a stage’s configuration, or changing how stages are connected. These operations are covered in the Stages documentation.

Streaming flows also have many properties related to the pipeline, engines, and so on. You can edit a streaming flow’s configuration through the Flow.configuration property, which returns a Configuration object that encapsulates the flow’s configuration. You can print the configuration and edit it much like a dict.

>>> new_flow.configuration['retry_pipeline_on_error']
True
>>> new_flow.configuration['retry_pipeline_on_error'] = False
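
When several settings change at once, it can be handy to apply them together and keep a record of the previous values. This is a minimal sketch under the assumption that the Configuration object supports item reads and writes like a dict, as described above; apply_config_overrides is a hypothetical name, not an SDK function.

```python
def apply_config_overrides(configuration, overrides):
    """Set each key in `overrides`; return the old values of keys that changed.

    Hypothetical helper. Works with any object that supports
    `configuration[key]` reads and writes.
    """
    previous = {}
    for key, value in overrides.items():
        # Reading first means an unknown key fails here (a plain dict raises
        # KeyError) instead of being created silently.
        old = configuration[key]
        if old != value:
            previous[key] = old
            configuration[key] = value
    return previous
```

The changes still live only in memory; they reach the platform once the flow is passed to Project.update_flow().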

Updating a Flow#

In the UI, you can update a flow by making changes to it and clicking the ‘Save’ icon.

Screenshot of the flow creation page.

In the SDK, you can make any changes to a Flow instance in memory and then save them by passing the object to the Project.update_flow() method.

This method returns an HTTP response indicating the status of the update operation.

>>> new_flow.name = 'new flow name'  # you can also update the stages, configuration, etc.
>>> project.update_flow(new_flow)
<Response [200]>
>>> new_flow
StreamingFlow(name='new flow name', description='new description for the flow', ...)
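
In scripts, you may want a failed update to stop execution rather than pass unnoticed. The sketch below wraps the update call in a hypothetical save_flow helper. It assumes the returned response exposes a status_code attribute, as a requests.Response does; the output shown above suggests this, but it is not confirmed here.

```python
def save_flow(project, flow):
    """Push in-memory changes and raise if the service does not report success.

    Hypothetical helper. Assumes the response has a `status_code` attribute.
    """
    response = project.update_flow(flow)
    if not 200 <= response.status_code < 300:
        raise RuntimeError(
            f"Updating flow {flow.name!r} failed with HTTP {response.status_code}"
        )
    return response
```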

Duplicating a Flow#

To duplicate a flow using the SDK, you need to pass a Flow instance to the Project.duplicate_flow() method along with the name parameter for the name of the new flow and an optional description parameter.

This will duplicate a flow and return a new instance of Flow.

>>> duplicated_flow = project.duplicate_flow(new_flow, name='duplicated flow', description='duplicated flow description')
>>> duplicated_flow
StreamingFlow(name='duplicated flow', description='duplicated flow description', ...)
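
If you need several copies of the same flow, for example to try out variations, the call can be repeated with generated names. This is a sketch only: duplicate_many and its naming scheme are hypothetical, and it uses Project.duplicate_flow() exactly as shown above.

```python
def duplicate_many(project, flow, count, description=None):
    """Create `count` copies of `flow`, named '<name> (copy 1)', '<name> (copy 2)', ...

    Hypothetical helper built on Project.duplicate_flow().
    """
    copies = []
    for index in range(1, count + 1):
        kwargs = {"name": f"{flow.name} (copy {index})"}
        if description is not None:
            kwargs["description"] = description
        copies.append(project.duplicate_flow(flow, **kwargs))
    return copies
```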

Deleting a Flow#

To delete a flow in the UI, go to Assets, click the three dots next to the flow, and choose Delete.

Screenshot of the flow creation page.

To delete a flow via the SDK, you need to pass a Flow instance to the Project.delete_flow() method.

This method returns an HTTP response indicating the status of the delete operation.

>>> project.delete_flow(duplicated_flow)
<Response [204]>
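
Cleaning up many test flows one by one is tedious, so a small helper can select them by name prefix. The sketch below is hypothetical and deliberately defaults to a dry run, since deletion cannot be undone from the script. It assumes Project.flows is iterable and that flows expose a name attribute.

```python
def delete_flows_with_prefix(project, prefix, dry_run=True):
    """Delete every flow whose name starts with `prefix`; return affected names.

    Hypothetical helper. With dry_run=True (the default) nothing is deleted
    and the names that would be affected are returned.
    """
    # Build the target list first so deletions cannot disturb the iteration.
    targets = [flow for flow in project.flows if flow.name.startswith(prefix)]
    if not dry_run:
        for flow in targets:
            project.delete_flow(flow)
    return [flow.name for flow in targets]
```

Running it once with the default dry_run=True and checking the returned names before repeating the call with dry_run=False is a reasonable precaution.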

Validating a Flow#

In the UI, you can validate a streaming flow by clicking the ‘Validate’ icon after making changes to the flow.

Screenshot of the validate button.

To validate a streaming flow via the SDK, first update the flow and then call the StreamingFlow.validate() method. It returns a ValidationResult object whose issues attribute holds a list of FlowValidationError instances if there are any errors.

>>> new_flow.add_stage('Trash')
Trash_01(stage_id='Trash_01')
>>> project.update_flow(new_flow)
<Response [200]>
>>> new_flow.validate()
ValidationResult(success=False, issues=[FlowValidationError(type='stageIssues', instanceName='Trash_01', humanReadableMessage='The first stage must be an origin'), FlowValidationError(type='stageIssues', instanceName='Trash_01', humanReadableMessage='Target must have input streams')], message='Validation Failed')
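
A long list of issues is easier to read when grouped by stage. The following sketch is a hypothetical helper, not an SDK function. It assumes that each FlowValidationError exposes instanceName and humanReadableMessage attributes matching the fields in the output above; getattr with a fallback is used because that mapping is an assumption.

```python
def issues_by_stage(result):
    """Map each stage instance name to the list of its validation messages.

    Hypothetical helper. Attribute names are assumed from the repr of
    FlowValidationError and may differ in the real SDK.
    """
    grouped = {}
    for issue in result.issues:
        stage = getattr(issue, "instanceName", None)
        message = getattr(issue, "humanReadableMessage", str(issue))
        grouped.setdefault(stage, []).append(message)
    return grouped
```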

Previewing a Flow#

In the UI, you can preview a flow by hitting the ‘Preview’ icon.

Screenshot of the preview button.

To preview a flow via the SDK, call the StreamingFlow.preview() method. It returns a list of PreviewStage instances. Each PreviewStage provides access to its input and output properties, which contain the input and output data for that stage.

>>> preview = flow.preview()
>>> preview
[PreviewStage(instance_name='DevRawDataSource_01'), PreviewStage(instance_name='Trash_01')]
>>> dev_raw_data_preview, trash_preview = preview
>>> dev_raw_data_preview.input
>>> dev_raw_data_preview.output
[('abc', 'xyz', 'lmn')]
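
Unpacking works well for two stages, but for larger flows it may be more convenient to look up preview data by stage name. This is a minimal sketch with hypothetical helper names. It assumes each PreviewStage exposes an instance_name attribute, as the output above suggests, alongside the documented output property.

```python
def outputs_by_stage(preview):
    """Map each previewed stage's instance name to its output data.

    Hypothetical helper. Assumes PreviewStage has `instance_name` and `output`.
    """
    return {stage.instance_name: stage.output for stage in preview}


def stages_without_output(preview):
    """Return the instance names of stages whose output is empty or None."""
    return [stage.instance_name for stage in preview if not stage.output]
```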

Exporting Flows#

To export flows via the SDK, call the Project.export_streaming_flows() method and pass in an individual StreamingFlow object, a StreamingFlows object, or a list of StreamingFlow objects.

You can set the following additional parameters:
  • with_plain_text_credentials – export credentials in plain text.

  • destination – specify the export location.

  • stream – stream the ZIP file data.

The function will return the location the exported zip file was written to.

>>> project.export_streaming_flows(flows=project.flows)
PosixPath('flows.zip')
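
For regular backups, a timestamped file name keeps earlier exports from being overwritten. The sketch below is hypothetical: export_with_timestamp is not an SDK function, and it assumes that the destination parameter accepts a file path for the ZIP archive, which this document does not state explicitly.

```python
from datetime import datetime, timezone
from pathlib import Path


def export_with_timestamp(project, directory, now=None):
    """Export all flows of `project` to '<directory>/flows-<UTC timestamp>.zip'.

    Hypothetical helper. Assumes `destination` takes the path of the ZIP file.
    `now` exists so the timestamp can be fixed, for example in tests.
    """
    if now is None:
        now = datetime.now(timezone.utc)
    stamp = now.astimezone(timezone.utc)  # normalise so the 'Z' suffix is accurate
    destination = Path(directory) / f"flows-{stamp:%Y%m%dT%H%M%SZ}.zip"
    return project.export_streaming_flows(flows=project.flows, destination=destination)
```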

Importing Flows#

To import flows via the SDK, call the Project.import_streaming_flows() method and specify the source parameter, which is the path to the ZIP file containing the JSON file(s) of the streaming flow(s) to be imported.

You must also set the conflict_resolution parameter, which determines how to handle the import of a flow that already exists in the project. The options for conflict_resolution are listed below:
  • 'skip' – skip this particular flow and move to the next flow to be imported.

  • 'replace' – replace the existing flow with the flow to be imported.

  • 'rename' – keep the existing flow and rename the flow that is being imported.

The function returns the imported StreamingFlow object, or a list of StreamingFlow objects when more than one flow is imported.

>>> project.import_streaming_flows(source='flows_to_import.zip', conflict_resolution='skip')
StreamingFlow(name='dummy_flow', description='dummy_description', flow_id='...', engine_version='...')
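
Checking the arguments before calling the SDK gives clearer error messages for typos and missing files. The sketch below is a hypothetical wrapper. It uses the standard library's zipfile.is_zipfile() for the archive check, and it assumes the SDK returns either a single flow or a list, wrapping a single result in a list so callers can always iterate.

```python
import zipfile
from pathlib import Path

ALLOWED_RESOLUTIONS = ("skip", "replace", "rename")


def import_flows_checked(project, source, conflict_resolution="skip"):
    """Validate the arguments, import the flows, and always return a list.

    Hypothetical helper around Project.import_streaming_flows().
    """
    if conflict_resolution not in ALLOWED_RESOLUTIONS:
        raise ValueError(
            f"conflict_resolution must be one of {ALLOWED_RESOLUTIONS}, "
            f"got {conflict_resolution!r}"
        )
    source = Path(source)
    # is_zipfile() returns False for missing files as well as non-ZIP files.
    if not zipfile.is_zipfile(source):
        raise ValueError(f"{source} is not a readable ZIP archive")

    imported = project.import_streaming_flows(
        source=str(source), conflict_resolution=conflict_resolution
    )
    # Assumption: a single flow comes back bare, several come back as a list.
    if isinstance(imported, (list, tuple)):
        return list(imported)
    return [imported]
```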

Handling Error Records#

To edit error record handling in the UI, click the gear icon at the top right of the screen on a flow’s edit page.

Screenshot of the flow creation page.

This opens a new pop-up window with a tab for Error records on the left. This will let you adjust the error record handling for the flow.

Screenshot of the flow creation page.

This page lets you change how error records are handled by policy and which stage should handle them.

The possible options for error record policy are Original record as it was generated by the origin and Record as it was seen by the stage that sent it to error stream. In the SDK, these equate to ORIGINAL_RECORD and STAGE_RECORD.

This can be updated in a flow’s configuration.

>>> new_flow.configuration['error_record_policy']
'ORIGINAL_RECORD'
>>> new_flow.configuration['error_record_policy'] = 'STAGE_RECORD'
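
Since the policy is set as a plain string, a typo would otherwise only surface later. The following sketch guards the assignment with the two values named above. set_error_record_policy is a hypothetical helper, and it assumes the flow's configuration supports dict-style access as shown in the example.

```python
ERROR_RECORD_POLICIES = ("ORIGINAL_RECORD", "STAGE_RECORD")


def set_error_record_policy(flow, policy):
    """Set the error record policy after checking the value; return the old one.

    Hypothetical helper. Only the two policy values named in this document
    are accepted.
    """
    if policy not in ERROR_RECORD_POLICIES:
        raise ValueError(
            f"policy must be one of {ERROR_RECORD_POLICIES}, got {policy!r}"
        )
    previous = flow.configuration["error_record_policy"]
    flow.configuration["error_record_policy"] = policy
    return previous
```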

To change the error record stage, call the StreamingFlow.set_error_stage() method. Pass either the label or the name of the new error stage; you can optionally pass the new stage’s library as well.

Note

All error stages other than Discard will have configuration options for you to customize your experience.

>>> write_to_file = new_flow.set_error_stage('Write to File')
>>> write_to_file.configuration['directory'] = '/path/to/some/directory'

You can view the current error stage for a flow at any point using the StreamingFlow.error_stage property.

>>> new_flow.error_stage
WritetoFile_ErrorStage(stage_id='WritetoFile_ErrorStage')