Deploying jobs by using the Apache SQL Runner sample

Find out how to deploy your flows in an application mode Flink cluster as part of your production environment.

Important:

  • This deployment cannot be used with the Event Processing UI.

Note: The Apache operator sample that is referenced in the following sections points to the version of the sample in the main branch, which is up-to-date, and might include fixes that are absent in the release branches.

Prerequisites

  • The SQL statements are exported from the Event Processing UI and saved to a file, for example, statements.sql.

    For more information, see exporting flows.

  • You updated the Flink SQL connector properties and values that are defined in the file statements.sql to match your target environment.

    For security reasons, the values containing sensitive credentials, such as username and password, are removed when exporting the SQL statements from the Event Processing UI, so you must restore them.

    Also, the exported SQL contains connector configuration that is applicable to the environment targeted in the Event Processing UI. When deploying to a different target environment, you might need to adapt the connector properties to the target environment.

    See the following table for credentials and connector properties information about supported Flink SQL connectors:

    Flink SQL connector Used by nodes Sensitive credentials Connector properties
    Kafka Source and destination About Kafka connector

    Note: When configuring SCRAM authentication for the Kafka connector, ensure you use double quotes only. Do not use a backslash character (\) to escape the double quotes. The valid format is: username="<username>" password="<password>"
    Kafka connector properties

    For more information about how events can be consumed from Kafka topics, see the Flink documentation.

    Note: The Kafka connector value must be kafka.
    JDBC Database About JDBC connector JDBC connector properties
    HTTP API About HTTP connector HTTP connector properties.

    Event Processing 1.2.3 icon Note: the HTTP connector version for Event Processing versions 1.2.3 and later is 0.16.0. For earlier Event Processing 1.2.x releases, see the 0.15.0 connector documentation.
  • To deploy a running Flink job, the SQL statements in the file statements.sql must contain one of the following clauses:

    • A definition of a Flink SQL Kafka sink (also known as event destination), and an INSERT INTO clause that selects the columns of the last temporary view into this sink.
    • A SELECT clause that takes one or all of the columns of the last temporary view.

    For more information about how to define a Flink SQL sink, see the Flink documentation.

Optionally, user-defined functions (UDFs) can be used as a complement of the built-in functions, by editing the SQL exported from the Event Processing UI.

For more information, see UDFs in the exported SQL.

  1. Log in to your Kubernetes cluster as a cluster administrator by setting your kubectl context.

  2. Switch to the namespace where the IBM Operator for Apache Flink is installed:

    kubectl config set-context --current --namespace=<namespace>
    

You can use a Kubernetes FlinkDeployment custom resource in application mode to deploy a Flink job for processing and deploying the statements in the file statements.sql.

A sample application flink-sql-runner-example is provided in the Apache Flink GitHub repository for that purpose.

Follow the instructions to build:

  • the flink-sql-runner-example JAR file (Flink job)
  • the Docker image

Important: Ensure that the Flink SQL runner JAR file and the statements.sql file have read permissions (644) for non-root users. If the JAR file is only readable by the root user, the FlinkDeployment instance cannot be started by non-root users.

