Environments (Streaming)#

An environment defines the execution context for running flows. It specifies the engine to be used, the libraries to be installed, and additional runtime parameters such as the number of CPU cores, memory allocation, and the maximum number of concurrent flow runs.

Configuring an environment generates a command that you can use to launch it. This command typically runs a Docker container with the selected engine version, all required libraries, and the specified parameters.

Creating an Environment#

In the UI, you can create a new Environment by navigating to Manage -> StreamSets -> New environment.

Screenshot of the Environment creation form in the UI

You will be prompted to provide a Name, select a Data Collector engine version, and set any additional configuration options.

Screenshot of the Environment configuration form in the UI

In the SDK, you can create an environment from a Project object using the Project.create_environment() method. At a minimum, you must provide the name parameter. All other parameters are optional and are populated with default values if not specified. This method returns an Environment object.

>>> env = project.create_environment(
...     name='Sample',
...     # Optional parameters - see API Reference for more optional parameters
...     engine_version=engine_version,
...     description='Basic env.',
...     stage_libs=[
...         'streamsets-datacollector-basic-lib',
...         'streamsets-datacollector-dataformats-lib',
...         'streamsets-datacollector-dev-lib'
...     ],
...     cpus_to_allocate=2,
... )
>>> env
Environment(name='Sample', description='Basic env.', ...)

Note

For information on available Engine versions, see Retrieving Available Engine Versions.
For details on supported libraries, see Retrieving Engine Version Details.

Retrieving an Existing Environment#

Environments can be retrieved through a Project object. You can access all environments associated with a project using the Project.environments property. You can also retrieve a single environment using the Project.environments.get() method, which requires the environment_id parameter.

>>> # Get all environments associated with the project
>>> project.environments
[...Environment(name='Sample', description='Basic env.', ...)...]

>>> # Get a single environment by its id
>>> project.environments.get(environment_id=env.environment_id)
Environment(name='Sample', description='Basic env.', ...)
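
Because Project.environments.get() is shown here with an environment_id, looking an environment up by name means scanning the collection. The following is a minimal sketch, assuming Project.environments is iterable and each Environment exposes the name attribute seen in the repr above. find_environment_by_name is a hypothetical helper, not part of the SDK, and the SimpleNamespace objects only stand in for real Environment objects so the sketch runs without a platform connection.

```python
from types import SimpleNamespace


def find_environment_by_name(environments, name):
    """Return the first environment whose name matches, or None.

    Hypothetical helper, not part of the SDK. `environments` is assumed to be
    an iterable of objects with a `name` attribute, such as
    `project.environments`.
    """
    for environment in environments:
        if environment.name == name:
            return environment
    return None


# Stand-in objects (assumption): real code would pass project.environments.
fake_environments = [
    SimpleNamespace(name='Sample', environment_id='id-1'),
    SimpleNamespace(name='Other', environment_id='id-2'),
]

match = find_environment_by_name(fake_environments, 'Other')
missing = find_environment_by_name(fake_environments, 'Nope')
```

Returning None for a missing name lets the caller decide whether to create the environment or raise an error.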

Modifying an Environment#

In the UI, you can update or delete an existing Environment by navigating to Manage -> StreamSets.

Screenshot of the Environment update form in the UI

Updating an Environment#

As with environment creation, an environment is updated through the Project object. First, modify the properties of the Environment instance, then push the changes using the Project.update_environment() method.

>>> # Modify environment settings
>>> env.max_memory_used = 80
>>> env.stage_libs.append('streamsets-datacollector-aws-lib')

>>> # Update the environment on the platform
>>> project.update_environment(env)
<Response [200]>
>>> env = project.environments.get(environment_id=env.environment_id)
>>> env.stage_libs
['streamsets-datacollector-basic-lib', 'streamsets-datacollector-dataformats-lib', 'streamsets-datacollector-dev-lib', 'streamsets-datacollector-aws-lib']
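
Appending to stage_libs without checking first may leave duplicate entries if the same script runs twice. The following is a minimal sketch of an idempotent variant, assuming env.stage_libs behaves like a list of library ID strings as in the example above. add_stage_libs is a hypothetical helper, not part of the SDK, and fake_env is a stand-in for a real Environment object.

```python
from types import SimpleNamespace


def add_stage_libs(env, new_libs):
    """Append libraries not already in env.stage_libs; return those added.

    Hypothetical helper. Assumes env.stage_libs behaves like a list of
    library ID strings.
    """
    added = []
    for lib in new_libs:
        if lib not in env.stage_libs:
            env.stage_libs.append(lib)
            added.append(lib)
    return added


# Stand-in for an Environment object (assumption).
fake_env = SimpleNamespace(stage_libs=['streamsets-datacollector-basic-lib'])

added = add_stage_libs(fake_env, [
    'streamsets-datacollector-basic-lib',  # already present, skipped
    'streamsets-datacollector-aws-lib',    # new, appended
])
```

With a real project, you would call project.update_environment(env) afterwards, and could skip the call when the returned list is empty.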

Retrieving the Engine Installation Command#

To start and run the Engine defined by an Environment, you’ll need to retrieve the installation command for that Environment and execute it from the machine where you want the Engine to run.

In the UI, you can retrieve the run command by navigating to Manage -> StreamSets.

Screenshot of the Environment update form in the UI

In the SDK, you can retrieve the run command from the Environment object using the Environment.get_installation_command() method.

>>> env.get_installation_command(
...     # Optional parameters
...     pretty=False,
...     foreground=True
... )
'docker run -d --restart on-failure --cpus=4.0 --hostname "$(hostname)" -p 18630:18630 ...'

Note

The installation command you retrieve from the Environment requires the SSET_API_KEY environment variable to be set for the user executing the command. The variable should contain the API key you generated for authenticating with the watsonx.data integration platform.
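
If you launch the command from a script, it can help to fail early when SSET_API_KEY is absent. The following is a minimal sketch using only the Python standard library. require_api_key is a hypothetical helper, not part of the SDK, and the key value is a placeholder.

```python
import os


def require_api_key(environ=None):
    """Return the SSET_API_KEY value, or raise if it is missing or empty.

    Hypothetical helper. `environ` defaults to the process environment and
    can be replaced by any mapping for testing.
    """
    if environ is None:
        environ = os.environ
    api_key = environ.get('SSET_API_KEY', '').strip()
    if not api_key:
        raise RuntimeError(
            'SSET_API_KEY must be set before running the installation command'
        )
    return api_key


# Demonstrate with explicit mappings instead of the real process environment.
present = require_api_key({'SSET_API_KEY': 'example-key'})

try:
    require_api_key({})
    missing_detected = False
except RuntimeError:
    missing_detected = True
```

Calling require_api_key() with no arguments checks the environment of the current process, which is the one a command started from the same script would typically inherit.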

Deleting an Environment#

To remove a single environment, use the Project.delete_environment() method. The method returns an API response, which you can inspect to verify the status code.

>>> project.delete_environment(env)
<Response [200]>

To remove multiple environments at once, use the Project.delete_environments() method. This method accepts any number of Environment instances and returns a single HTTP response.

>>> env1 = project.create_environment(name='Sample1')
>>> env2 = project.create_environment(name='Sample2')
>>> project.delete_environments(env1, env2)
<Response [200]>
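
Since Project.delete_environments() accepts Environment instances as positional arguments, a filtered list can be unpacked into the call. The following is a minimal sketch of the selection step, assuming each Environment exposes a name attribute. select_by_prefix is a hypothetical helper, not part of the SDK, and the SimpleNamespace objects stand in for real Environment objects.

```python
from types import SimpleNamespace


def select_by_prefix(environments, prefix):
    """Return the environments whose name starts with `prefix`.

    Hypothetical helper; `environments` is assumed to be an iterable of
    objects with a `name` attribute, such as `project.environments`.
    """
    return [env for env in environments if env.name.startswith(prefix)]


# Stand-in objects (assumption): real code would pass project.environments.
fake_environments = [
    SimpleNamespace(name='Sample1'),
    SimpleNamespace(name='Sample2'),
    SimpleNamespace(name='Production'),
]

to_delete = select_by_prefix(fake_environments, 'Sample')
selected_names = [env.name for env in to_delete]

# With a real project, the selection could then be passed on, for example:
#     if to_delete:
#         response = project.delete_environments(*to_delete)
```

Guarding the call with `if to_delete:` avoids invoking the method with no arguments, whose behavior is not described here.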

Retrieving Available Engine Versions#

To view the list of available Engine versions, use the Platform.available_engine_versions property. This returns a StreamingEngineVersions object, a collection of StreamingEngineVersion objects that you can choose from when configuring an Environment.

>>> platform.available_engine_versions
[...StreamingEngineVersion(engine_version_id='JDK17_6.3.0', engine_type='data_collector', ...)...]
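
To pick the newest version from that collection, you can compare the numeric part of engine_version_id. The following is a minimal sketch, assuming IDs follow the '<prefix>_<major>.<minor>.<patch>' shape seen above and that each object exposes engine_version_id and engine_type. version_tuple and newest_engine_version are hypothetical helpers, not part of the SDK, and the 6.10.0 entry is made up purely to show why a numeric comparison is needed.

```python
from types import SimpleNamespace


def version_tuple(engine_version_id):
    """Convert 'JDK17_6.3.0' to (6, 3, 0).

    Assumes the ID ends with '_<major>.<minor>.<patch>'.
    """
    number = engine_version_id.rsplit('_', 1)[-1]
    return tuple(int(part) for part in number.split('.'))


def newest_engine_version(engine_versions, engine_type='data_collector'):
    """Return the highest version of the given engine type, or None."""
    candidates = [v for v in engine_versions if v.engine_type == engine_type]
    if not candidates:
        return None
    return max(candidates, key=lambda v: version_tuple(v.engine_version_id))


# Stand-in objects (assumption): real code would pass
# platform.available_engine_versions. 'JDK17_6.10.0' is a made-up ID.
fake_versions = [
    SimpleNamespace(engine_version_id='JDK17_6.3.0', engine_type='data_collector'),
    SimpleNamespace(engine_version_id='JDK17_6.10.0', engine_type='data_collector'),
    SimpleNamespace(engine_version_id='JDK17_6.4.0', engine_type='data_collector'),
]

newest = newest_engine_version(fake_versions)
```

Comparing integer tuples rather than raw strings keeps 6.10.0 ahead of 6.4.0, which a plain string sort would get wrong.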

Retrieving Engine Version Details#

In the UI, you can view the list of libraries supported by a specific Engine version during Environment configuration:

Screenshot showing supported libraries for an Engine version in the UI

In the SDK, to retrieve the list of libraries supported by a given Engine version, first use the Platform.available_engine_versions property to list all available engine versions. Each StreamingEngineVersion object in the returned collection exposes a stage_libs attribute that lists the libraries it supports.

>>> platform.available_engine_versions
[...StreamingEngineVersion(engine_version_id='JDK17_6.4.0', engine_type='data_collector', ...)...]

>>> # To see all stage library IDs that can be used in environment.stage_libs
>>> streaming_engine_version = platform.available_engine_versions[0]
>>> streaming_engine_version.stage_libs
[...StreamingEngineStageLib(stage_lib_id='streamsets-datacollector-aws-lib', label='Amazon Web Services')...]
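
The stage_lib_id values can be used to check a list of requested libraries before creating or updating an environment. The following is a minimal sketch, assuming each entry in stage_libs exposes the stage_lib_id attribute shown above. unsupported_stage_libs is a hypothetical helper, not part of the SDK, the SimpleNamespace objects stand in for real SDK objects, and 'streamsets-datacollector-typo-lib' is a deliberately invalid ID.

```python
from types import SimpleNamespace


def unsupported_stage_libs(engine_version, requested_libs):
    """Return the requested library IDs that the engine version does not list.

    Hypothetical helper. Assumes engine_version.stage_libs is an iterable of
    objects with a `stage_lib_id` attribute.
    """
    supported = {lib.stage_lib_id for lib in engine_version.stage_libs}
    return [lib for lib in requested_libs if lib not in supported]


# Stand-in for a StreamingEngineVersion object (assumption).
fake_engine_version = SimpleNamespace(stage_libs=[
    SimpleNamespace(stage_lib_id='streamsets-datacollector-aws-lib',
                    label='Amazon Web Services'),
    SimpleNamespace(stage_lib_id='streamsets-datacollector-basic-lib',
                    label='Basic'),
])

unknown = unsupported_stage_libs(fake_engine_version, [
    'streamsets-datacollector-aws-lib',
    'streamsets-datacollector-typo-lib',  # deliberately invalid ID
])
```

An empty result means every requested ID appears in the engine version's stage_libs.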