Find out how to deploy your flows in an application mode Flink cluster as part of your production environment.
Important:
- Event Processing release 1.2.3 introduces a new flow export format that can be used for deploying jobs customized for production or test environments. In most cases, this format provides a better user experience and can be used with automation in a continuous integration and continuous delivery (CI/CD) pipeline.
- This deployment cannot be used with the Event Processing UI.
Note: The Apache operator sample that is referenced in the following sections points to the version of the sample in the main branch, which is up to date and might include fixes that are absent from the release branches.
Prerequisites
- The SQL statements are exported from the Event Processing UI and saved to a file, for example, statements.sql. For more information, see exporting flows.
- You updated the Flink SQL connector properties and values that are defined in the file statements.sql to match your target environment.

  For security reasons, values that contain sensitive credentials, such as username and password, are removed when the SQL statements are exported from the Event Processing UI, so you must restore them.

  Also, the exported SQL contains connector configuration that applies to the environment targeted in the Event Processing UI. When deploying to a different target environment, you might need to adapt the connector properties to that environment.
  See the following table for credentials and connector properties information about supported Flink SQL connectors:

  | Flink SQL connector | Used by nodes | Sensitive credentials | Connector properties |
  | --- | --- | --- | --- |
  | Kafka | Source and destination | About Kafka connector. Note: When configuring SCRAM authentication for the Kafka connector, ensure you use double quotes only. Do not use a backslash character (\) to escape the double quotes. The valid format is: username="<username>" password="<password>" | Kafka connector properties. For more information about how events can be consumed from Kafka topics, see the Flink documentation. Note: The Kafka connector value must be kafka. |
  | JDBC | Database | About JDBC connector | JDBC connector properties |
  | HTTP | API | About HTTP connector | HTTP connector properties. Note: The HTTP connector version for Event Processing versions 1.2.3 and later is 0.16.0. For earlier Event Processing 1.2.x releases, see the 0.15.0 connector documentation. |

- To deploy a running Flink job, the SQL statements in the file statements.sql must contain one of the following clauses:
  - A definition of a Flink SQL Kafka sink (also known as event destination), and an INSERT INTO clause that selects the columns of the last temporary view into this sink.
  - A SELECT clause that takes one or all of the columns of the last temporary view.

  For more information about how to define a Flink SQL sink, see the Flink documentation.
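Two of the prerequisites above can be checked with a short pre-flight script before building the image. The following is a minimal sketch, not a SQL parser: the function names are hypothetical, and it assumes that a top-level statement starts either at the beginning of the file or immediately after a semicolon.

```python
import re
import sys

# Heuristic only (assumption): a top-level statement starts at the beginning
# of the text or right after a semicolon.
_FINAL_CLAUSE = re.compile(r"(?:^|;)\s*(?:INSERT\s+INTO|SELECT)\b", re.IGNORECASE)
_LINE_COMMENT = re.compile(r"--[^\n]*")


def has_final_clause(sql_text: str) -> bool:
    """Return True if a statement appears to start with INSERT INTO or SELECT."""
    without_comments = _LINE_COMMENT.sub("", sql_text)
    return _FINAL_CLAUSE.search(without_comments) is not None


def has_escaped_quotes(sql_text: str) -> bool:
    """Return True if a backslash-escaped double quote appears in the text."""
    return '\\"' in sql_text


if __name__ == "__main__":
    # Hypothetical usage: python preflight.py statements.sql
    with open(sys.argv[1], encoding="utf-8") as handle:
        text = handle.read()
    if not has_final_clause(text):
        print("No top-level INSERT INTO or SELECT statement found.")
    if has_escaped_quotes(text):
        print("Found backslash-escaped double quotes; use plain double quotes.")
```

A miss from either check is only a hint to review the file manually, because the pattern does not understand quoting or nested statements.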
Use Flink user-defined functions
Optionally, you can use user-defined functions (UDFs) to complement the built-in functions by editing the SQL exported from the Event Processing UI.
For more information, see UDFs in the exported SQL.
Set up a connection to the Flink cluster
1. Log in to your Kubernetes cluster as a cluster administrator by setting your kubectl context.

2. Switch to the namespace where the IBM Operator for Apache Flink is installed:

   kubectl config set-context --current --namespace=<namespace>
Build and deploy a Flink SQL runner
You can use a Kubernetes FlinkDeployment custom resource in application mode to deploy a Flink job for processing and deploying the statements in the file statements.sql.
A sample application flink-sql-runner-example is provided in the Apache Flink GitHub repository for that purpose.
Follow the instructions to build:
- the flink-sql-runner-example JAR file (Flink job)
- the Docker image
Important: Ensure that the Flink SQL runner JAR file and the statements.sql file have read permissions (644) for non-root users. If the JAR file is only readable by the root user, the FlinkDeployment instance cannot be started by non-root users.
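The permission requirement above can be verified before building the image. The following is a minimal sketch with hypothetical function names. It assumes that "readable by non-root users" corresponds to the read bit for "others", as in mode 644.

```python
import os
import stat


def readable_by_non_root(mode: int) -> bool:
    """Return True if the 'others' read bit is set, as it is in mode 644."""
    return bool(mode & stat.S_IROTH)


def check_file(path: str) -> bool:
    """Read the permission bits of a file on disk and apply the check."""
    return readable_by_non_root(stat.S_IMODE(os.stat(path).st_mode))


if __name__ == "__main__":
    # Hypothetical file names; adjust to your build directory.
    for name in ("sql-runner.jar", "statements.sql"):
        if os.path.exists(name) and not check_file(name):
            print(f"{name} is not readable by non-root users; consider chmod 644")
```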
Some adaptations to this procedure are required to build the Docker image and use the file statements.sql:
1. Modify the Dockerfile to use the IBM Flink image:

   a. Run the following command to extract the Flink image name, including its SHA digest, from the ClusterServiceVersion (CSV). For example, if you are running on Flink version 1.2.3:

      kubectl get csv -o jsonpath='{.spec.install.spec.deployments[*].spec.template.spec.containers[0].env[?(@.name=="IBM_FLINK_IMAGE")].value}' ibm-eventautomation-flink.v1.2.3

      Alternatively, you can obtain the image name from the Flink operator pod’s environment variable:

      kubectl set env pod/<flink_operator_pod_name> --list -n <flink_operator_namespace> | grep IBM_FLINK_IMAGE

   b. Edit the Dockerfile and change the FROM clause to use the IBM Flink image with its SHA digest, as determined in the previous step.

      FROM --platform=<platform> <IBM Flink image with digest>

      Where <platform> is linux/amd64 or linux/s390x, depending on your deployment target.

   c. If you introduced Flink user-defined functions (UDFs) in the earlier optional step, edit the Dockerfile to copy the JAR file that contains the UDF classes:

      COPY --chown=flink:root <path-of-the-udf-jar> /opt/flink/lib

      For example:

      COPY --chown=flink:root /udfproject/target/udf.jar /opt/flink/lib

   d. Remove the sample SQL statement files from the sql-scripts directory.

   e. Copy the statements.sql file to the directory sql-scripts.

   f. Build the Docker image and push it to a registry accessible from your OpenShift Container Platform. If your registry requires authentication, configure the image pull secret, for example, by using the global cluster pull secret.
2. Create the IBM Operator for Apache Flink FlinkDeployment custom resource.

   You can use a Kubernetes FlinkDeployment custom resource in application mode to deploy a Flink job for processing and deploying the statements in the statements.sql file.

   a. You can start with the following example of an application mode Flink instance:

      apiVersion: flink.apache.org/v1beta1
      kind: FlinkDeployment
      metadata:
        name: application-cluster-prod
      spec:
        image: <image built FROM icr.io/cpopen/ibm-eventautomation-flink/ibm-eventautomation-flink>
        flinkConfiguration:
          license.use: EventAutomationProduction
          license.license: 'L-KCVZ-JL5CRM'
          license.accept: 'false'
          high-availability.type: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
          high-availability.storageDir: 'file:///opt/flink/volume/flink-ha'
          restart-strategy: failure-rate
          restart-strategy.failure-rate.max-failures-per-interval: '10'
          restart-strategy.failure-rate.failure-rate-interval: '10 min'
          restart-strategy.failure-rate.delay: '30 s'
          execution.checkpointing.interval: '5000'
          execution.checkpointing.unaligned: 'false'
          state.backend.type: rocksdb
          state.backend.rocksdb.thread.num: '10'
          state.backend.incremental: 'true'
          state.backend.rocksdb.use-bloom-filter: 'true'
          state.checkpoints.dir: 'file:///opt/flink/volume/flink-cp'
          state.checkpoints.num-retained: '3'
          state.savepoints.dir: 'file:///opt/flink/volume/flink-sp'
          taskmanager.numberOfTaskSlots: '2'
          table.exec.source.idle-timeout: '30 s'
          security.ssl.enabled: 'true'
          security.ssl.truststore: /opt/flink/tls-cert/truststore.jks
          security.ssl.truststore-password: <jks-password>
          security.ssl.keystore: /opt/flink/tls-cert/keystore.jks
          security.ssl.keystore-password: <jks-password>
          security.ssl.key-password: <jks-password>
          kubernetes.secrets: '<jks-secret>:/opt/flink/tls-cert'
        serviceAccount: flink
        podTemplate:
          apiVersion: v1
          kind: Pod
          metadata:
            name: pod-template
          spec:
            affinity:
              podAntiAffinity:
                preferredDuringSchedulingIgnoredDuringExecution:
                  - weight: 80
                    podAffinityTerm:
                      labelSelector:
                        matchExpressions:
                          - key: type
                            operator: In
                            values:
                              - flink-native-kubernetes
                      topologyKey: kubernetes.io/hostname
            containers:
              - name: flink-main-container
                volumeMounts:
                  - name: flink-logs
                    mountPath: /opt/flink/log
                  - name: flink-volume
                    mountPath: /opt/flink/volume
            volumes:
              - name: flink-logs
                emptyDir: {}
              - name: flink-volume
                persistentVolumeClaim:
                  claimName: ibm-flink-pvc
        jobManager:
          replicas: 2
          resource:
            memory: '4096m'
            cpu: 0.5
        taskManager:
          resource:
            memory: '4096m'
            cpu: 2
        job:
          jarURI: <insert jar file name here>
          args: ['<insert path for statements.sql here>']
          parallelism: 1
          state: running
          upgradeMode: savepoint
          allowNonRestoredState: true
        mode: native
      In Event Processing versions earlier than 1.2.3, select the Production - Flink Application cluster sample.

      Note: The Flink instance must be configured with persistent storage.

      If you do not want to use the examples provided earlier, add the following parameter to set the timeout period after which an event source that receives no events is marked idle. This allows downstream tasks to advance their watermark. Idleness is not detected by default. The parameter is included in all Flink samples:

      spec:
        flinkConfiguration:
          table.exec.source.idle-timeout: '30 s'

      For more information about table.exec.source.idle-timeout, see the Flink documentation.

   b. Append the following spec.job parameter, or edit the existing parameter if using the Production - Flink Application cluster sample:

      spec:
        job:
          jarURI: local:///opt/flink/usrlib/sql-runner.jar
          args: ["/opt/flink/usrlib/sql-scripts/statements.sql"]
          parallelism: 1
          state: running
          upgradeMode: savepoint
   c. Set the Flink image:

      spec:
        image: <image built at step 1.f>
   d. Set the Flink version (only required if --set webhook.create is set to false during the operator installation).

      - Obtain the correct version by listing the IBM_FLINK_VERSION environment variable on the Flink operator pod:

        kubectl set env pod/<flink_operator_pod_name> --list -n <flink_operator_namespace> | grep IBM_FLINK_VERSION

      - Add the version to the FlinkDeployment, for example:

        spec:
          flinkVersion: "v1_19"
3. Apply the modified FlinkDeployment custom resource.
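The values set in steps 2.b to 2.d can also be assembled programmatically, for example in a CI/CD pipeline. The following is a minimal sketch: the function name and the image reference are hypothetical, and the JAR and SQL paths are the ones used in the example above.

```python
import json
from typing import Optional


def build_spec_patch(image: str, flink_version: Optional[str] = None) -> dict:
    """Build a JSON merge patch that sets the image, the job and, optionally, the Flink version."""
    spec = {
        "image": image,
        "job": {
            "jarURI": "local:///opt/flink/usrlib/sql-runner.jar",
            "args": ["/opt/flink/usrlib/sql-scripts/statements.sql"],
            "parallelism": 1,
            "state": "running",
            "upgradeMode": "savepoint",
        },
    }
    # flinkVersion is only needed when the webhook is disabled (see step 2.d).
    if flink_version is not None:
        spec["flinkVersion"] = flink_version
    return {"spec": spec}


if __name__ == "__main__":
    # Hypothetical image reference; replace it with the image that you built.
    patch = build_spec_patch("registry.example.com/flink-sql-runner:1.0", "v1_19")
    print(json.dumps(patch))
```

The printed JSON could be passed to kubectl patch with the --type merge option, or the same values can be edited directly in the custom resource file.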
Changing the parallelism of a Flink SQL runner
1. Edit the FlinkDeployment custom resource.

   a. Ensure that the Flink cluster has enough task slots to fulfill the targeted parallelism value.

      Task slots = spec.taskManager.replicas × spec.flinkConfiguration["taskmanager.numberOfTaskSlots"]

   b. Change the spec.job.parallelism value, then set spec.job.state to running and spec.job.upgradeMode to savepoint.

      spec:
        job:
          jarURI: local:///opt/flink/usrlib/sql-runner.jar
          args: ["/opt/flink/usrlib/sql-scripts/statements.sql"]
          parallelism: 2
          state: running
          upgradeMode: savepoint
          allowNonRestoredState: true
2. Apply the modified FlinkDeployment custom resource.

   The following operations are automatically performed by Flink:
   - A savepoint is created before the Flink job is suspended.
   - The Flink cluster is shut down, and the JobManager and TaskManager pods are terminated.
   - A Flink cluster is created with new JobManager and TaskManager pods.
   - The Flink job is restarted from the savepoint.
Trigger a savepoint for a running Flink SQL job
1. Ensure that the status section indicates that the Job Manager is in READY status and that the Flink job is in RUNNING status by checking the FlinkDeployment custom resource.

   status:
     jobManagerDeploymentStatus: READY
     jobStatus:
       state: RUNNING
2. Set the following values in the FlinkDeployment custom resource:

   a. Set the value of spec.job.upgradeMode to savepoint.

   b. Set the value of spec.job.state to running.

   c. Set the value of spec.job.savepointTriggerNonce to an integer that has never been used before for that option.

      For example:

      job:
        jarURI: local:///opt/flink/usrlib/sql-runner.jar
        args: ["/opt/flink/usrlib/sql-scripts/statements.sql"]
        savepointTriggerNonce: <integer value>
        state: running
        upgradeMode: savepoint

   d. Save the changes in the FlinkDeployment custom resource.

      A savepoint is triggered and written to a location in the PVC, which is indicated in the status.jobStatus.savepointInfo.lastSavepoint.location field of the FlinkDeployment custom resource.

      For example:

      status:
        [...]
        jobStatus:
          [...]
          savepointInfo:
            [...]
            lastSavepoint:
              formatType: CANONICAL
              location: 'file:/opt/flink/volume/flink-sp/savepoint-e372fa-9069a1c0563e'
              timeStamp: 1733957991559
              triggerNonce: 1
              triggerType: MANUAL
3. Keep the FlinkDeployment custom resource and the PVC to make them available later for restoring your deployment.
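The values from step 2 can be assembled as a JSON merge patch. The following is a minimal sketch: the function name is hypothetical, and using the current epoch time in seconds as the nonce is an assumption that yields a new value only if earlier nonces were smaller and no other savepoint was triggered in the same second.

```python
import json
import time
from typing import Optional


def savepoint_trigger_patch(nonce: Optional[int] = None) -> dict:
    """Build a merge patch that triggers a savepoint for a running job."""
    if nonce is None:
        # Assumption: epoch seconds have not been used as a nonce before.
        nonce = int(time.time())
    return {
        "spec": {
            "job": {
                "upgradeMode": "savepoint",
                "state": "running",
                "savepointTriggerNonce": nonce,
            }
        }
    }


if __name__ == "__main__":
    print(json.dumps(savepoint_trigger_patch()))
```

If you track nonces yourself, pass the next unused integer explicitly instead of relying on the clock.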
Stop a Flink SQL job with a savepoint
1. Edit the FlinkDeployment custom resource.

2. Make the following modifications:

   a. Ensure that the value of spec.job.upgradeMode is savepoint.

   b. Ensure that the value of spec.job.state is suspended to stop the Flink job.

      spec:
        job:
          jarURI: local:///opt/flink/usrlib/sql-runner.jar
          args: ["/opt/flink/usrlib/sql-scripts/statements.sql"]
          state: suspended
          upgradeMode: savepoint
3. Save the changes in the FlinkDeployment custom resource.

   A savepoint is triggered and written to a location in the PVC, which is indicated in the status.jobStatus.savepointInfo.lastSavepoint.location field of the FlinkDeployment custom resource.

   For example:

   status:
     [...]
     jobStatus:
       [...]
       savepointInfo:
         [...]
         lastSavepoint:
           formatType: CANONICAL
           location: 'file:/opt/flink/volume/flink-sp/savepoint-e372fa-9069a1c0563e'
           timeStamp: 1733957991559
           triggerNonce: 1
           triggerType: UPGRADE
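The savepoint location can be read from the custom resource after it is retrieved as JSON, for example with kubectl get and the -o json option. The following is a minimal sketch. The function name is hypothetical, and the field path is the one shown in the status example above.

```python
import json
from typing import Optional


def last_savepoint_location(resource: dict) -> Optional[str]:
    """Return status.jobStatus.savepointInfo.lastSavepoint.location, or None if absent."""
    node = resource
    for key in ("status", "jobStatus", "savepointInfo", "lastSavepoint"):
        node = (node or {}).get(key)
    return (node or {}).get("location")


if __name__ == "__main__":
    # Trimmed stand-in for the JSON of a FlinkDeployment custom resource.
    raw = json.dumps({
        "status": {
            "jobStatus": {
                "savepointInfo": {
                    "lastSavepoint": {
                        "location": "file:/opt/flink/volume/flink-sp/savepoint-e372fa-9069a1c0563e",
                        "triggerType": "UPGRADE",
                    }
                }
            }
        }
    })
    print(last_savepoint_location(json.loads(raw)))
```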
Resume a Flink SQL job with a savepoint
1. Edit the FlinkDeployment custom resource that you saved earlier when you triggered a savepoint, or the custom resource of a Flink job that you suspended earlier:

   a. Ensure that the value of spec.job.upgradeMode is savepoint.

   b. Ensure that the value of spec.job.state is running to resume the Flink job.

   c. Remove spec.job.savepointTriggerNonce and its value.

   d. Set the value of spec.job.initialSavepointPath to the savepoint location plus the suffix /_metadata. The savepoint location is the one found in step 2.d of "Trigger a savepoint for a running Flink SQL job" if you triggered a savepoint earlier, or in step 3 of "Stop a Flink SQL job with a savepoint" if you suspended the Flink job earlier.

      For example:

      job:
        jarURI: local:///opt/flink/usrlib/sql-runner.jar
        args: ["/opt/flink/usrlib/sql-scripts/statements.sql"]
        state: running
        upgradeMode: savepoint
        initialSavepointPath: file:/opt/flink/volume/flink-sp/savepoint-e372fa-9069a1c0563e/_metadata
        allowNonRestoredState: true
2. Save the changes in the FlinkDeployment custom resource.

   A savepoint is triggered and written to a location in the PVC, which is indicated in the status.jobStatus.savepointInfo.lastSavepoint.location field of the FlinkDeployment custom resource.

   For example:

   status:
     [...]
     jobStatus:
       [...]
       savepointInfo:
         [...]
         lastSavepoint:
           formatType: CANONICAL
           location: 'file:/opt/flink/volume/flink-sp/savepoint-e372fa-9069a1c0563e'
           timeStamp: 1733957991559
           triggerNonce: 1
           triggerType: MANUAL
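The edits in step 1 can be expressed as a JSON merge patch. The following is a minimal sketch with a hypothetical function name. It relies on JSON merge patch semantics, in which a null value removes the key, to drop savepointTriggerNonce.

```python
import json


def resume_patch(savepoint_location: str) -> dict:
    """Build a merge patch that resumes the job from the given savepoint location."""
    path = savepoint_location.rstrip("/")
    # Append the suffix only if the location does not already end with it.
    if not path.endswith("/_metadata"):
        path += "/_metadata"
    return {
        "spec": {
            "job": {
                "upgradeMode": "savepoint",
                "state": "running",
                "savepointTriggerNonce": None,  # serialized as null: removes the key
                "initialSavepointPath": path,
                "allowNonRestoredState": True,
            }
        }
    }


if __name__ == "__main__":
    location = "file:/opt/flink/volume/flink-sp/savepoint-e372fa-9069a1c0563e"
    print(json.dumps(resume_patch(location)))
```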
Enable SSL connection for your database
To securely connect Flink jobs to a database such as PostgreSQL, MySQL, or Oracle, enable an SSL connection with the database as follows:
1. Ensure you added the CA certificate for your database to the truststore and then created a secret with the truststore.

2. Edit the FlinkDeployment custom resource.
Complete the following modifications:
-
In the
spec.flinkConfiguration
section, add:env.java.opts.taskmanager: >- -Djavax.net.ssl.trustStore=/certs/truststore.<keystore-extension> -Djavax.net.ssl.trustStorePassword=<chosen password> env.java.opts.jobmanager: >- -Djavax.net.ssl.trustStore=/certs/truststore.<keystore-extension> -Djavax.net.ssl.trustStorePassword=<chosen password>
Where:
-
<keystore-extension>
is the extension for your keystore format. For example,jks
for Java Keystore andp12
for Public-Key Cryptography Standards. -
In
spec.podTemplate.spec.containers.volumeMounts
section, add:- mountPath: /certs name: truststore readOnly: true
-
In
spec.podTemplate.spec.volumes
section, add:- name: truststore secret: items: - key: truststore.<keystore-extension> path: truststore.<keystore-extension> secretName: ssl-truststore
-
4. Apply the modified FlinkDeployment custom resource:

   kubectl apply -f <custom-resource-file-path>

   For example:

   kubectl apply -f flinkdeployment_demo.yaml
A secure SSL connection is enabled between Flink and the database.
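The two env.java.opts entries from step 3 carry the same value, so they can be generated from a single helper to keep them consistent. The following is a minimal sketch with hypothetical function names. It assumes the /certs mount path and the truststore.<keystore-extension> file name used above.

```python
def truststore_java_opts(extension: str, password: str, mount_path: str = "/certs") -> str:
    """Return the JVM options as one space-separated string, as a folded YAML scalar would."""
    return (
        f"-Djavax.net.ssl.trustStore={mount_path}/truststore.{extension} "
        f"-Djavax.net.ssl.trustStorePassword={password}"
    )


def ssl_flink_configuration(extension: str, password: str) -> dict:
    """Return the entries to add under spec.flinkConfiguration."""
    opts = truststore_java_opts(extension, password)
    return {
        "env.java.opts.taskmanager": opts,
        "env.java.opts.jobmanager": opts,
    }


if __name__ == "__main__":
    # "changeit" is a placeholder password for illustration only.
    for key, value in ssl_flink_configuration("jks", "changeit").items():
        print(f"{key}: {value}")
```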