Some adaptations to this procedure are required to build the Docker image and use the file statements.sql:

  1. Modify the Dockerfile to use the IBM Flink image:

    a. Execute the following command to extract the Flink image name including its SHA digest from the ClusterServiceVersion (CSV). For example, if you are running on Flink version 1.2.3:

    kubectl get csv -o jsonpath='{.spec.install.spec.deployments[*].spec.template.spec.containers[0].env[?(@.name=="IBM_FLINK_IMAGE")].value}' ibm-eventautomation-flink.v1.2.3
    

    Alternatively, you can obtain the image name from the Flink operator pod’s environment variable:

    kubectl set env pod/<flink_operator_pod_name> --list -n <flink_operator_namespace> | grep IBM_FLINK_IMAGE
    

    b. Edit the Dockerfile and change the FROM clause to use the IBM Flink image with its SHA digest, as determined in the previous step.

    FROM --platform=<platform> <IBM Flink image with digest>
    

    Where <platform> is linux/amd64 or linux/s390x, depending on your deployment target.

    c. If at the previous optional step you introduced the use of Flink user-defined functions (UDFs), edit the Dockerfile to copy the JAR file that contains the UDF classes:

    COPY --chown=flink:root <path-of-the-udf-jar> /opt/flink/lib
    

    For example:

    COPY --chown=flink:root /udfproject/target/udf.jar /opt/flink/lib
    

    d. Remove the sample SQL statement files from the sql-scripts directory.

    e. Copy the statements.sql file to the directory sql-scripts.

    f. Build the docker image and push it to a registry accessible from your OpenShift Container Platform. If your registry requires authentication, configure the image pull secret, for example, by using the global cluster pull secret.

  2. Create the IBM Operator for Apache Flink FlinkDeployment custom resource.

    You can use a Kubernetes FlinkDeployment custom resource in application mode to deploy a Flink job for processing and deploying the statements in the statements.sql file.

    a. Event Processing 1.2.3 icon You can start with the following example of an application mode Flink instance:

    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
    name: application-cluster-prod
    spec:
      image: <image built FROM icr.io/cpopen/ibm-eventautomation-flink/ibm-eventautomation-flink>
      flinkConfiguration:
        license.use: EventAutomationProduction
        license.license: 'L-KCVZ-JL5CRM'
        license.accept: 'false'
        high-availability.type: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
        high-availability.storageDir: 'file:///opt/flink/volume/flink-ha'
        restart-strategy: failure-rate
        restart-strategy.failure-rate.max-failures-per-interval: '10'
        restart-strategy.failure-rate.failure-rate-interval: '10 min'
        restart-strategy.failure-rate.delay: '30 s'
        execution.checkpointing.interval: '5000'
        execution.checkpointing.unaligned: 'false'
        state.backend.type: rocksdb
        state.backend.rocksdb.thread.num: '10'
        state.backend.incremental: 'true'
        state.backend.rocksdb.use-bloom-filter: 'true'
        state.checkpoints.dir: 'file:///opt/flink/volume/flink-cp'
        state.checkpoints.num-retained: '3'
        state.savepoints.dir: 'file:///opt/flink/volume/flink-sp'
        taskmanager.numberOfTaskSlots: '2'
        table.exec.source.idle-timeout: '30 s'
        security.ssl.enabled: 'true'
        security.ssl.truststore: /opt/flink/tls-cert/truststore.jks
        security.ssl.truststore-password: <jks-password>
        security.ssl.keystore: /opt/flink/tls-cert/keystore.jks
        security.ssl.keystore-password: <jks-password>
        security.ssl.key-password: <jks-password>
        kubernetes.secrets: '<jks-secret>:/opt/flink/tls-cert'
    serviceAccount: flink
    podTemplate:
      apiVersion: v1
      kind: Pod
      metadata:
        name: pod-template
        spec:
          affinity:
            podAntiAffinity:
              preferredDuringSchedulingIgnoredDuringExecution:
                - weight: 80
                  podAffinityTerm:
                    labelSelector:
                      matchExpressions:
                        - key: type
                          operator: In
                          values:
                            - flink-native-kubernetes
                    topologyKey: kubernetes.io/hostname
          containers:
            - name: flink-main-container
              volumeMounts:
                - name: flink-logs
                  mountPath: /opt/flink/log
                - name: flink-volume
                  mountPath: /opt/flink/volume
          volumes:
            - name: flink-logs
              emptyDir: {}
            - name: flink-volume
              persistentVolumeClaim:
                claimName: ibm-flink-pvc
    jobManager:
      replicas: 2
      resource:
        memory: '4096m'
        cpu: 0.5
    taskManager:
      resource:
        memory: '4096m'
        cpu: 2
    job:
      jarURI: <insert jar file name here>
      args: ['<insert path for statements.sql here>']
      parallelism: 1
      state: running
      upgradeMode: savepoint
      allowNonRestoredState: true
    mode: native
    

    In Event Processing versions earlier than 1.2.3, select the Production - Flink Application cluster sample.

    Note: The Flink instance must be configured with persistent storage.

    If you do not want to use the examples provided earlier, add the following parameter to set a timeout period for event sources when they are marked idle. This allows downstream tasks to advance their watermark. Idleness is not detected by default. The parameter is included in all Flink samples:

    spec:
    flinkConfiguration:
       table.exec.source.idle-timeout: '30 s'
    

    For more information about table.exec.source.idle-timeout, see the Flink documentation.

    b. Append the following spec.job parameter, or edit the existing parameter if using the Production - Flink Application cluster sample:

    spec:
      job:
        jarURI: local:///opt/flink/usrlib/sql-runner.jar
        args: ["/opt/flink/usrlib/sql-scripts/statements.sql"]
        parallelism: 1
        state: running
        upgradeMode: savepoint
    

    c. Set the Flink image:

    spec:
      image: <image built at step 1.e>
    

    d. Set the Flink version (only required if the --set webhook.create is set to false during the operator installation).

    1. Obtain the correct version by listing the IBM_FLINK_VERSION environment variable on the Flink operator pod:

      kubectl set env pod/<flink_operator_pod_name> --list -n <flink_operator_namespace> | grep IBM_FLINK_VERSION
      
    2. Add the version to the FlinkDeployment, for example:

      spec:
        flinkVersion: "v1_19"
      
  3. Apply the modified FlinkDeployment custom resource.

  1. Edit the FlinkDeployment custom resource.

    a. Ensure that the Flink cluster has enough task slots to fulfill the targeted parallelism value.

    Task slots = spec.taskmanager.replicas × spec.flinkConfiguration["taskmanager.numberOfTaskSlots"] 
    

    b. Change the spec.job.parallelism value, then set spec.job.state to running and spec.job.upgradeMode to savepoint.

    spec:
      job:
        jarURI: local:///opt/flink/usrlib/sql-runner.jar
        args: ["/opt/flink/usrlib/sql-scripts/statements.sql"]
        parallelism: 2
        state: running
        upgradeMode: savepoint
        allowNonRestoredState: true
    
  2. Apply the modified FlinkDeployment custom resource.

    The following operations are automatically performed by Flink:

    • A savepoint is created before the Flink job is suspended.
    • The Flink cluster is shutdown, the JobManager and TaskManager pods are terminated.
    • A Flink cluster is created with new JobManager and TaskManager pods.
    • The Flink job is restarted from the savepoint.
  1. Ensure that the status section indicates that the Job Manager is in READY status and that the Flink job is in RUNNING status by checking the FlinkDeployment custom resource.

    status:
      jobManagerDeploymentStatus: READY
      jobStatus:
        state: RUNNING
    
  2. Set the following values in the FlinkDeployment custom resource:

    a. Set the value of spec.job.upgradeMode to savepoint.

    b. Set the value of spec.job.state to running.

    c. Set the value of spec.job.savepointTriggerNonce to an integer that has never been used before for that option.

    For example:

    job:
      jarURI: local:///opt/flink/usrlib/sql-runner.jar
      args: ["/opt/flink/usrlib/sql-scripts/statements.sql"]
      savepointTriggerNonce: <integer value>
      state: running
      upgradeMode: savepoint
    

    d. Save the changes in the FlinkDeployment custom resource.

    A savepoint is triggered and written to a location in the PVC, which is indicated in the status.jobStatus.savepointInfo.lastSavepoint.location field of the FlinkDeployment custom resource.

    For example:

    status:
      [...]
      jobStatus:
        [...]
        savepointInfo:
          [...]
          lastSavepoint:
            formatType: CANONICAL
            location: 'file:/opt/flink/volume/flink-sp/savepoint-e372fa-9069a1c0563e'
            timeStamp: 1733957991559
            triggerNonce: 1
            triggerType: MANUAL
    
  3. Keep the FlinkDeployment custom resource and the PVC to make them available later for restoring your deployment.

  1. Edit the FlinkDeployment custom resource.

  2. Make the following modifications:

    a. Ensure that the value of spec.job.upgradeMode is savepoint.

    b. Ensure that the value of spec.job.state is suspended to stop the Flink job.

    spec:
      job:
        jarURI: local:///opt/flink/usrlib/sql-runner.jar
        args: ["/opt/flink/usrlib/sql-scripts/statements.sql"]
        state: suspended
        upgradeMode: savepoint
    
  3. Save the changes in the FlinkDeployment custom resource.

    A savepoint is triggered and written to a location in the PVC, which is indicated in the status.jobStatus.savepointInfo.lastSavepoint.location field of the FlinkDeployment custom resource.

    For example:

    status:
      [...]
      jobStatus:
        [...]
        savepointInfo:
          [...]
          lastSavepoint:
            formatType: CANONICAL
            location: 'file:/opt/flink/volume/flink-sp/savepoint-e372fa-9069a1c0563e'
            timeStamp: 1733957991559
            triggerNonce: 1
            triggerType: UPGRADE
    
  1. Edit the FlinkDeployment custom resource that you saved earlier when you triggered a savepoint or the custom resource of a Flink job that you suspended earlier:

    a. Ensure that the value of spec.job.upgradeMode is savepoint.

    b. Ensure that the value of spec.job.state is running to resume the Flink job.

    c. Remove spec.job.savepointTriggerNonce and its value.

    d. Set the value of spec.job.initialSavepointPath to the savepoint location found as described in step 2.d if you triggered a savepoint earlier or in step 3 if you suspended the Flink job earlier, plus the suffix /_metadata.

    For example:

    job:
      jarURI: local:///opt/flink/usrlib/sql-runner.jar
      args: ["/opt/flink/usrlib/sql-scripts/statements.sql"]
      state: running
      upgradeMode: savepoint
      initialSavepointPath: file:/opt/flink/volume/flink-sp/savepoint-e372fa-9069a1c0563e/_metadata
      allowNonRestoredState: true
    
  2. Save the changes in the FlinkDeployment custom resource.

    A savepoint is triggered and written to a location in the PVC, which is indicated in the status.jobStatus.savepointInfo.lastSavepoint.location field of the FlinkDeployment custom resource.

    For example:

    status:
      [...]
      jobStatus:
        [...]
        savepointInfo:
          [...]
          lastSavepoint:
            formatType: CANONICAL
            location: 'file:/opt/flink/volume/flink-sp/savepoint-e372fa-9069a1c0563e'
            timeStamp: 1733957991559
            triggerNonce: 1
            triggerType: MANUAL
    

