Examples
Creating and Running a Batch Flow
This example shows how to create and run a simple batch flow containing a Row Generator stage connected to a Peek stage.
This example covers four steps:

1. Set up authentication
2. Create a new project
3. Create and save a new flow
4. Create and run a job for the flow
from ibm_watsonx_data_integration.common.auth import IAMAuthenticator
from ibm_watsonx_data_integration import Platform
from ibm_watsonx_data_integration.services.datastage import *
# 1. Set up authentication
api_key = 'API_KEY'
auth = IAMAuthenticator(api_key=api_key)
platform = Platform(auth, base_api_url='https://api.ca-tor.dai.cloud.ibm.com')
# 2. Create a new project
project = platform.create_project(
    name='My first project',
    description='Building sample batch flows',
    tags=['flow_test_project'],
    public=True,
    project_type='wx'
)
print(project)  # inspect the newly created project
# 3. Create and save a new flow
# Flow
flow = project.create_flow(
    name='RowGenPeek',
    environment=None,
    flow_type='batch'
)
# Stages
row_generator = flow.add_stage('Row Generator', 'Row_Generator')
peek = flow.add_stage('Peek', 'Peek')
# Links
link_1 = row_generator.connect_output_to(peek)
link_1.name = 'Link_1'
row_generator_schema = link_1.create_schema()
row_generator_schema.add_field('VARCHAR', 'COLUMN_1').length(100)
project.update_flow(flow)
# 4. Create and run a job for the flow
row_gen_peek_job = project.create_job(name='RowGenPeek_job', flow=flow)
job_start = row_gen_peek_job.start(name='RowGenPeek_job_run', description='')
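After starting a run you will typically want to wait for it to reach a terminal state. The SDK's actual status attribute and state names are not shown above, so the helper below is a minimal sketch under the assumption that the run object exposes a `status` string; adjust the attribute and state names to match the real API.

```python
import time

def wait_for_completion(job_run, poll_interval=10.0, timeout=600.0,
                        terminal_states=("FINISHED", "FAILED", "STOPPED")):
    # Poll the run's status until it reaches a terminal state or the timeout elapses.
    # NOTE: `status` and the state names are assumptions, not confirmed SDK fields.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = getattr(job_run, "status", None)
        if status in terminal_states:
            return status
        time.sleep(poll_interval)
    raise TimeoutError(f"Job run did not finish within {timeout} seconds")
```

A call such as `wait_for_completion(job_start)` would then block until the run completes or the timeout is hit.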
Creating and Running a Streaming Flow
This example shows how to create and run a simple streaming flow containing a Dev Raw Data Source stage connected to a Trash stage.
This example covers six steps:

1. Set up authentication
2. Create a new project
3. Create a new environment
4. Create and save a new flow
5. Create and run a job for the flow
6. Retrieve and run the engine installation command
from ibm_watsonx_data_integration.common.auth import IAMAuthenticator
from ibm_watsonx_data_integration import Platform
from ibm_watsonx_data_integration.services.datastage import *
# 1. Set up authentication
api_key = 'API_KEY'
auth = IAMAuthenticator(api_key=api_key)
platform = Platform(auth, base_api_url='https://api.ca-tor.dai.cloud.ibm.com')
# 2. Create a new project
project = platform.create_project(
    name='My first project',
    description='Building sample streaming flows',
    tags=['flow_test_project'],
    public=True,
    project_type='wx'
)
# 3. Create a new environment
environment = project.create_environment(
    name='My first environment',
    engine_version=platform.available_engine_versions[0],
)
# 4. Create and save a new flow
# Flow
flow = project.create_flow(
    name='DevRawDataSourceTrash',
    environment=environment,
    flow_type='streaming'
)
# Stages
dev = flow.add_stage('Dev Raw Data Source')
trash = flow.add_stage('Trash')
# Links
dev.connect_output_to(trash)
project.update_flow(flow)
# 5. Create and run a job for the flow
dev_trash_job = project.create_job(name='DevRawDataSourceTrash_job', flow=flow)
job_start = dev_trash_job.start(name='DevRawDataSourceTrash_job_run', description='')
Before this flow can run, one more step is required: retrieve the installation command for the environment's engine by calling the Environment.get_installation_command() method. Then replace the SSET_API_KEY variable in the returned command with your api_key and run the command.
# 6. Retrieve and run the engine installation command
# Replace the returned command's SSET_API_KEY with your api_key, then run the command.
environment.get_installation_command()
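The SSET_API_KEY substitution can also be done in code before the command is run. The helper below is a minimal sketch; the sample command string in the usage note is illustrative only, since the real output of get_installation_command() depends on your environment.

```python
def inject_api_key(install_command: str, api_key: str,
                   placeholder: str = "SSET_API_KEY") -> str:
    # Substitute the placeholder token in the installation command with the real key.
    # Raises if the placeholder is absent, so a silently unmodified command
    # is never returned.
    if placeholder not in install_command:
        raise ValueError(f"Placeholder {placeholder!r} not found in command")
    return install_command.replace(placeholder, api_key)
```

For example, `inject_api_key(environment.get_installation_command(), api_key)` would return the command ready to run on the machine hosting the engine.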