Find out how to deploy your advanced flows in a Flink cluster for development and testing purposes.
Prerequisites
- Ensure you have configured persistent storage before you trigger a savepoint.
- Ensure that you have installed a session cluster instance of Flink by using a FlinkDeployment custom resource. For more information, see installing a Flink instance and Flink sample deployments.
  Note: When deploying Flink for non-production environments (such as development or testing purposes), set license.use to EventAutomationNonProduction in the FlinkDeployment custom resource:
    spec:
      flinkConfiguration:
        license.use: EventAutomationNonProduction
        license.license: L-HRZF-DWHH7A
        license.accept: 'true'
- The SQL statements are exported from the Event Processing UI and saved to a file, for example, statements.sql. For more information, see Exporting flows.
- You updated the Flink SQL Kafka connector properties and values defined in the statements.sql file to match your target environment:
  - Sensitive credentials.
    For security reasons, the values containing sensitive credentials are removed from the Event Processing UI when exporting the SQL statements, so you must restore them.
    For more information about Flink SQL Kafka connectors, see the Flink documentation.
    Note: When configuring SCRAM authentication for the Kafka connector, ensure you use double quotes only. Do not use a backslash character (\) to escape the double quotes. The valid format is:
    username="<username>" password="<password>"
  - Connector properties values.
    For more information about how events can be consumed from Kafka topics, see the Flink documentation.
    Note: The Kafka connector value must be kafka.
- To deploy a running Flink job, the SQL statements in the statements.sql file must contain one of the following:
  - A definition of a Flink SQL Kafka sink (also known as event destination), and an INSERT INTO clause that selects the columns of the last temporary view into this sink.
  - A SELECT clause that takes one or all of the columns of the last temporary view.
  For more information about how to define a Flink SQL sink, see the Flink documentation. A sketch of a sink definition is shown after this list.
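The following sketch illustrates these prerequisites; the table name, columns, topic, bootstrap address, and view name are hypothetical placeholders, and the security properties depend on how your Kafka cluster is configured. It shows a Kafka sink with restored SCRAM credentials (double quotes, no backslash escaping) and an INSERT INTO clause that selects the columns of the last temporary view into the sink:

  -- Hypothetical event destination (sink); your exported statements.sql defines the actual schema.
  CREATE TABLE `order-totals-sink` (
    `orderId` STRING,
    `total`   DOUBLE
  ) WITH (
    'connector' = 'kafka',                                       -- the connector value must be kafka
    'topic' = 'order.totals',                                    -- hypothetical destination topic
    'properties.bootstrap.servers' = 'my-kafka-bootstrap:9092',  -- hypothetical bootstrap address
    'properties.security.protocol' = 'SASL_SSL',
    'properties.sasl.mechanism' = 'SCRAM-SHA-512',
    -- Restore the credentials that were removed on export; use double quotes only:
    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";',
    'format' = 'json'
  );

  -- Select the columns of the last temporary view into the sink.
  INSERT INTO `order-totals-sink`
  SELECT `orderId`, `total` FROM `last-temporary-view`;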
Set deployment options
You can specify deployment options in the statements.sql file.
Each statement is optional and can be added at the top of the file with the following syntax (a combined example is shown after the list of options):
  SET 'key' = 'value';
- Change the parallelism of the Flink job. The default value is 1.
  For example:
  SET 'parallelism.default' = '2';
- Give a meaningful name to the Flink job.
  For example:
  SET 'pipeline.name' = 'meaningful-name';
- Specify a minimum time interval for how long idle Flink job states will be retained. No cleanup is enabled by default.
  For example:
  SET 'table.exec.state.ttl' = '20';
  For more information about table.exec.state.ttl, see the Flink documentation.
- Specify a timeout period after which event sources are marked idle when they receive no events. This allows downstream tasks to advance their watermarks. Idleness is not detected by default.
  For example:
  SET 'table.exec.source.idle-timeout' = '15 s';
  For more information about table.exec.source.idle-timeout, see the Flink documentation.
- Use an existing savepoint for the Flink job submission.
  For example:
  SET 'execution.savepoint.path' = '/opt/flink/volume/flink-sp/savepoint-cca7bc-bb1e257f0dab';
- Allow skipping savepoint state that cannot be restored. Set this option to true if the SQL statements have changed since the savepoint was triggered and are no longer compatible with it. The default is false.
  For example:
  SET 'execution.savepoint.ignore-unclaimed-state' = 'true';
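For example, the top of a statements.sql file might combine several of these options; the values below are taken from the examples above and are illustrative only:

  -- Optional deployment options, placed before the exported statements.
  SET 'pipeline.name' = 'meaningful-name';
  SET 'parallelism.default' = '2';
  SET 'table.exec.source.idle-timeout' = '15 s';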
Set up a connection to the Flink cluster
- Log in to your Kubernetes cluster as a cluster administrator by setting your kubectl context.
- Switch to the namespace where the IBM Operator for Apache Flink is installed.
  kubectl config set-context --current --namespace=<namespace>
- Get the name of the Flink JobManager pod to connect to.
  a. List the available FlinkDeployment custom resources.
     kubectl get flinkdeployment
     For example:
     kubectl get flinkdeployment
     NAME                  JOB STATUS   LIFECYCLE STATE
     my-flink-deployment   RUNNING      STABLE
  b. Retrieve the name of the first online Flink JobManager pod.
     export FLINK_JOB_MANAGER=$(kubectl get pods --selector component=jobmanager,app=<flink-deployment-name> --no-headers=true -o custom-columns=Name:.metadata.name | head -n 1)
     For example:
     export FLINK_JOB_MANAGER=$(kubectl get pods --selector component=jobmanager,app=my-flink-deployment --no-headers=true -o custom-columns=Name:.metadata.name | head -n 1)
     echo ${FLINK_JOB_MANAGER}
     my-flink-deployment-b5d95dc77-nmgnj
Submit a Flink SQL job
- Set up the connection to the Flink cluster.
- Copy the statements.sql file to the flink-main-container target container of the Flink JobManager pod.
  kubectl cp -c flink-main-container statements.sql ${FLINK_JOB_MANAGER}:/tmp
- Submit the Flink SQL job to the Flink cluster.
  kubectl exec ${FLINK_JOB_MANAGER} -- /opt/flink/bin/sql-client.sh -hist /dev/null -f /tmp/statements.sql
List the deployed Flink SQL jobs
- Set up the connection to the Flink cluster.
- List the Flink jobs:
  kubectl exec -it ${FLINK_JOB_MANAGER} -- /opt/flink/bin/sql-client.sh -hist /dev/null <<< 'SHOW JOBS;'
  Output example:
  +----------------------------------+-----------------+---------+-------------------------+
  |                           job id |        job name |  status |              start time |
  +----------------------------------+-----------------+---------+-------------------------+
  | 89112b3a999e37740e2c73b6521d0778 | meaningful-name | RUNNING | 2023-05-11T13:45:59.451 |
  +----------------------------------+-----------------+---------+-------------------------+
  Note: The output displays Empty set when no Flink job is deployed.
Trigger a savepoint for a running Flink SQL job
- After meeting the required prerequisites, list the deployed Flink SQL jobs.
- Locate the entry corresponding to the job name.
- Check that the status of this job is RUNNING and take note of the corresponding job id.
- Run the following command to trigger the generation of a savepoint without stopping the job:
  kubectl exec -it ${FLINK_JOB_MANAGER} -- /opt/flink/bin/flink savepoint --type canonical <job id>
  For example:
  kubectl exec -it ${FLINK_JOB_MANAGER} -- /opt/flink/bin/flink savepoint --type canonical 89112b3a999e37740e2c73b6521d0778
  Triggering savepoint for job 89112b3a999e37740e2c73b6521d0778.
  Waiting for response...
  Savepoint completed. Path: file:/flink-data/savepoints/savepoint-89112b-8dbd328bf7c9
  Take note of the savepoint path; you need it to restart the Flink job from this savepoint.
  For information about how to restart a Flink job from a savepoint, see Set deployment options.
Stop a Flink SQL job with a savepoint
- After meeting the required prerequisites, list the deployed Flink SQL jobs.
- Locate the entry corresponding to the job name.
- Check that the status of this job is RUNNING and take note of the corresponding job id.
- Run the following command to trigger the generation of a savepoint and stop the Flink job:
  kubectl exec -it ${FLINK_JOB_MANAGER} -- /opt/flink/bin/flink stop --type canonical <job id>
  For example:
  kubectl exec -it ${FLINK_JOB_MANAGER} -- /opt/flink/bin/flink stop --type canonical 89112b3a999e37740e2c73b6521d0778
  Suspending job 89112b3a999e37740e2c73b6521d0778.
  Savepoint completed. Path: file:/flink-data/savepoints/savepoint-89112b-8dbd328bf7c9
  Take note of the savepoint path; you need it to restart the Flink job from this savepoint.
  For information about how to restart a Flink job from a savepoint, see Set deployment options and the example below.
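For example, assuming the savepoint path returned in the output above, you could restart the job by adding the following options at the top of the statements.sql file and submitting it again, as described in Submit a Flink SQL job:

  -- Resume from the savepoint taken when the job was stopped (use your own savepoint path).
  SET 'execution.savepoint.path' = 'file:/flink-data/savepoints/savepoint-89112b-8dbd328bf7c9';
  -- Only needed if the SQL statements changed since the savepoint was triggered:
  SET 'execution.savepoint.ignore-unclaimed-state' = 'true';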