Find out how to deploy your flows in an application mode Flink cluster as part of your production environment.
Important:
- You can use the JSON and configuration YAML flow export format for deploying jobs customized for production or test environments. In most cases, this provides a better user experience and can be automated in a continuous integration and continuous delivery (CI/CD) pipeline.
- This deployment cannot be used with the Event Processing UI.
Note: The Apache operator sample that is referenced in the following sections points to the version of the sample in the main branch, which is up-to-date, and might include fixes that are absent in the release branches.
Prerequisites
- The SQL statements are exported from the Event Processing UI and saved to a file, for example, statements.sql. For more information, see exporting flows.
- You updated the Flink SQL connector properties and values that are defined in the file statements.sql to match your target environment.
  For security reasons, the values containing sensitive credentials, such as username and password, are removed when exporting the SQL statements from the Event Processing UI, so you must restore them.
  Also, the exported SQL contains connector configuration that is applicable to the environment targeted in the Event Processing UI. When deploying to a different target environment, you might need to adapt the connector properties to the target environment.
  See the following table for credentials and connector properties information about supported Flink SQL connectors:
  | Flink SQL connector | Used by nodes | Sensitive credentials | Connector properties |
  | --- | --- | --- | --- |
  | Kafka | Source and destination | About Kafka connector | Kafka connector properties |
  | JDBC | Database | About JDBC connector | JDBC connector properties |
  | HTTP | API | About HTTP connector | HTTP connector properties |
  Notes for the Kafka connector:
  - When configuring SCRAM authentication for the Kafka connector, ensure you use double quotes only. Do not use a backslash character (\) to escape the double quotes. The valid format is:
    username="<username>" password="<password>"
  - For more information about how events can be consumed from Kafka topics, see the Flink documentation.
  - The Kafka connector value must be kafka. The Kafka connector version for Event Processing versions 1.3.0 and later is 3.4.0-1.20.
  Note for the HTTP connector: The HTTP connector version for Event Processing versions 1.3.0 and later is 0.18.0.
- To deploy a running Flink job, the SQL statements in the file statements.sql must contain one of the following clauses:
  - A definition of a Flink SQL Kafka sink (also known as event destination), and an INSERT INTO clause that selects the columns of the last temporary view into this sink.
  - A SELECT clause that takes one or all of the columns of the last temporary view.
  For more information about how to define a Flink SQL sink, see the Flink documentation.
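The last two prerequisites can be checked roughly before building the image. The following Python sketch is only a heuristic and is not part of the product: the function names are hypothetical, and the script is split naively on semicolons, so a semicolon inside a string literal would confuse it. It reports whether the script contains a top-level INSERT INTO or SELECT statement, and whether any double quotes are escaped with a backslash.

```python
import re


def split_statements(sql):
    """Naively split a SQL script into statements on semicolons."""
    # Drop full-line '--' comments first. Semicolons inside string
    # literals are NOT handled; this is a rough heuristic only.
    lines = [ln for ln in sql.splitlines() if not ln.strip().startswith("--")]
    return [s.strip() for s in "\n".join(lines).split(";") if s.strip()]


def has_runnable_statement(sql):
    """Return True if a top-level statement starts with INSERT INTO or SELECT."""
    pattern = re.compile(r"^(INSERT\s+INTO|SELECT)\b", re.IGNORECASE)
    return any(pattern.match(stmt) for stmt in split_statements(sql))


def has_escaped_quotes(sql):
    """Return True if the script contains a backslash followed by a double quote."""
    return '\\"' in sql


if __name__ == "__main__":
    # Assumes the exported file is named statements.sql, as in this topic.
    with open("statements.sql", encoding="utf-8") as f:
        script = f.read()
    print("runnable statement found:", has_runnable_statement(script))
    print("escaped double quotes found:", has_escaped_quotes(script))
```

A script that only creates tables and temporary views fails the first check, because the SELECT inside a CREATE TEMPORARY VIEW statement is not at the start of a statement.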
Use Flink user-defined functions
Optionally, you can use user-defined functions (UDFs) to complement the built-in functions by editing the SQL exported from the Event Processing UI.
For more information, see UDFs in the exported SQL.
Set up a connection to the Flink cluster
1. Log in to your Kubernetes cluster as a cluster administrator by setting your kubectl context.
2. Switch to the namespace where the IBM Operator for Apache Flink is installed:
   kubectl config set-context --current --namespace=<namespace>
Build and deploy a Flink SQL runner
You can use a Kubernetes FlinkDeployment custom resource in application mode to deploy a Flink job for processing and deploying the statements in the file statements.sql.
A sample application flink-sql-runner-example is provided in the Apache Flink GitHub repository for that purpose.
Follow the instructions to build:
- the flink-sql-runner-example JAR file (Flink job)
- the Docker image
Important: Ensure that the Flink SQL runner JAR file and the statements.sql file have read permissions (644) for non-root users. If the JAR file is only readable by the root user, the FlinkDeployment instance cannot be started by non-root users.
Some adaptations to this procedure are required to build the Docker image and use the file statements.sql:
1. Modify the Dockerfile to use the IBM Flink image:
   a. Run the following command to extract the Flink image name, including its SHA digest, from the ClusterServiceVersion (CSV). For example, if you are running on Flink version 1.3.0:
      kubectl get csv -o jsonpath='{.spec.install.spec.deployments[*].spec.template.spec.containers[0].env[?(@.name=="IBM_FLINK_IMAGE")].value}' ibm-eventautomation-flink.v1.3.0
      Alternatively, you can obtain the image name from the Flink operator pod’s environment variable:
      kubectl set env pod/<flink_operator_pod_name> --list -n <flink_operator_namespace> | grep IBM_FLINK_IMAGE
   b. Edit the Dockerfile and change the FROM clause to use the IBM Flink image with its SHA digest, as determined in the previous step.
      FROM --platform=<platform> <IBM Flink image with digest>
      Where <platform> is linux/amd64 or linux/s390x, depending on your deployment target.
   c. If you introduced Flink user-defined functions (UDFs) in the earlier optional step, edit the Dockerfile to copy the JAR file that contains the UDF classes:
      COPY --chown=flink:root <path-of-the-udf-jar> /opt/flink/lib
      For example:
      COPY --chown=flink:root /udfproject/target/udf.jar /opt/flink/lib
   d. Remove the sample SQL statement files from the sql-scripts directory.
   e. Copy the statements.sql file to the directory sql-scripts.
   f. Build the Docker image and push it to a registry accessible from your OpenShift Container Platform. If your registry requires authentication, configure the image pull secret, for example, by using the global cluster pull secret.
2. Create the IBM Operator for Apache Flink FlinkDeployment custom resource.
   You can use a Kubernetes FlinkDeployment custom resource in application mode to deploy a Flink job for processing and deploying the statements in the statements.sql file.
   a. You can start with the following example of an application mode Flink instance:
      apiVersion: flink.apache.org/v1beta1
      kind: FlinkDeployment
      metadata:
        name: application-cluster-prod
      spec:
        image: <image built FROM icr.io/cpopen/ibm-eventautomation-flink/ibm-eventautomation-flink>
        flinkConfiguration:
          license.use: EventAutomationProduction
          license.license: 'L-KCVZ-JL5CRM'
          license.accept: 'false'
          high-availability.type: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
          high-availability.storageDir: 'file:///opt/flink/volume/flink-ha'
          restart-strategy: failure-rate
          restart-strategy.failure-rate.max-failures-per-interval: '10'
          restart-strategy.failure-rate.failure-rate-interval: '10 min'
          restart-strategy.failure-rate.delay: '30 s'
          execution.checkpointing.interval: '5000'
          execution.checkpointing.unaligned: 'false'
          state.backend.type: rocksdb
          state.backend.rocksdb.thread.num: '10'
          state.backend.incremental: 'true'
          state.backend.rocksdb.use-bloom-filter: 'true'
          state.checkpoints.dir: 'file:///opt/flink/volume/flink-cp'
          state.checkpoints.num-retained: '3'
          state.savepoints.dir: 'file:///opt/flink/volume/flink-sp'
          taskmanager.numberOfTaskSlots: '2'
          table.exec.source.idle-timeout: '30 s'
          security.ssl.enabled: 'true'
          security.ssl.truststore: /opt/flink/tls-cert/truststore.jks
          security.ssl.truststore-password: <jks-password>
          security.ssl.keystore: /opt/flink/tls-cert/keystore.jks
          security.ssl.keystore-password: <jks-password>
          security.ssl.key-password: <jks-password>
          kubernetes.secrets: '<jks-secret>:/opt/flink/tls-cert'
        serviceAccount: flink
        podTemplate:
          apiVersion: v1
          kind: Pod
          metadata:
            name: pod-template
          spec:
            affinity:
              podAntiAffinity:
                preferredDuringSchedulingIgnoredDuringExecution:
                  - weight: 80
                    podAffinityTerm:
                      labelSelector:
                        matchExpressions:
                          - key: type
                            operator: In
                            values:
                              - flink-native-kubernetes
                      topologyKey: kubernetes.io/hostname
            containers:
              - name: flink-main-container
                volumeMounts:
                  - name: flink-logs
                    mountPath: /opt/flink/log
                  - name: flink-volume
                    mountPath: /opt/flink/volume
            volumes:
              - name: flink-logs
                emptyDir: {}
              - name: flink-volume
                persistentVolumeClaim:
                  claimName: ibm-flink-pvc
        jobManager:
          replicas: 2
          resource:
            memory: '4096m'
            cpu: 0.5
        taskManager:
          resource:
            memory: '4096m'
            cpu: 2
        job:
          jarURI: <insert jar file name here>
          args: ['<insert path for statements.sql here>']
          parallelism: 1
          state: running
          upgradeMode: savepoint
          allowNonRestoredState: true
        mode: native
      Note: The Flink instance must be configured with persistent storage.
      If you do not want to use the examples provided earlier, add the following parameter to set a timeout period for event sources when they are marked idle. This allows downstream tasks to advance their watermark. Idleness is not detected by default. The parameter is included in all Flink samples:
      spec:
        flinkConfiguration:
          table.exec.source.idle-timeout: '30 s'
      For more information about table.exec.source.idle-timeout, see the Flink documentation.
   b. Append the following spec.job parameter, or edit the existing parameter if using the Production - Flink Application cluster sample:
      spec:
        job:
          jarURI: local:///opt/flink/usrlib/sql-runner.jar
          args: ["/opt/flink/usrlib/sql-scripts/statements.sql"]
          parallelism: 1
          state: running
          upgradeMode: savepoint
   c. Set the Flink image:
      spec:
        image: <image built at step 1.f>
   d. Set the Flink version (only required if --set webhook.create was set to false during the operator installation).
      - Obtain the correct version by listing the IBM_FLINK_VERSION environment variable on the Flink operator pod:
        kubectl set env pod/<flink_operator_pod_name> --list -n <flink_operator_namespace> | grep IBM_FLINK_VERSION
      - Add the version to the FlinkDeployment, for example:
        spec:
          flinkVersion: "v1_19"
3. Apply the modified FlinkDeployment custom resource.
Changing the parallelism of a Flink SQL runner
1. Edit the FlinkDeployment custom resource.
   a. Ensure that the Flink cluster has enough task slots to fulfill the targeted parallelism value.
      Task slots = spec.taskManager.replicas × spec.flinkConfiguration["taskmanager.numberOfTaskSlots"]
   b. Change the spec.job.parallelism value, then set spec.job.state to running and spec.job.upgradeMode to savepoint.
      spec:
        job:
          jarURI: local:///opt/flink/usrlib/sql-runner.jar
          args: ["/opt/flink/usrlib/sql-scripts/statements.sql"]
          parallelism: 2
          state: running
          upgradeMode: savepoint
          allowNonRestoredState: true
2. Apply the modified FlinkDeployment custom resource.
   The following operations are automatically performed by Flink:
   - The FlinkStateSnapshots custom resource is triggered by the operator and a savepoint is created before the Flink job is suspended.
   - The Flink cluster is shut down, and the JobManager and TaskManager pods are terminated.
   - A Flink cluster is created with new JobManager and TaskManager pods.
   - The Flink job is restarted from the savepoint.
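The task slot check in this procedure is simple arithmetic, and a short Python sketch may make it concrete. The function names are hypothetical, the custom resource is represented as a plain dictionary, and treating a missing taskManager.replicas field as 1 is an assumption of this sketch rather than documented operator behavior.

```python
def available_task_slots(deployment):
    """Task slots = TaskManager replicas x taskmanager.numberOfTaskSlots."""
    spec = deployment["spec"]
    # Assumption: a missing replicas field is treated as one TaskManager.
    replicas = int(spec.get("taskManager", {}).get("replicas", 1))
    # The samples store this value as a quoted string, so convert it.
    slots_per_tm = int(spec["flinkConfiguration"]["taskmanager.numberOfTaskSlots"])
    return replicas * slots_per_tm


def supports_parallelism(deployment, parallelism):
    """Return True if the cluster has enough task slots for the parallelism."""
    return parallelism <= available_task_slots(deployment)


if __name__ == "__main__":
    deployment = {
        "spec": {
            "flinkConfiguration": {"taskmanager.numberOfTaskSlots": "2"},
            "taskManager": {"replicas": 1},
        }
    }
    print(available_task_slots(deployment))
    print(supports_parallelism(deployment, 2))
```

With one TaskManager and two slots per TaskManager, a parallelism of 2 fits, while a parallelism of 3 would require more replicas or more slots per TaskManager.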
Stop a Flink SQL job with a savepoint
You can temporarily stop a running Flink job and capture its current state in a savepoint. When required, you can then restart the job from the exact point where it stopped by using that savepoint.
1. Edit the FlinkDeployment custom resource.
2. Make the following modifications:
   a. Set the value of spec.job.upgradeMode to savepoint.
   b. Set the value of spec.job.state to suspended to stop the Flink job.
      spec:
        job:
          jarURI: local:///opt/flink/usrlib/sql-runner.jar
          args: ["/opt/flink/usrlib/sql-scripts/statements.sql"]
          state: suspended
          upgradeMode: savepoint
3. Save the changes in the FlinkDeployment custom resource.
4. The FlinkStateSnapshots custom resource is created by the operator with a name that starts with the name of your FlinkDeployment custom resource. A savepoint is triggered and written to a location in the PVC, which is indicated in the status.path field of the FlinkStateSnapshots custom resource.
   For example:
   status:
     failures: 0
     path: 'file:/opt/flink/volume/flink-sp/savepoint-caf2b2-39d09a1c170c'
     state: COMPLETED
Resume a suspended Flink job
You can resume a suspended job from the exact point where it stopped by using the savepoint created during its suspension.
1. Edit the FlinkDeployment custom resource of a Flink job that you suspended earlier:
   a. Ensure that the value of spec.job.upgradeMode is savepoint.
   b. Set the value of spec.job.state to running to resume the Flink job.
   c. Set the value of spec.job.initialSavepointPath to the savepoint location found in the status.path field of the FlinkStateSnapshots custom resource from step 4 of stopping a Flink job with a savepoint.
      For example:
      job:
        jarURI: local:///opt/flink/usrlib/sql-runner.jar
        args: ["/opt/flink/usrlib/sql-scripts/statements.sql"]
        state: running
        upgradeMode: savepoint
        initialSavepointPath: file:/opt/flink/volume/flink-sp/savepoint-caf2b2-39d09a1c170c
        allowNonRestoredState: true
2. Save the changes in the FlinkDeployment custom resource.
For more information on manually restoring a job, see manual recovery.
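The resume procedure amounts to copying the savepoint path from the snapshot status into the job specification. The following Python sketch illustrates that mapping on plain dictionaries. The function name is hypothetical, and refusing any snapshot whose state is not COMPLETED is a precaution added in this sketch, not a rule stated in this topic.

```python
def resume_job_spec(job, snapshot_status):
    """Build the spec.job section for resuming from a completed savepoint."""
    # Precaution assumed by this sketch: only use a finished savepoint.
    if snapshot_status.get("state") != "COMPLETED":
        raise ValueError("savepoint is not in the COMPLETED state")
    path = snapshot_status.get("path")
    if not path:
        raise ValueError("snapshot status has no path")
    resumed = dict(job)  # shallow copy, so the suspended spec is left untouched
    resumed.update(
        state="running",
        upgradeMode="savepoint",
        initialSavepointPath=path,
    )
    return resumed


if __name__ == "__main__":
    suspended_job = {
        "jarURI": "local:///opt/flink/usrlib/sql-runner.jar",
        "args": ["/opt/flink/usrlib/sql-scripts/statements.sql"],
        "state": "suspended",
        "upgradeMode": "savepoint",
    }
    status = {
        "failures": 0,
        "path": "file:/opt/flink/volume/flink-sp/savepoint-caf2b2-39d09a1c170c",
        "state": "COMPLETED",
    }
    print(resume_job_spec(suspended_job, status))
```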
Enable SSL connection for your database
To securely connect Flink jobs to a database such as PostgreSQL, MySQL, or Oracle, enable an SSL connection with the database as follows:
1. Ensure you added the CA certificate for your database to the truststore and then created a secret with the truststore.
2. Edit the FlinkDeployment custom resource.
3. Complete the following modifications:
   - In the spec.flinkConfiguration section, add:
     env.java.opts.taskmanager: >-
       -Djavax.net.ssl.trustStore=/certs/truststore.<keystore-extension>
       -Djavax.net.ssl.trustStorePassword=<chosen password>
     env.java.opts.jobmanager: >-
       -Djavax.net.ssl.trustStore=/certs/truststore.<keystore-extension>
       -Djavax.net.ssl.trustStorePassword=<chosen password>
     Where:
     - <keystore-extension> is the extension for your keystore format. For example, jks for Java Keystore and p12 for Public-Key Cryptography Standards #12 (PKCS #12).
   - In the spec.podTemplate.spec.containers.volumeMounts section, add:
     - mountPath: /certs
       name: truststore
       readOnly: true
   - In the spec.podTemplate.spec.volumes section, add:
     - name: truststore
       secret:
         items:
           - key: truststore.<keystore-extension>
             path: truststore.<keystore-extension>
         secretName: ssl-truststore
4. Apply the modified FlinkDeployment custom resource:
   kubectl apply -f <custom-resource-file-path>
   For example:
   kubectl apply -f flinkdeployment_demo.yaml
A secure SSL connection is enabled between Flink and the database.
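The three fragments added in step 3 share the same truststore file name and mount path, so it can help to derive them from one place. The following Python sketch shows one way to do that with plain dictionaries. The function name and its parameters are hypothetical, and the defaults (/certs, ssl-truststore) are simply the values used in the example in this topic.

```python
def truststore_settings(extension, password, mount_path="/certs",
                        secret_name="ssl-truststore"):
    """Build the FlinkDeployment fragments for a database truststore."""
    file_name = f"truststore.{extension}"
    # A YAML folded scalar (>-) joins its lines with a single space,
    # so a single-line string is equivalent to the two-line YAML value.
    jvm_opts = (
        f"-Djavax.net.ssl.trustStore={mount_path}/{file_name} "
        f"-Djavax.net.ssl.trustStorePassword={password}"
    )
    return {
        "flinkConfiguration": {
            "env.java.opts.taskmanager": jvm_opts,
            "env.java.opts.jobmanager": jvm_opts,
        },
        "volumeMount": {"mountPath": mount_path, "name": "truststore", "readOnly": True},
        "volume": {
            "name": "truststore",
            "secret": {
                "items": [{"key": file_name, "path": file_name}],
                "secretName": secret_name,
            },
        },
    }


if __name__ == "__main__":
    # "changeit" is a placeholder password used for illustration only.
    settings = truststore_settings("jks", "changeit")
    print(settings["flinkConfiguration"]["env.java.opts.taskmanager"])
```

Deriving the file name once avoids a mismatch between the secret key, the mounted path, and the javax.net.ssl.trustStore option.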