Skip to content

Rocket AI Hub

In this doc, we will cover how to access Db2 for i from Rocket AI Hub. To do this, we will create a Kubeflow Pipeline that reads data from Db2 for i, using a Trino Connector and Mapepire.

Configure Db2 for i Trino Connector

Before we build a pipeline that reads in data from db2 for i, let’s first connect to the Db2 for i database from a Kubeflow Notebook Server using the Trino Python SDK. In order to do this, we need to configure the Trino Connector for Db2 for i.
Here we assume that a Trino instance is already installed in the OpenShift cluster. If you haven’t installed it yet, please refer to the instructions in this link.

Configure Db2 for i Connection

To connect to Db2 for i, we need to configure a connection to the database. We will be using the following Connector: trino-db2i. Here are the steps in detail:

Add the plugins for Trino JTOpen connector

By default, the Trino instance installed using the official Trino image does not have the plugins for Trino JTOpen connector. So you need to download all the plugins from the link above and add them into the default plugin directory /usr/lib/trino/plugin for that Trino instance.

Create a Catalog Properties file

In Trino’s catalog directory (/etc/trino/catalog), create a new file called db2i.properties with the following content:

connector.name=jtopen
connection-url=jdbc:as400://<ip>:<port>/<database>;date format=iso;
connection-user=<your-username>
connection-password=<your-password>

Replace <ip>, <port>, <database>, <your-username>, and <your-password> with your Db2 for i connection details.

Note: Ensure the trailing semicolon ; is included in the connection-url.

Restart Trino in OpenShift Cluster

After creating the db2i.properties file, restart the Trino service in the OpenShift cluster to apply the changes.

Connect to Db2 for i from a Kubeflow Notebook

Now that we have configured the Trino Connector for Db2 for i, we can connect to the database from a Kubeflow Notebook. Here is an example of how to connect to Db2 for i using the Trino Python SDK:

import logging
import pandas as pd
import sys
from trino.dbapi import Connection
def Load_Dataframe_via_Trino(
query: str,
host: str = "trino.trino",
port: int = 8080,
user: str = "anybody",
catalog: str = None,
schema: str = None,
):
"""
Load a Pandas DataFrame using Trino as the SQL client.
Parameters:
query: SQL query for data.
host: Trino host (default: "trino.trino").
port: Trino port (default: 8080).
user: Query user (default: "anybody").
catalog: Query catalog.
schema: Query schema.
Returns:
Pandas DataFrame with query results.
"""
with Connection(
host=host,
port=port,
user=user,
catalog=catalog,
schema=schema,
) as conn:
cursor = conn.cursor()
cursor.execute(query)
dataframe = pd.DataFrame(
cursor.fetchall(), columns=[i.name for i in cursor.description]
)
return dataframe

This function can be used to load data from Db2 for i into a Pandas DataFrame. Here is an example of how to use it:

query = "select * from db2i.sample.department limit 5"
df = Load_Dataframe_via_Trino(query=query)
df.to_dict()

Note: depending on your Trino installation, you may need to adjust the host, port, user parameters in the Load_Dataframe_via_Trino function.

This will return a dictionary with the first 5 rows of the department table in the sample schema of the db2i database.

{'deptno': {0: 'A00', 1: 'B01', 2: 'C01', 3: 'D01', 4: 'D11'},
'deptname': {0: 'SPIFFY COMPUTER SERVICE DIV.',
1: 'PLANNING',
2: 'INFORMATION CENTER',
3: 'DEVELOPMENT CENTER',
4: 'MANUFACTURING SYSTEMS'},
'mgrno': {0: '000010', 1: '000020', 2: '000030', 3: None, 4: '000060'},
'admrdept': {0: 'A00', 1: 'A00', 2: 'A00', 3: 'A00', 4: 'D01'},
'location': {0: None, 1: None, 2: None, 3: None, 4: None}}

Kubeflow Pipeline

Kubeflow Pipelines (KFP) is a platform for building and deploying reproducible and portable machine learning workflows using containers. KFP allows you to author components and pipelines using the KFP Python SDK.

Why Kubeflow Pipelines?

  • Develop complete machine learning workflows directly in Python
  • Build custom machine learning components or use a wide range of existing ones
  • Manage, track, and visualize pipeline definitions, runs, experiments, and machine learning artifacts with ease
  • Optimize compute resource usage through parallel task execution and caching to avoid redundant executions

What is a pipeline?

A pipeline defines a sequence of tasks to be executed in a computational directed acyclic graph (DAG). These tasks can called pipeline components. Each component is a self-contained set of code that performs one step in the pipeline. At runtime, each component is executed as a containerized application which may create ML artifacts that are passed to other components in the pipeline.

Hello World Pipeline

Let’s look at a simple example of a Kubeflow Pipeline before we dive into the Db2 for i example. The following code defines a simple pipeline that prints “Hello World!”.

First make sure kfp is installed:

Terminal window
pip install kfp

Here is a simple pipeline that prints “Hello World!”:

from kfp import dsl
@dsl.component
def say_hello(name: str) -> str:
hello_text = f'Hello, {name}!'
print(hello_text)
return hello_text
@dsl.pipeline
def hello_pipeline(recipient: str) -> str:
hello_task = say_hello(name=recipient)
return hello_task.output

Compile the pipeline to YAML with KFP Compiler:

from kfp import compiler
compiler.Compiler().compile(hello_pipeline, 'hello_pipeline.yaml')

The following submits the pipeline for execution with the argument recipient=‘World’:

from kfp.client import Client
client = Client(host='<MY-KFP-ENDPOINT>')
run = client.create_run_from_pipeline_package(
'pipeline.yaml',
arguments={
'recipient': 'World',
},
)

The client will print a link to view the pipeline execution graph and logs in the UI. In this case, the pipeline has one task that prints and returns 'Hello, World!'.

We can now build a pipeline that reads data from Db2 for i using the Trino Connector and the code we wrote above.

Db2 for i Pipeline

Here is an example of a Kubeflow Pipeline that reads data from Db2 for i using the Trino Connector and the Load_Dataframe_via_Trino function we defined earlier. To export the pipeline to a YAML file, use the following code:

load_dataframe_via_trino_comp = create_component_from_func(
func=Load_Dataframe_via_Trino,
output_component_file="component.yaml",
base_image=BASE_IMAGE,
)

This will create a component file that can be used in a pipeline definition. We can use this yaml file when we create a pipeline definition.

Read Component from YAML

import kfp
component = '/path/to/component.yaml'
load_dataframe_via_trino_comp = kfp.components.load_component_from_file(component)

Define the pipeline

import kfp.dsl as dsl
@dsl.pipeline(
name="Component Test - Load Dataframe via Jtopen Trino",
description="A simple component test",
)
def train_pipeline():
load_dataframe_via_trino_comp(
query="SELECT * FROM db2i.sample.department limit 5",
columns=None,
columns_query="show columns from db2i.sample.department",
)

Create a pipeline run

with open("/var/run/secrets/kubernetes.io/serviceaccount/namespace") as f:
NAMESPACE = f.read()
KFP_CLIENT = kfp.Client()
KFP_CLIENT.create_run_from_pipeline_func(
train_pipeline, arguments={}, namespace=NAMESPACE
)

This will kick off a pipeline run in Kubeflow: alt text