Deploying jobs in development environments

Find out how to deploy your advanced flows in a Flink cluster for development and testing purposes.

Prerequisites

  • Ensure you have configured persistent storage before you trigger a savepoint.

  • Ensure that you have installed a session cluster instance of Flink by using a FlinkDeployment custom resource. This session cluster must be different from the one used by Event Processing.

    For more information, see installing a Flink instance and Flink sample deployments.

    Note: When deploying Flink for non-production environments (such as development or testing purposes), set license.use to EventAutomationNonProduction in the FlinkDeployment custom resource:

    spec:
      flinkConfiguration:
        license.use: EventAutomationNonProduction
        license.license: L-HRZF-DWHH7A
        license.accept: 'true'
    
  • To run the SQL client, you must disable TLS by removing the following Flink configuration parameters from your FlinkDeployment custom resource:

    spec:
      flinkConfiguration:
        security.ssl.enabled: 'true'
        security.ssl.truststore: /opt/flink/tls-cert/truststore.jks
        security.ssl.truststore-password: <jks-password>
        security.ssl.keystore: /opt/flink/tls-cert/keystore.jks
        security.ssl.keystore-password: <jks-password>
        security.ssl.key-password: <jks-password>
        kubernetes.secrets: '<jks-secret>:/opt/flink/tls-cert'
    
  • The SQL statements are exported from the Event Processing UI and saved to a file, for example, statements.sql.

    For more information, see exporting flows.

  • Ensure that you have updated the Flink SQL Kafka connector properties and values defined in the file statements.sql to match your target environment:

    • Sensitive credentials.

      For security reasons, values containing sensitive credentials, such as the username and password, are removed when the SQL statements are exported from the Event Processing UI, so you must restore them.

      For more information about Flink SQL Kafka connectors, see the Flink documentation.

      Note: When configuring SCRAM authentication for the Kafka connector, ensure that you use double quotes only. Do not use a backslash character (\) to escape the double quotes. The valid format is: username="<username>" password="<password>". For an example of this format in a connector definition, see the sketch after this list.

    • Connector properties values.

      For more information about how events can be consumed from Kafka topics, see the Flink documentation.

      Note: The Kafka connector value must be kafka.

    • To deploy a running Flink job, the SQL statements in the file statements.sql must contain one of the following:

      • A definition of a Flink SQL Kafka sink (also known as event destination), and an INSERT INTO clause that selects the columns of the last temporary view into this sink.
      • A SELECT clause that takes one or all of the columns of the last temporary view.

      For more information about how to define a Flink SQL sink, see the Flink documentation.
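
For illustration, the following sketch shows a Kafka sink definition and an INSERT INTO clause that you might add to the file statements.sql. All table names, topic names, broker addresses, and credentials are placeholder values, not values produced by the export; large_orders_view stands for the last temporary view in your exported SQL.

    -- Sketch of a Kafka sink (event destination); adapt names and properties to your environment.
    CREATE TABLE `orders_sink`
    (
      `order_id` STRING,
      `amount`   DOUBLE
    )
    WITH (
      'connector' = 'kafka',                                        -- the connector value must be kafka
      'topic' = 'large-orders',                                     -- placeholder topic name
      'properties.bootstrap.servers' = 'my-kafka-bootstrap:9092',   -- placeholder bootstrap address
      'properties.security.protocol' = 'SASL_SSL',
      'properties.sasl.mechanism' = 'SCRAM-SHA-512',
      -- Use double quotes only; do not escape them with a backslash.
      'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";',
      'format' = 'json'
    );

    -- Select the columns of the last temporary view into the sink.
    INSERT INTO `orders_sink` SELECT `order_id`, `amount` FROM `large_orders_view`;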

Set deployment options

You can specify deployment options in the file statements.sql.

Each statement is optional. Add the statements that you require at the top of the file by using the following syntax:

SET 'key' = 'value';
  • Change the parallelism of the Flink job. The default value is 1.

    For example:

    SET 'parallelism.default' = '2';
    
  • Give a meaningful name to the Flink job.

    For example:

    SET 'pipeline.name' = 'meaningful-name';
    
  • Specify the minimum time interval for which idle Flink job state is retained before it is cleaned up. No cleanup is enabled by default.

    For example:

    SET 'table.exec.state.ttl' = '20';
    

    For more information about table.exec.state.ttl, see the Flink documentation.

  • Specify a timeout period after which an event source is marked idle, which allows downstream tasks to advance their watermarks. Idleness detection is disabled by default.

    For example:

    SET 'table.exec.source.idle-timeout' = '15 s';
    

    For more information about table.exec.source.idle-timeout, see the Flink documentation.

  • Use an existing savepoint for the Flink job submission.

    For example:

    SET 'execution.savepoint.path' = '/opt/flink/volume/flink-sp/savepoint-cca7bc-bb1e257f0dab';
    
  • Allow the job to skip savepoint state that cannot be restored. Set this option to true if the SQL statements have changed since the savepoint was triggered and are no longer compatible with it. The default value is false.

    For example:

    SET 'execution.savepoint.ignore-unclaimed-state' = 'true';
    

Optionally, you can use user-defined functions (UDFs) to complement the built-in functions by editing the SQL exported from the Event Processing UI.

For more information, see UDFs in the exported SQL.
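
For example, assuming a hypothetical UDF class com.example.udf.MaskEmail packaged in a JAR file (copied to the Flink JobManager pod in a later step), you might register and call it as follows, where `orders` stands for a table or view that is already defined in the exported SQL:

    -- Register the UDF; the function name and class name are hypothetical.
    CREATE TEMPORARY FUNCTION MASK_EMAIL AS 'com.example.udf.MaskEmail' LANGUAGE JAVA;

    -- Use the UDF in a temporary view.
    CREATE TEMPORARY VIEW `masked_orders` AS
      SELECT `order_id`, MASK_EMAIL(`customer_email`) AS `customer_email`
      FROM `orders`;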

Set up a connection to the Flink cluster

  1. Log in to your Kubernetes cluster as a cluster administrator by setting your kubectl context.

  2. Switch to the namespace where the IBM Operator for Apache Flink is installed.

    kubectl config set-context --current --namespace=<namespace>
    
  3. Get the name of the Flink JobManager pod to connect to.

    a. List the available FlinkDeployment custom resources.

    kubectl get flinkdeployment
    

    For example:

    kubectl get flinkdeployment
       
    NAME                   JOB STATUS   LIFECYCLE STATE
    my-flink-deployment    RUNNING      STABLE
    

    b. Retrieve the name of the first online Flink JobManager pod.

    export FLINK_JOB_MANAGER=$(kubectl get pods --selector component=jobmanager,app=<flink-deployment-name> --no-headers=true -o custom-columns=Name:.metadata.name | head -n 1)
    

    For example:

    export FLINK_JOB_MANAGER=$(kubectl get pods --selector component=jobmanager,app=my-flink-deployment --no-headers=true -o custom-columns=Name:.metadata.name | head -n 1)
    echo ${FLINK_JOB_MANAGER}
    my-flink-deployment-b5d95dc77-nmgnj
    
Submit a Flink SQL job

  1. Set up the connection to the Flink cluster.

  2. Copy the file statements.sql to the target container flink-main-container of the Flink JobManager pod:

    kubectl cp -c flink-main-container statements.sql ${FLINK_JOB_MANAGER}:/tmp
    
  3. If you edited the exported SQL to use Flink user-defined functions (UDFs), copy the JAR file that contains the UDF classes:

    kubectl cp -c flink-main-container <path-of-the-udf-jar> ${FLINK_JOB_MANAGER}:/opt/flink/lib
    

    For example:

    kubectl cp -c flink-main-container /udfproject/target/udf.jar ${FLINK_JOB_MANAGER}:/opt/flink/lib
    
  4. Submit the Flink SQL job to the Flink cluster:

    kubectl exec ${FLINK_JOB_MANAGER} -- /opt/flink/bin/sql-client.sh -hist /dev/null -f /tmp/statements.sql
    
List the deployed Flink SQL jobs

  1. Set up the connection to the Flink cluster.

  2. List the Flink jobs:

    kubectl exec -it ${FLINK_JOB_MANAGER} -- /opt/flink/bin/sql-client.sh -hist /dev/null <<< 'SHOW JOBS;'
    

    Output example

    +----------------------------------+-----------------+---------+-------------------------+
    |                           job id |        job name |  status |              start time |
    +----------------------------------+-----------------+---------+-------------------------+
    | 89112b3a999e37740e2c73b6521d0778 | meaningful-name | RUNNING | 2023-05-11T13:45:59.451 |
    +----------------------------------+-----------------+---------+-------------------------+
    

    Note: The output displays Empty set when no Flink job is deployed.

Trigger a savepoint for a running Flink SQL job

  1. After meeting the required prerequisites, list the deployed Flink SQL jobs.

  2. Locate the entry corresponding to the job name.

  3. Check that the status of this job is RUNNING and take note of the corresponding job id.

  4. Run the following command to trigger a savepoint without stopping the job:

    kubectl exec -it ${FLINK_JOB_MANAGER} -- /opt/flink/bin/flink savepoint --type canonical <job id>
    

    For example:

    kubectl exec -it ${FLINK_JOB_MANAGER} -- /opt/flink/bin/flink savepoint --type canonical 89112b3a999e37740e2c73b6521d0778
       
    Triggering savepoint for job 89112b3a999e37740e2c73b6521d0778.
    Waiting for response...
    Savepoint completed. Path: file:/flink-data/savepoints/savepoint-89112b-8dbd328bf7c9
    

    Take note of the savepoint path; you need it to restart the Flink job from this savepoint.

    For information about how to restart a Flink job from a savepoint, see Set deployment options.
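
    For example, to restart the job from the savepoint shown above, you could add the following statement at the top of the file statements.sql before submitting it again:

    SET 'execution.savepoint.path' = 'file:/flink-data/savepoints/savepoint-89112b-8dbd328bf7c9';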

Stop a Flink SQL job with a savepoint

  1. After meeting the required prerequisites, list the deployed Flink SQL jobs.

  2. Locate the entry corresponding to the job name.

  3. Check that the status of this job is RUNNING and take note of the corresponding job id.

  4. Run the following command to trigger a savepoint and stop the Flink job:

    kubectl exec -it ${FLINK_JOB_MANAGER} -- /opt/flink/bin/flink stop --type canonical <job id>
    

    For example:

    kubectl exec -it ${FLINK_JOB_MANAGER} -- /opt/flink/bin/flink stop --type canonical 89112b3a999e37740e2c73b6521d0778
       
    Suspending job 89112b3a999e37740e2c73b6521d0778.
    Savepoint completed. Path: file:/flink-data/savepoints/savepoint-89112b-8dbd328bf7c9
    

    Take note of the savepoint path; you need it to restart the Flink job from this savepoint.

    For information about how to restart a Flink job from a savepoint, see Set deployment options.