Find out how to deploy your Flink jobs that are customized for production or test environments in an application mode Flink cluster.
Benefits of customized Flink jobs include:
- Compatibility with automation in a continuous integration and continuous delivery (CI/CD) pipeline.
- The ability to reuse and easily customize the connector configuration across different targeted environments, such as test, staging, pre-production, and production (with the JSON and configuration YAML export option).
- Support for the automatic upgrade of your IBM Operator for Apache Flink version.

  Note: Automatic upgrade is not supported if the `FlinkDeployment` custom resource uses an extension of the IBM Flink image. In this case, the extension of the image must be rebuilt to use the upgraded IBM Flink image.
Important: This deployment cannot be used with the Event Processing UI.
Prerequisites
- You exported your flow from the Event Processing UI in the JSON and configuration YAML format, and saved it to a file, for example, `flow.zip`.
- You downloaded the exported flow file (for example, `flow.zip`), and extracted it to have the `flow.json` and `config.yaml` files.
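For example, a minimal extraction step might look like the following sketch (the archive and directory names are illustrative):

```shell
# Extract the exported archive; flow.zip is the file saved from the Event Processing UI.
unzip flow.zip -d exported-flow
# The extracted directory is expected to contain flow.json and config.yaml.
ls exported-flow
```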
Preparing the `config.yaml` file to match the target environment
Before deploying the Flink job, edit the `config.yaml` file. The following is an example `config.yaml` file exported from the Event Processing UI:
```yaml
config:
  source:
    source___TABLE:
      properties.sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="<username_redacted>" password="<password_redacted>";
      connector: kafka
      format: json
      json.ignore-parse-errors: 'true'
      properties.bootstrap.servers: kafka.01:9002
      properties.isolation.level: read_committed
      properties.sasl.mechanism: SCRAM-SHA-512
      properties.security.protocol: SASL_SSL
      properties.ssl.endpoint.identification.algorithm: ''
      properties.ssl.truststore.certificates: '-----BEGIN CERTIFICATE----- [...] -----END CERTIFICATE-----'
      properties.ssl.truststore.type: PEM
      properties.tls.pemChainIncluded: 'false'
      scan.startup.mode: earliest-offset
      topic: input-topic
  sink_1:
    sink_1:
      properties.sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="<username_redacted>" password="<password_redacted>";
      connector: kafka
      format: json
      properties.bootstrap.servers: kafka.01:9002
      properties.sasl.mechanism: SCRAM-SHA-512
      properties.security.protocol: SASL_SSL
      properties.ssl.endpoint.identification.algorithm: ''
      properties.ssl.truststore.certificates: '-----BEGIN CERTIFICATE----- [...] -----END CERTIFICATE-----'
      properties.ssl.truststore.type: PEM
      properties.tls.pemChainIncluded: 'false'
      topic: output-topic
```
The `config.yaml` file contains information about the nodes, tables, and their connector properties, where:

- `config`: Lists the names of the nodes that use Kafka, JDBC, and HTTP connectors. In the example snippet, the `source` and `sink_1` nodes are listed.
- `config.source`: Lists the tables present in the SQL generated for the `source` node. In the example snippet, there is only one table for the `source` node: `source___TABLE`.
- `config.source.source___TABLE`: Lists the connector properties of the `source___TABLE` table.
- `config.sink_1`: Lists the tables present in the SQL generated for the `sink_1` node. In the example snippet, there is only one table for the `sink_1` node: `sink_1` (the table name is the same as the node name).
- `config.sink_1.sink_1`: Lists the connector properties of the `sink_1` table.
Note: Sensitive properties, which have redacted values, are listed first, in alphabetical order of the connector property name. Unredacted properties follow, also in alphabetical order. Their values are based on what you configured in the Event Processing UI.
You can edit the `config.yaml` file as follows:

- An actual value must be provided for the properties with redacted values. Otherwise, the deployment fails.
- Do not modify node names and table names. They must match the values in the `flow.json` file.
- Optional: values of unredacted properties can be modified.
  - Important: The `scan.startup.mode` property always has the value `earliest-offset` in the exported `config.yaml` file. You can change it, for example to `latest-offset`, as required.
- Optional: connector properties can be added or removed.
- For security reasons, values containing sensitive credentials, such as username and password, are removed when you export the flow from the Event Processing UI, so you must restore them.
Also, the exported `config.yaml` file contains connector configuration that is applicable to the environment targeted in the Event Processing UI. When deploying to a different target environment, you might need to adapt the connector properties to the target environment.
See the following table for information about the credentials and connector properties of the supported Flink SQL connectors:
| Flink SQL connector | Used by nodes | Sensitive credentials | Connector properties |
|---|---|---|---|
| Kafka | Source and destination | About Kafka connector. Note: When configuring SCRAM authentication for the Kafka connector, ensure you use double quotes only. Do not use a backslash character (`\`) to escape the double quotes. The valid format is: `username="<username>" password="<password>"` | Kafka connector properties. For more information about how events can be consumed from Kafka topics, see the Flink documentation. Note: The Kafka `connector` value must be `kafka`. |
| JDBC | Database | About JDBC connector | JDBC connector properties |
| HTTP | API | About HTTP connector | HTTP connector properties. Note: The HTTP connector version for Event Processing versions 1.2.3 and later is 0.16.0. For earlier Event Processing 1.2.x releases, see the 0.15.0 connector documentation. |
Important:

- The value of the `format` property of the Kafka connector, which allows selecting between Avro, Avro (schema registry), and JSON, should not be changed in the `config.yaml` file. Instead, change it by editing the flow in the Event Processing UI, test the flow, and then export the flow again.
- The `flow.json` file must not be modified.
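After editing, it can be worth checking that the file still parses as valid YAML before you deploy. A minimal sketch, assuming the `yq` tool is available (any YAML parser can be used instead):

```shell
# Fail fast if config.yaml is no longer valid YAML after manual edits.
yq eval '.' config.yaml > /dev/null && echo "config.yaml parses OK"
```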
Use of templating
The values of the connector properties in the `config.yaml` file can be turned into template variables by manually editing the `config.yaml` file. The actual values can then be provided in a CI/CD pipeline by using any templating engine.
For example (the exact syntax might vary based on the templating engine):

```yaml
properties.sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="{{ kafka.username }}" password="{{ kafka.password }}";
```
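As an illustration only, a pipeline step could render such a template with a simple substitution before deployment. The template file name and the `KAFKA_USERNAME` and `KAFKA_PASSWORD` variables below are assumptions, not part of the exported files; any templating engine can be used instead:

```shell
# Substitute the template variables with values provided by the CI/CD pipeline
# (for example, as pipeline secrets exposed through environment variables).
sed -e "s|{{ kafka.username }}|${KAFKA_USERNAME}|g" \
    -e "s|{{ kafka.password }}|${KAFKA_PASSWORD}|g" \
    config.yaml.template > config.yaml
```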
Deploy the Flink job
You can use a Kubernetes `FlinkDeployment` custom resource in application mode to deploy a Flink job.
1. Log in to your Kubernetes cluster as a cluster administrator by setting your `kubectl` context.

2. Switch to the namespace where the IBM Operator for Apache Flink is installed:

   ```shell
   kubectl config set-context --current --namespace=<namespace>
   ```

3. Deploy a Kubernetes secret holding the `flow.json` and `config.yaml` files:

   ```shell
   kubectl create secret generic <k8s_secret_name> \
     --from-file=flow.json=./flow.json \
     --from-file=config.yaml=./config.yaml
   ```

   Where `<k8s_secret_name>` must match the name specified for the secret in the `FlinkDeployment` custom resource, for instance:

   ```yaml
   volumes:
     - name: flow-volume
       secret:
         secretName: application-cluster-prod
   ```

   A simple way to allow multiple Flink instances to coexist in the same namespace is to give the Kubernetes secret the same name as the Flink instance. This way, the mapping between Flink instances and Kubernetes secrets is clear, and there is no clash.

4. Create the IBM Operator for Apache Flink `FlinkDeployment` custom resource.

   a. Choose the Production - Flink Application cluster sample, or a Flink custom resource in application mode, configured with persistent storage. If you prefer not to use a provided sample, add the following parameter to set a timeout period for event sources when they are marked idle. This allows downstream tasks to advance their watermarks. Idleness is not detected by default. The parameter is included in all the provided samples.

      ```yaml
      spec:
        flinkConfiguration:
          table.exec.source.idle-timeout: '30 s'
      ```

      For more information about `table.exec.source.idle-timeout`, see the Flink documentation.

   b. Prepare the `FlinkDeployment` custom resource as described in step 1 of installing a Flink instance.

5. Apply the modified `FlinkDeployment` custom resource by using the UI or the CLI (for a CLI sketch, see after this procedure).
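If you apply the custom resource with the CLI, the last step might look like the following sketch, where `<flink-deployment-file>.yaml` and `<instance-name>` are placeholders for your own file and `FlinkDeployment` name:

```shell
# Apply the FlinkDeployment custom resource and watch its status until the job is running.
kubectl apply -f <flink-deployment-file>.yaml
kubectl get flinkdeployment <instance-name> -w
```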
Trigger a savepoint for a running Flink job
1. Check the `FlinkDeployment` custom resource and ensure that the `status` section indicates that the Job Manager is in `READY` status and that the Flink job is in `RUNNING` status:

   ```yaml
   status:
     jobManagerDeploymentStatus: READY
     jobStatus:
       state: RUNNING
   ```

2. Set the following values in the `FlinkDeployment` custom resource:

   a. Set the value of `spec.job.upgradeMode` to `savepoint`.

   b. Set the value of `spec.job.state` to `running`.

   c. Set the value of `spec.job.savepointTriggerNonce` to an integer that has never been used before for that option.

      For example:

      ```yaml
      job:
        jarURI: local:///opt/flink/ibm-flow/ibm-ep-flow-deployer.jar
        args: []
        savepointTriggerNonce: <integer value>
        state: running
        upgradeMode: savepoint
      ```

   d. Save the changes in the `FlinkDeployment` custom resource.

      A savepoint is triggered and written to a location in the PVC, which is indicated in the `status.jobStatus.savepointInfo.lastSavepoint.location` field of the `FlinkDeployment` custom resource (you can also read this field with `kubectl`, as shown after this procedure).

      For example:

      ```yaml
      status:
        [...]
        jobStatus:
          [...]
          savepointInfo:
            [...]
            lastSavepoint:
              formatType: CANONICAL
              location: 'file:/opt/flink/volume/flink-sp/savepoint-e372fa-9069a1c0563e'
              timeStamp: 1733957991559
              triggerNonce: 1
              triggerType: MANUAL
      ```

3. Keep the `FlinkDeployment` custom resource and the PVC to make them available later for restoring your deployment.
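To read the savepoint location from the CLI instead of opening the custom resource, a command such as the following can be used (a sketch; `<instance-name>` is a placeholder for your `FlinkDeployment` name):

```shell
# Print the location of the last savepoint recorded in the FlinkDeployment status.
kubectl get flinkdeployment <instance-name> \
  -o jsonpath='{.status.jobStatus.savepointInfo.lastSavepoint.location}'
```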
Stop a Flink job with a savepoint
1. Edit the `FlinkDeployment` custom resource.

2. Make the following modifications:

   a. Ensure that the value of `spec.job.upgradeMode` is `savepoint`.

   b. Ensure that the value of `spec.job.state` is `suspended` to stop the Flink job.

      ```yaml
      spec:
        job:
          jarURI: local:///opt/flink/ibm-flow/ibm-ep-flow-deployer.jar
          args: []
          state: suspended
          upgradeMode: savepoint
      ```

3. Save the changes in the `FlinkDeployment` custom resource (you can also apply the same change from the CLI, as shown after this procedure).

   A savepoint is triggered and written to a location in the PVC, which is indicated in the `status.jobStatus.savepointInfo.lastSavepoint.location` field of the `FlinkDeployment` custom resource.

   For example:

   ```yaml
   status:
     [...]
     jobStatus:
       [...]
       savepointInfo:
         [...]
         lastSavepoint:
           formatType: CANONICAL
           location: 'file:/opt/flink/volume/flink-sp/savepoint-e372fa-9069a1c0563e'
           timeStamp: 1733957991559
           triggerNonce: 1
           triggerType: UPGRADE
   ```
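If you prefer to make this change from the CLI rather than editing the custom resource, a patch such as the following sketch applies the same modification (`<instance-name>` is a placeholder for your `FlinkDeployment` name):

```shell
# Suspend the job with a savepoint by patching the FlinkDeployment spec.
kubectl patch flinkdeployment <instance-name> --type merge \
  -p '{"spec":{"job":{"upgradeMode":"savepoint","state":"suspended"}}}'
```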
Resume a Flink job with a savepoint
1. Edit the `FlinkDeployment` custom resource that you saved earlier when you triggered a savepoint, or the custom resource of a Flink job that you suspended earlier:

   a. Ensure that the value of `spec.job.upgradeMode` is `savepoint`.

   b. Ensure that the value of `spec.job.state` is `running` to resume the Flink job.

   c. Remove `spec.job.savepointTriggerNonce` and its value.

   d. Set the value of `spec.job.initialSavepointPath` to the savepoint location found as described in step 2.d when triggering a savepoint, or in step 3 if you suspended the job, plus the suffix `/_metadata`.

      For example:

      ```yaml
      job:
        jarURI: local:///opt/flink/ibm-flow/ibm-ep-flow-deployer.jar
        args: []
        state: running
        upgradeMode: savepoint
        initialSavepointPath: file:/opt/flink/volume/flink-sp/savepoint-e372fa-9069a1c0563e/_metadata
        allowNonRestoredState: true
      ```

2. Apply the modified `FlinkDeployment` custom resource (to confirm from the CLI that the job is running again, see the sketch after this procedure).
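A quick way to confirm that the job restored from the savepoint and is running again is to check the job state in the status section (a sketch; `<instance-name>` is a placeholder):

```shell
# Print the current job state; it should report RUNNING once the job has resumed.
kubectl get flinkdeployment <instance-name> \
  -o jsonpath='{.status.jobStatus.state}'
```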
Enable SSL connection for your database
To securely connect Flink jobs to a database such as PostgreSQL, MySQL, or Oracle, enable an SSL connection with the database as follows:
1. Ensure that you added the CA certificate for your database to the truststore, and then created a secret with the truststore (see the sketch after this procedure).

2. Edit the `FlinkDeployment` custom resource.

3. Complete the following modifications:

   - In the `spec.flinkConfiguration` section, add:

     ```yaml
     env.java.opts.taskmanager: >-
       -Djavax.net.ssl.trustStore=/certs/truststore.<keystore-extension>
       -Djavax.net.ssl.trustStorePassword=<chosen password>
     env.java.opts.jobmanager: >-
       -Djavax.net.ssl.trustStore=/certs/truststore.<keystore-extension>
       -Djavax.net.ssl.trustStorePassword=<chosen password>
     ```

     Where `<keystore-extension>` is the extension for your keystore format. For example, `jks` for Java KeyStore and `p12` for Public-Key Cryptography Standards.

   - In the `spec.podTemplate.spec.containers.volumeMounts` section, add:

     ```yaml
     - mountPath: /certs
       name: truststore
       readOnly: true
     ```

   - In the `spec.podTemplate.spec.volumes` section, add:

     ```yaml
     - name: truststore
       secret:
         items:
           - key: truststore.<keystore-extension>
             path: truststore.<keystore-extension>
         secretName: ssl-truststore
     ```

4. Apply the modified `FlinkDeployment` custom resource:

   ```shell
   kubectl apply -f <custom-resource-file-path>
   ```

   For example:

   ```shell
   kubectl apply -f flinkdeployment_demo.yaml
   ```
A secure SSL connection is enabled between Flink and the database.
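For step 1 of the procedure, the truststore and its secret could be created as in the following sketch. The certificate file name, alias, and password are illustrative; the secret name `ssl-truststore` and the key `truststore.<keystore-extension>` must match the values used in the `FlinkDeployment` custom resource (here, a `jks` truststore):

```shell
# Import the database CA certificate into a Java keystore (JKS) truststore.
keytool -importcert -noprompt -alias db-ca \
  -file db-ca.crt -keystore truststore.jks -storepass '<chosen password>'

# Store the truststore in the Kubernetes secret referenced by the FlinkDeployment.
kubectl create secret generic ssl-truststore \
  --from-file=truststore.jks=./truststore.jks
```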
Troubleshooting
Whether the Flink job runs successfully after deployment depends on the correctness of the connector configuration.
For troubleshooting:
- Check the logs of the active Flink Job Manager. For errors caused by an invalid `config.yaml` file, search for `com.ibm.ei.streamproc.model.deploy.imports.ConfigModelException`.
- Errors due to incorrect configuration can also lead to Flink exceptions, which are logged in the Flink Task Manager pods.
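For example, the Job Manager log can be searched for that exception from the CLI (a sketch; `<flink-jobmanager-pod>` is a placeholder for the Job Manager pod name):

```shell
# List the Flink pods, then search the active Job Manager log for configuration errors.
kubectl get pods
kubectl logs <flink-jobmanager-pod> | grep com.ibm.ei.streamproc.model.deploy.imports.ConfigModelException
```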