Find out how to deploy your advanced flows in a Flink cluster as part of your production environment.
Important: This deployment cannot be used with the Event Processing UI.
Note: The Apache operator sample that is referenced in the following sections points to the version of the sample in the `main` branch, which is up to date, and might include fixes that are absent in the `release-1.5` and `release-1.6` branches.
Prerequisites
- The SQL statements are exported from the Event Processing UI and saved to a file, for example, `statements.sql`. For more information, see Exporting flows.
- You updated the Flink SQL Kafka connector properties and values defined in the `statements.sql` file to match your target environment:
  - Sensitive credentials.
    For security reasons, the values containing sensitive credentials are removed from the Event Processing UI when exporting the SQL statements, so you must restore them. For more information about Flink SQL Kafka connectors, see the Flink documentation.
    Note: When configuring SCRAM authentication for the Kafka connector, ensure that you use double quotes only. Do not use a backslash character (`\`) to escape the double quotes. The valid format is: `username="<username>" password="<password>"`
  - Connector property values.
    For more information about how events can be consumed from Kafka topics, see the Flink documentation.
    Note: The Kafka connector value must be `kafka`.
- To deploy a running Flink job, the SQL statements in the `statements.sql` file must contain one of the following (a sketch is provided after this list):
  - A definition of a Flink SQL Kafka sink (also known as event destination), and an `INSERT INTO` clause that selects the columns of the last temporary view into this sink.
  - A `SELECT` clause that takes one or all of the columns of the last temporary view.

  For more information about how to define a Flink SQL sink, see the Flink documentation.
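The following is a minimal sketch of what an updated `statements.sql` file could look like once the credentials are restored. The topic names, bootstrap address, and table schema are placeholder assumptions for illustration only; your exported file contains the tables and views defined by your flow.

```sql
-- Hypothetical event source; restore the SCRAM credentials that were removed on export.
CREATE TABLE `orders_source` (
  `order_id` STRING,
  `amount`   DOUBLE
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = '<bootstrap-address>:443',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.mechanism' = 'SCRAM-SHA-512',
  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";',
  'format' = 'json',
  'scan.startup.mode' = 'earliest-offset'
);

-- Temporary view produced by the flow.
CREATE TEMPORARY VIEW `large_orders` AS
  SELECT `order_id`, `amount` FROM `orders_source` WHERE `amount` > 100;

-- Kafka sink (event destination) and the INSERT INTO clause required for a running Flink job.
CREATE TABLE `orders_sink` (
  `order_id` STRING,
  `amount`   DOUBLE
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders.large',
  'properties.bootstrap.servers' = '<bootstrap-address>:443',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.mechanism' = 'SCRAM-SHA-512',
  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";',
  'format' = 'json'
);

INSERT INTO `orders_sink` SELECT * FROM `large_orders`;
```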
Set up a connection to the Flink cluster
1. Log in to your Kubernetes cluster as a cluster administrator by setting your `kubectl` context.
2. Switch to the namespace where the IBM Operator for Apache Flink is installed:

   ```shell
   kubectl config set-context --current --namespace=<namespace>
   ```
Build and deploy a Flink SQL runner
You can use a Kubernetes `FlinkDeployment` custom resource in application mode to deploy a Flink job that processes the statements in the `statements.sql` file.
A sample application, flink-sql-runner-example, is provided in the Apache Flink GitHub repository for that purpose.
Follow the instructions to build:
- the flink-sql-runner-example JAR file (Flink job)
- the Docker image
Important: Ensure that the Flink SQL runner JAR file and the `statements.sql` file have read permissions (644) for non-root users. If the JAR file is only readable by the root user, the `FlinkDeployment` instance cannot be started by non-root users.
Some adaptations to this procedure are required to build the Docker image and use the `statements.sql` file:
1. Modify the Dockerfile to use the IBM Flink image:

   a. Run the following command to extract the Flink image name, including its SHA digest, from the `ClusterServiceVersion` (CSV). For example, if you are running on Flink version 1.0.4:

      ```shell
      kubectl get csv -o jsonpath='{.spec.install.spec.deployments[*].spec.template.spec.containers[0].env[?(@.name=="IBM_FLINK_IMAGE")].value}' ibm-eventautomation-flink.v1.0.4
      ```

   b. Edit the Dockerfile and change the `FROM` clause to the IBM Flink image with its SHA digest, as determined in the previous step:

      ```dockerfile
      FROM <IBM Flink image with digest>
      ```

   c. Remove the sample SQL statement files from the `sql-scripts` directory.

   d. Copy the `statements.sql` file to the `sql-scripts` directory.

   e. Build the Docker image and push it to a registry that is accessible from your OpenShift Container Platform. If your registry requires authentication, configure the image pull secret, for example, by using the global cluster pull secret.
2. Create the IBM Operator for Apache Flink `FlinkDeployment` custom resource.

   a. Choose the Production - Flink Application cluster sample, or a production sample with persistent storage. If you prefer not to use a provided sample, add the following parameter to set a timeout period after which an event source is marked idle, which allows downstream tasks to advance their watermarks. Idleness is not detected by default. The parameter is included in all the provided samples.

      ```yaml
      spec:
        flinkConfiguration:
          table.exec.source.idle-timeout: '30 s'
      ```

      For more information about `table.exec.source.idle-timeout`, see the Flink documentation.

   b. Append the following `spec.job` parameter, or edit the existing parameter if you are using the Production - Flink Application cluster sample:

      ```yaml
      spec:
        job:
          jarURI: local:///opt/flink/usrlib/sql-runner.jar
          args: ["/opt/flink/usrlib/sql-scripts/statements.sql"]
          parallelism: 1
          state: running
          upgradeMode: savepoint
      ```

   c. Set the Flink image:

      ```yaml
      spec:
        image: <image built at step 1.e>
      ```
3. Deploy this `FlinkDeployment` custom resource.
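For reference, the fragments from step 2 combine into a `FlinkDeployment` custom resource along the following lines. This is a sketch only: the resource name is a placeholder, and any settings not shown in the steps above (such as the Flink version, storage, and resource sizing) come from the production sample you chose.

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: sql-runner-production        # placeholder name
spec:
  image: <image built at step 1.e>
  flinkConfiguration:
    table.exec.source.idle-timeout: '30 s'
    # ...other configuration values from the chosen production sample...
  job:
    jarURI: local:///opt/flink/usrlib/sql-runner.jar
    args: ["/opt/flink/usrlib/sql-scripts/statements.sql"]
    parallelism: 1
    state: running
    upgradeMode: savepoint
```

You can then deploy it with `kubectl apply -f <custom-resource-file-path>`.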
Changing the parallelism of a Flink SQL runner
1. Edit the `FlinkDeployment` custom resource.

   a. Ensure that the Flink cluster has enough task slots to fulfill the targeted parallelism value (a sketch is provided after this procedure):

      `Task slots = spec.taskManager.replicas × spec.flinkConfiguration["taskmanager.numberOfTaskSlots"]`

   b. Change the `spec.job.parallelism` value, then set `spec.job.state` to `running` and `spec.job.upgradeMode` to `savepoint`:

      ```yaml
      spec:
        job:
          jarURI: local:///opt/flink/usrlib/sql-runner.jar
          args: ["/opt/flink/usrlib/sql-scripts/statements.sql"]
          parallelism: 2
          state: running
          upgradeMode: savepoint
          allowNonRestoredState: true
      ```
2. Apply the modified `FlinkDeployment` custom resource.

   The following operations are automatically performed by Flink:
   - A savepoint is created before the Flink job is suspended.
   - The Flink cluster is shut down; the `JobManager` and `TaskManager` pods are terminated.
   - A Flink cluster is created with new `JobManager` and `TaskManager` pods.
   - The Flink job is restarted from the savepoint.
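As an illustration of the task-slot calculation in step 1.a, the following sketch uses arbitrary example values (not recommendations) that provide four task slots, which is enough for a parallelism of 4:

```yaml
spec:
  taskManager:
    replicas: 2                          # 2 TaskManager pods
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: '2'   # 2 slots per TaskManager: 2 x 2 = 4 task slots
  job:
    parallelism: 4
```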
Trigger a savepoint for a running Flink SQL job
1. Edit the `FlinkDeployment` custom resource.

2. Make the following modifications:

   a. Ensure that the value of `spec.job.upgradeMode` is `savepoint`.

   b. Ensure that the value of `spec.job.state` is `running`.

   c. Ensure that the value of `spec.job.savepointTriggerNonce` is an integer that has never been used before for that option.

   ```yaml
   spec:
     job:
       jarURI: local:///opt/flink/usrlib/sql-runner.jar
       args: ["/opt/flink/usrlib/sql-scripts/statements.sql"]
       savepointTriggerNonce: <integer value>
       state: running
       upgradeMode: savepoint
   ```
3. Apply the modified `FlinkDeployment` custom resource.

   A new savepoint is created in the directory specified in `spec.flinkConfiguration["state.savepoints.dir"]`.
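To confirm that the savepoint completed, you can inspect the status of the custom resource. The following is a sketch that assumes the status layout of the Apache Flink Kubernetes operator; the exact field paths can differ between operator versions:

```shell
kubectl get flinkdeployment <deployment-name> \
  -o jsonpath='{.status.jobStatus.savepointInfo.lastSavepoint.location}'
```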
Stop a Flink SQL job with a savepoint
1. Edit the `FlinkDeployment` custom resource.

2. Make the following modifications:

   a. Ensure that the value of `spec.job.upgradeMode` is `savepoint`.

   b. Ensure that the value of `spec.job.state` is `suspended` to stop the Flink job.

   ```yaml
   spec:
     job:
       jarURI: local:///opt/flink/usrlib/sql-runner.jar
       args: ["/opt/flink/usrlib/sql-scripts/statements.sql"]
       state: suspended
       upgradeMode: savepoint
   ```
3. Apply the modified `FlinkDeployment` custom resource.

   A new savepoint is created in the directory specified in `spec.flinkConfiguration["state.savepoints.dir"]`.
Resume a Flink SQL job with a savepoint
1. Edit the `FlinkDeployment` custom resource.

2. Make the following modifications:

   a. Ensure that the value of `spec.job.upgradeMode` is `savepoint`.

   b. Ensure that the value of `spec.job.state` is `running` to resume the Flink job.

   c. Ensure that the same directory is set for the parameters `spec.job.initialSavepointPath` and `spec.flinkConfiguration["state.savepoints.dir"]`.

   ```yaml
   spec:
     job:
       jarURI: local:///opt/flink/usrlib/sql-runner.jar
       args: ["/opt/flink/usrlib/sql-scripts/statements.sql"]
       state: running
       upgradeMode: savepoint
       initialSavepointPath: <savepoint directory>
       allowNonRestoredState: true
   ```
3. Apply the modified `FlinkDeployment` custom resource.

   The Flink job is automatically resumed from the latest savepoint that Flink finds in `spec.job.initialSavepointPath`.
Enable an SSL connection for your database
To securely connect Flink jobs to a database such as PostgreSQL, enable an SSL connection with the database as follows:
1. Ensure that you added the CA certificate for your database to a truststore, and that you created a secret containing the truststore. A sketch of this step is provided after this procedure.
2. Edit the `FlinkDeployment` custom resource.

3. Complete the following modifications:
   - In the `spec.flinkConfiguration` section, add:

     ```yaml
     env.java.opts.taskmanager: >-
       -Djavax.net.ssl.trustStore=/certs/truststore.<keystore-extension>
       -Djavax.net.ssl.trustStorePassword=<chosen password>
     env.java.opts.jobmanager: >-
       -Djavax.net.ssl.trustStore=/certs/truststore.<keystore-extension>
       -Djavax.net.ssl.trustStorePassword=<chosen password>
     ```

     Where:
     - `<keystore-extension>` is the extension for your keystore format. For example, `jks` for Java Keystore and `p12` for Public-Key Cryptography Standards.
   - In the `spec.podTemplate.spec.containers.volumeMounts` section, add:

     ```yaml
     - mountPath: /certs
       name: truststore
       readOnly: true
     ```
   - In the `spec.podTemplate.spec.volumes` section, add:

     ```yaml
     - name: truststore
       secret:
         items:
           - key: truststore.<keystore-extension>
             path: truststore.<keystore-extension>
         secretName: ssl-truststore
     ```
4. Apply the modified `FlinkDeployment` custom resource:

   ```shell
   kubectl apply -f <custom-resource-file-path>
   ```

   For example:

   ```shell
   kubectl apply -f flinkdeployment_demo.yaml
   ```

An SSL connection is enabled between Flink and a secured database.
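As an illustration of step 1, the truststore and secret could be created as follows. This is a sketch only: the CA certificate file name, alias, and JKS format are assumptions, and the secret name and key must match what the `spec.podTemplate.spec.volumes` section references (`ssl-truststore` and `truststore.<keystore-extension>`).

```shell
# Import the database CA certificate into a truststore (JKS format assumed).
keytool -importcert -noprompt -alias database-ca \
  -file ca.crt \
  -keystore truststore.jks \
  -storepass <chosen password>

# Create the secret that the FlinkDeployment mounts at /certs.
kubectl create secret generic ssl-truststore \
  --from-file=truststore.jks \
  --namespace=<namespace>
```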