Flows#
A flow is an object used for storing the execution flow of a data pipeline. A flow is comprised of multiple stages with each stage defining how data is handled in that part of the execution flow.
The SDK provides the following functionality to interact with flows:
- Creating a flow
- Retrieving flows
- Editing a flow
- Updating a flow
- Duplicating a flow
- Deleting a flow
- Validating a flow
- Handling error records
Prerequisites#
To create a flow using the SDK, make sure you have completed the following steps:
Note
Currently, the SDK only supports creating a flow with an engine installed.
Creating a Flow#
In the UI, you can create a flow by navigating to Assets -> New asset -> Create a flow.
Here, you will be required to choose an environment for your flow along with a name and description.
In the SDK, you can create a flow from a Project object using the
Project.create_flow() method.
You must supply the name and environment parameters; the description parameter is optional.
This method returns a StreamsetsFlow instance.
>>> new_flow = project.create_flow(name='My first flow', description='optional description', environment=environment)
>>> new_flow
StreamsetsFlow(name='My first flow', description='optional description', flow_id=..., engine_version=...)
Retrieving Flows#
Flows can be retrieved through a Project object using the
Project.flows property.
You can also retrieve a single flow using the Project.flows.get() method,
filtering on an attribute such as flow_id or, as in the example below, name.
>>> project.flows # a list of all the flows
[...StreamsetsFlow(name='My first flow', description='optional description', ...)...]
>>> project.flows.get(name='My first flow')
StreamsetsFlow(name='My first flow', description='optional description', ...)
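When you are not sure whether a flow with a given name exists, a small lookup helper can be convenient. The following is a minimal sketch, not part of the SDK: find_flow_by_name is a hypothetical helper, and it assumes only that Project.flows can be iterated and that each flow exposes a name attribute, as the output above suggests. Stand-in objects are used so that the snippet runs without a live project.

```python
from types import SimpleNamespace


def find_flow_by_name(project, name):
    """Return the first flow in ``project.flows`` whose name matches, or None."""
    # Assumption: project.flows is iterable and each flow has a ``name`` attribute.
    for flow in project.flows:
        if flow.name == name:
            return flow
    return None


# Stand-in objects (not SDK classes) so the sketch runs without a live project.
demo_project = SimpleNamespace(
    flows=[
        SimpleNamespace(name='My first flow', description='optional description'),
        SimpleNamespace(name='Another flow', description=''),
    ]
)

found = find_flow_by_name(demo_project, 'My first flow')
missing = find_flow_by_name(demo_project, 'No such flow')
```

With a real Project object you would pass it in place of demo_project; the helper returns None rather than raising when no flow matches.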
Editing a Flow#
You can edit a flow in multiple ways.
For starters, you can edit a flow’s attributes like name or description.
>>> new_flow.description = 'new description for the flow'
>>> new_flow
StreamsetsFlow(name='My first flow', description='new description for the flow', ...)
You can edit a flow’s configuration through the StreamsetsFlow.configuration property.
This property returns a Configuration object which encapsulates a flow’s configuration.
You can print the configuration and read or edit its values much as you would with a dict.
>>> new_flow.configuration['retry_pipeline_on_error']
True
>>> new_flow.configuration['retry_pipeline_on_error'] = False
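If you need to change several configuration values at once, the dict-style access shown above can be wrapped in a loop. This is a minimal sketch: apply_configuration is a hypothetical helper, not an SDK function, and a plain dict stands in for the Configuration object. It relies only on the item get and set operations demonstrated above.

```python
def apply_configuration(configuration, overrides):
    """Set each key in ``overrides`` and return the values that were replaced."""
    previous = {}
    for key, value in overrides.items():
        # Read the old value first so the change can be reverted later.
        # A plain dict raises KeyError for an unknown key; the real
        # Configuration object may behave differently.
        previous[key] = configuration[key]
        configuration[key] = value
    return previous


# A plain dict stands in for new_flow.configuration in this sketch.
demo_configuration = {'retry_pipeline_on_error': True}
previous = apply_configuration(demo_configuration, {'retry_pipeline_on_error': False})
```

Returning the previous values makes it easy to restore the original settings by calling the helper again with that dict.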
Finally, you can edit a flow by editing its stages. This can include adding a stage, removing a stage, updating a stage’s configuration or connecting a stage in a different way than before. All the operations described are covered in the Stage documentation.
Updating a Flow#
In the UI, you can update a flow by making changes to it and clicking the ‘Save’ icon.
In the SDK, you can make any changes to a StreamsetsFlow instance
in memory and then save them by passing the instance to the Project.update_flow() method.
This method returns an HTTP response indicating the status of the update operation.
>>> new_flow.name = 'new flow name' # you can also update the stages, configuration, etc.
>>> project.update_flow(new_flow)
<Response [200]>
>>> new_flow
StreamsetsFlow(name='new flow name', description='new description for the flow', ...)
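Because the update returns a response object rather than the flow, scripts may want to check the status before continuing. The sketch below is one way to do that, under stated assumptions: update_flow_checked is a hypothetical helper, and the response is assumed to expose an integer status_code attribute, which the `<Response [200]>` output suggests but this page does not confirm. Stand-in objects replace the real project so the snippet is runnable.

```python
from types import SimpleNamespace


def update_flow_checked(project, flow):
    """Call project.update_flow(flow) and raise if the status is not 2xx."""
    response = project.update_flow(flow)
    # Assumption: the returned response has an integer ``status_code``.
    if not 200 <= response.status_code < 300:
        raise RuntimeError(f'Flow update failed with status {response.status_code}')
    return response


# Stand-in projects (not SDK classes) that simulate a success and a failure.
ok_project = SimpleNamespace(update_flow=lambda flow: SimpleNamespace(status_code=200))
bad_project = SimpleNamespace(update_flow=lambda flow: SimpleNamespace(status_code=500))

ok_response = update_flow_checked(ok_project, flow=None)

try:
    update_flow_checked(bad_project, flow=None)
    update_failed = False
except RuntimeError:
    update_failed = True
```

The SDK may already raise its own exception on a failed request; in that case the explicit check is simply a second line of defense.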
Duplicating a Flow#
To duplicate a flow using the SDK, you need to pass a StreamsetsFlow instance
to the Project.duplicate_flow() method
along with the name parameter for the name of the new flow and an optional description parameter.
This will duplicate a flow and return a new instance of StreamsetsFlow.
>>> duplicated_flow = project.duplicate_flow(new_flow, name='duplicated flow', description='duplicated flow description')
>>> duplicated_flow
StreamsetsFlow(name='duplicated flow', description='duplicated flow description', ...)
Deleting a Flow#
To delete a flow in the UI, go to Assets, click the three dots next to the flow, and choose Delete.
To delete a flow via the SDK, you need to pass a StreamsetsFlow instance
to the Project.delete_flow() method.
This method returns an HTTP response indicating the status of the delete operation.
>>> project.delete_flow(duplicated_flow)
<Response [204]>
Validating a Flow#
In the UI, you can validate a flow by clicking the ‘Validate’ icon.
To validate a flow via the SDK, first save any pending changes with Project.update_flow(), then call the StreamsetsFlow.validate() method.
This returns a ValidationResult object.
The object has an issues attribute that holds a list of FlowValidationError instances if there are any errors.
>>> new_flow.add_stage('Trash')
Trash_01()
>>> project.update_flow(new_flow)
<Response [200]>
>>> new_flow.validate()
ValidationResult(success=False, issues=[FlowValidationError(type='stageIssues', instanceName='Trash_01', humanReadableMessage='The first stage must be an origin'), FlowValidationError(type='stageIssues', instanceName='Trash_01', humanReadableMessage='Target must have input streams')], message='Validation failed')
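A long list of issues is easier to act on when grouped by stage. The following is a minimal sketch of that idea: issues_by_stage is a hypothetical helper, and it assumes that instanceName and humanReadableMessage are readable as attributes on each FlowValidationError, as the printed output above suggests. Stand-in objects mirror that output so the snippet runs on its own.

```python
from collections import defaultdict
from types import SimpleNamespace


def issues_by_stage(validation_result):
    """Map each stage instance name to the list of its issue messages."""
    grouped = defaultdict(list)
    # Assumption: every issue exposes ``instanceName`` and
    # ``humanReadableMessage`` as attributes.
    for issue in validation_result.issues:
        grouped[issue.instanceName].append(issue.humanReadableMessage)
    return dict(grouped)


# Stand-in result (not an SDK class) mirroring the validation output above.
demo_result = SimpleNamespace(
    success=False,
    issues=[
        SimpleNamespace(type='stageIssues', instanceName='Trash_01',
                        humanReadableMessage='The first stage must be an origin'),
        SimpleNamespace(type='stageIssues', instanceName='Trash_01',
                        humanReadableMessage='Target must have input streams'),
    ],
)

grouped = issues_by_stage(demo_result)
```

Here grouped maps 'Trash_01' to both of its messages, in the order in which they were reported.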
Handling Error Records#
To edit error record handling on the UI, click the gear icon on the top-right of the screen in a flow’s edit page.
This opens a new pop-up window with a tab for Error records on the left. This will let you adjust the error record handling for the flow.
The page lets you change how error records are handled by policy and which stage should handle them.
Let’s learn how to change the policy first.
The possible options for the error record policy are ‘Original record as it was generated by the origin’ and
‘Record as it was seen by the stage that sent it to error stream’.
In the SDK, these correspond to ORIGINAL_RECORD and STAGE_RECORD, respectively.
The policy can be updated through the flow’s configuration.
>>> new_flow.configuration['error_record_policy']
'ORIGINAL_RECORD'
>>> new_flow.configuration['error_record_policy'] = 'STAGE_RECORD'
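Since the policy is set as a plain string, a typo could go unnoticed until the flow is validated or run. The sketch below guards against that; set_error_record_policy and ERROR_RECORD_POLICIES are hypothetical names introduced for illustration, and a plain dict stands in for the flow’s configuration. Whether the SDK itself rejects unknown values is not stated here.

```python
# The two policy values named in this section.
ERROR_RECORD_POLICIES = ('ORIGINAL_RECORD', 'STAGE_RECORD')


def set_error_record_policy(configuration, policy):
    """Set 'error_record_policy' after checking it is a known value."""
    if policy not in ERROR_RECORD_POLICIES:
        raise ValueError(
            f'Unknown error record policy {policy!r}; '
            f'expected one of {ERROR_RECORD_POLICIES}'
        )
    configuration['error_record_policy'] = policy


# A plain dict stands in for new_flow.configuration in this sketch.
demo_configuration = {'error_record_policy': 'ORIGINAL_RECORD'}
set_error_record_policy(demo_configuration, 'STAGE_RECORD')

try:
    # Lowercase is not one of the documented values, so this is rejected.
    set_error_record_policy(demo_configuration, 'stage_record')
    rejected = False
except ValueError:
    rejected = True
```

With a real flow you would pass new_flow.configuration as the first argument.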
To change the error record stage, call the StreamsetsFlow.set_error_stage() method.
You need to pass either the label or the name of the new error stage. You can also optionally pass in the new stage’s library.
Note
All error stages other than Discard will have configuration options for you to customize your experience.
>>> write_to_file = new_flow.set_error_stage('Write to File')
>>> write_to_file.configuration['directory'] = '/path/to/some/directory'
Finally, you can view the current error stage for a flow at any point using the StreamsetsFlow.error_stage property.
>>> new_flow.error_stage
WritetoFile_ErrorStage()