Enable SSL connection for your database

To securely connect Flink jobs to a database such as PostgreSQL, MySQL, or Oracle, enable an SSL connection with the database as follows:

  1. Ensure you added the CA certificate for your database to the truststore and then created a secret with the truststore.

  2. Edit the FlinkDeployment custom resource.

  3. Complete the following modifications:

    • In the spec.flinkConfiguration section, add:

      env.java.opts.taskmanager: >-
         -Djavax.net.ssl.trustStore=/certs/truststore.<keystore-extension>
         -Djavax.net.ssl.trustStorePassword=<chosen password>
      env.java.opts.jobmanager: >-
         -Djavax.net.ssl.trustStore=/certs/truststore.<keystore-extension>
         -Djavax.net.ssl.trustStorePassword=<chosen password>
      

    Where:

    • <keystore-extension> is the extension for your keystore format. For example, jks for Java Keystore and p12 for Public-Key Cryptography Standards.

    • In spec.podTemplate.spec.containers.volumeMounts section, add:

      - mountPath: /certs
        name: truststore
        readOnly: true
      
    • In spec.podTemplate.spec.volumes section, add:

      - name: truststore
        secret:
          items:
            - key: truststore.<keystore-extension>
              path: truststore.<keystore-extension>
          secretName: ssl-truststore
      
  4. Apply the modified FlinkDeployment custom resource:

    kubectl apply -f <custom-resource-file-path>
    

    For example:

    kubectl apply -f flinkdeployment_demo.yaml
    

A secure SSL connection is enabled between Flink and the database.