Find out how to deploy your advanced flows in a Flink cluster for development and testing purposes.
Prerequisites
- Ensure you have configured persistent storage before you trigger a savepoint.
- Ensure that you have installed a session cluster instance of Flink by using a FlinkDeployment custom resource. This session cluster must be different from the one used by Event Processing. For more information, see installing a Flink instance and Flink sample deployments.
  Note: When deploying Flink for non-production environments (such as development or testing purposes), set license.use to EventAutomationNonProduction in the FlinkDeployment custom resource:

  spec:
    flinkConfiguration:
      license.use: EventAutomationNonProduction
      license.license: L-KCVZ-JL5CRM
      license.accept: 'true'
- The SQL statements are exported from the Event Processing UI and saved to a file, for example, statements.sql. For more information, see exporting flows.
- You updated the Flink SQL connector properties and values that are defined in the file statements.sql to match your target environment. For security reasons, values containing sensitive credentials, such as username and password, are removed when the SQL statements are exported from the Event Processing UI, so you must restore them.
  Also, the exported SQL contains connector configuration that is applicable to the environment targeted in the Event Processing UI. When deploying to a different target environment, you might need to adapt the connector properties accordingly.
  See the following table for credentials and connector properties information about the supported Flink SQL connectors:

  Flink SQL connector | Used by nodes           | Sensitive credentials  | Connector properties values
  Kafka               | Source and destination  | About Kafka connector  | Kafka connector options
  JDBC                | Database                | About JDBC connector   | JDBC connector options
  HTTP                | API                     | About HTTP connector   | HTTP connector options

  Note: When configuring SCRAM authentication for the Kafka connector, ensure you use double quotes only. Do not use a backslash character (\) to escape the double quotes. The valid format is: username="<username>" password="<password>"

  For more information about how events can be consumed from Kafka topics, see the Flink documentation.

  Note: The Kafka connector value must be kafka.
- To deploy a running Flink job, the SQL statements in the file statements.sql must contain one of the following clauses:
  - A definition of a Flink SQL Kafka sink (also known as event destination), and an INSERT INTO clause that selects the columns of the last temporary view into this sink.
  - A SELECT clause that takes one or all of the columns of the last temporary view.
  For more information about how to define a Flink SQL sink, see the Flink documentation. A sketch of a Kafka sink definition with restored credentials is shown after these prerequisites.
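The following sketch illustrates these prerequisites: the end of an exported statements.sql file after the credentials are restored and the connector properties are adapted, with a Kafka sink definition followed by an INSERT INTO clause. All table, topic, address, column, and view names are placeholder examples rather than values produced by the Event Processing UI, and the security properties shown are typical SCRAM-over-TLS settings that you must adapt to your target Kafka cluster.

-- Hypothetical Kafka sink (event destination); adapt every value to your target environment.
CREATE TABLE `my-destination`
(
  `orderId` STRING,
  `total`   DOUBLE
)
WITH (
  -- The connector value must be kafka.
  'connector' = 'kafka',
  'topic' = 'flow.output',
  'properties.bootstrap.servers' = 'my-kafka-bootstrap:9092',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.mechanism' = 'SCRAM-SHA-512',
  -- Restore the credentials that were removed at export time; use double quotes only, no backslash escapes.
  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";',
  'format' = 'json'
);

-- Select the columns of the last temporary view into the sink.
INSERT INTO `my-destination`
SELECT `orderId`, `total`
FROM `my-last-temporary-view`;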
Set deployment options
You can specify deployment options in the file statements.sql. Each statement is optional and can be added at the top of the file with the following syntax:

SET 'key' = 'value';

A combined example is shown after the list of options.
- Change the parallelism of the Flink job. The default value is 1. For example:

  SET 'parallelism.default' = '2';
- Give a meaningful name to the Flink job. For example:

  SET 'pipeline.name' = 'meaningful-name';
- Specify a minimum time interval for how long idle Flink job state is retained. By default, no cleanup is enabled. For example:

  SET 'table.exec.state.ttl' = '20';

  For more information about table.exec.state.ttl, see the Flink documentation.
- Specify a timeout period after which an event source that receives no events is marked idle, which allows downstream tasks to advance their watermarks. By default, idleness is not detected. For example:

  SET 'table.exec.source.idle-timeout' = '15 s';

  For more information about table.exec.source.idle-timeout, see the Flink documentation.
- Use an existing savepoint for the Flink job submission. For example:

  SET 'execution.savepoint.path' = '/opt/flink/volume/flink-sp/savepoint-cca7bc-bb1e257f0dab';
- Allow skipping savepoint state that cannot be restored. Set this option to true if the SQL statements have changed since the savepoint was triggered and are no longer compatible with it. The default value is false. For example:

  SET 'execution.savepoint.ignore-unclaimed-state' = 'true';
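Taken together, the top of a statements.sql file that uses several of these options might look like the following sketch. The values shown are the illustrative ones from the list above; uncomment the savepoint options only when restoring from a savepoint.

-- Optional deployment options, placed before the exported SQL statements.
SET 'parallelism.default' = '2';
SET 'pipeline.name' = 'meaningful-name';
SET 'table.exec.state.ttl' = '20';
SET 'table.exec.source.idle-timeout' = '15 s';
-- SET 'execution.savepoint.path' = '/opt/flink/volume/flink-sp/savepoint-cca7bc-bb1e257f0dab';
-- SET 'execution.savepoint.ignore-unclaimed-state' = 'true';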
Use Flink user-defined functions
Optionally, you can use user-defined functions (UDFs) to complement the built-in functions by editing the SQL exported from the Event Processing UI.
For more information, see UDFs in the exported SQL.
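As a sketch, assuming a UDF class com.example.udf.ToCelsius packaged in a JAR that you make available to the Flink cluster (see Submit a Flink SQL job), the exported SQL could be edited to register and call the function as follows. The class, function, column, and view names are hypothetical examples.

-- Register the user-defined function (the implementing class is a hypothetical example).
CREATE TEMPORARY FUNCTION toCelsius AS 'com.example.udf.ToCelsius' LANGUAGE JAVA;

-- Call the function like a built-in function in the exported statements.
CREATE TEMPORARY VIEW `converted-readings` AS
SELECT `sensorId`, toCelsius(`temperatureF`) AS `temperatureC`
FROM `readings-view`;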
Set up a connection to the Flink cluster
- Log in to your Kubernetes cluster as a cluster administrator by setting your kubectl context.
- Switch to the namespace where the IBM Operator for Apache Flink is installed:

  kubectl config set-context --current --namespace=<namespace>
- Get the name of the Flink JobManager pod to connect to.

  a. List the available FlinkDeployment custom resources:

     kubectl get flinkdeployment

     For example:

     kubectl get flinkdeployment
     NAME                  JOB STATUS   LIFECYCLE STATE
     my-flink-deployment   RUNNING      STABLE

  b. Retrieve the name of the first online Flink JobManager pod:

     export FLINK_JOB_MANAGER=$(kubectl get pods --selector component=jobmanager,app=<flink-deployment-name> --no-headers=true -o custom-columns=Name:.metadata.name | head -n 1)

     For example:

     export FLINK_JOB_MANAGER=$(kubectl get pods --selector component=jobmanager,app=my-flink-deployment --no-headers=true -o custom-columns=Name:.metadata.name | head -n 1)
     echo ${FLINK_JOB_MANAGER}
     my-flink-deployment-b5d95dc77-nmgnj
Submit a Flink SQL job
- Set up the connection to the Flink cluster.
- Copy the file statements.sql to the target container flink-main-container of the Flink JobManager pod:

  kubectl cp -c flink-main-container statements.sql ${FLINK_JOB_MANAGER}:/tmp
- If you introduced Flink user-defined functions (UDFs) at the previous optional step, copy the JAR file that contains the UDF classes:

  kubectl cp -c flink-main-container <path-of-the-udf-jar> ${FLINK_JOB_MANAGER}:/opt/flink/lib

  For example:

  kubectl cp -c flink-main-container /udfproject/target/udf.jar ${FLINK_JOB_MANAGER}:/opt/flink/lib
- Submit the Flink SQL job to the Flink cluster:

  kubectl exec ${FLINK_JOB_MANAGER} -- /opt/flink/bin/sql-client.sh -hist /dev/null -f /tmp/statements.sql
List the deployed Flink SQL jobs
- Set up the connection to the Flink cluster.
- List the Flink jobs:

  kubectl exec -it ${FLINK_JOB_MANAGER} -- /opt/flink/bin/sql-client.sh -hist /dev/null <<< 'SHOW JOBS;'

  Example output:

  +----------------------------------+-----------------+---------+-------------------------+
  |                           job id |        job name |  status |              start time |
  +----------------------------------+-----------------+---------+-------------------------+
  | 89112b3a999e37740e2c73b6521d0778 | meaningful-name | RUNNING | 2023-05-11T13:45:59.451 |
  +----------------------------------+-----------------+---------+-------------------------+
  Note: The output displays Empty set when no Flink job is deployed.
Trigger a savepoint for a running Flink SQL job
- After meeting the required prerequisites, list the deployed Flink SQL jobs.
- Locate the entry corresponding to the job name.
- Check that the status of this job is RUNNING and take note of the corresponding job id.
- Run the following command to trigger the generation of a savepoint without stopping the job:

  kubectl exec -it ${FLINK_JOB_MANAGER} -- /opt/flink/bin/flink savepoint --type canonical <job id>
  For example:

  kubectl exec -it ${FLINK_JOB_MANAGER} -- /opt/flink/bin/flink savepoint --type canonical 89112b3a999e37740e2c73b6521d0778
  Triggering savepoint for job 89112b3a999e37740e2c73b6521d0778.
  Waiting for response...
  Savepoint completed. Path: file:/flink-data/savepoints/savepoint-89112b-8dbd328bf7c9

Take note of the savepoint path; you need it to restart the Flink job from this savepoint.
For information about how to restart a Flink job from a savepoint, see set deployment options.
Stop a Flink SQL job with a savepoint
- After meeting the required prerequisites, list the deployed Flink SQL jobs.
- Locate the entry corresponding to the job name.
- Check that the status of this job is RUNNING and take note of the corresponding job id.
- Run the following command to trigger the generation of a savepoint and stop the Flink job:

  kubectl exec -it ${FLINK_JOB_MANAGER} -- /opt/flink/bin/flink stop --type canonical <job id>
For example:
kubectl exec -it ${FLINK_JOB_MANAGER} -- /opt/flink/bin/flink stop --type canonical 89112b3a999e37740e2c73b6521d0778 Suspending job 89112b3a999e37740e2c73b6521d0778. Savepoint completed. Path: file:/flink-data/savepoints/savepoint-89112b-8dbd328bf7c9
Take note of the savepoint path, you need it to restart the Flink job from this savepoint.
For information about how to restart a Flink job from a savepoint, see set deployment options.
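For illustration, assuming the savepoint path from the example output above, restarting the stopped job from its savepoint amounts to adding the following options at the top of statements.sql and submitting the job again as described in Submit a Flink SQL job:

SET 'execution.savepoint.path' = 'file:/flink-data/savepoints/savepoint-89112b-8dbd328bf7c9';
-- Only needed if the SQL statements changed since the savepoint was taken:
-- SET 'execution.savepoint.ignore-unclaimed-state' = 'true';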