Deploying customized jobs

Event Processing 1.2.3 and later: Find out how to deploy your Flink jobs that are customized for production or test environments in an application mode Flink cluster.

Benefits of customized Flink jobs include:

  • Compatibility with automation in a continuous integration and continuous delivery (CI/CD) pipeline.
  • The ability to reuse and easily customize the connector configuration across different targeted environments, such as test, staging, pre-production, and production (with the JSON and configuration YAML export option).
  • Support for the automatic upgrade of your IBM Operator for Apache Flink version.

    Note: Automatic upgrade is not supported if the FlinkDeployment custom resource uses an extension of the IBM Flink image. In this case, the extension of the image must be rebuilt to use the upgraded IBM Flink image.

Important: This deployment cannot be used with the Event Processing UI.

Prerequisites

  • You exported your flow from the Event Processing UI in the JSON and configuration YAML format, and saved it to a file, for example, flow.zip.

  • You downloaded the exported flow file (for example, flow.zip), and extracted it to have the flow.json and config.yaml files.
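
For example, assuming the exported archive is named flow.zip, you can extract it as follows:

unzip flow.zip
# The archive contains flow.json (the flow definition, which must not be modified)
# and config.yaml (the connector configuration to adapt to the target environment).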

Preparing the config.yaml file to match the target environment

Before deploying the Flink job, edit the config.yaml file. The following is an example config.yaml file exported from the Event Processing UI:

config:
  source:
    source___TABLE:
      properties.sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule
        required username="<username_redacted>" password="<password_redacted>";
      connector: kafka
      format: json
      json.ignore-parse-errors: 'true'
      properties.bootstrap.servers: kafka.01:9002
      properties.isolation.level: read_committed
      properties.sasl.mechanism: SCRAM-SHA-512
      properties.security.protocol: SASL_SSL
      properties.ssl.endpoint.identification.algorithm: ''
      properties.ssl.truststore.certificates: '-----BEGIN CERTIFICATE----- [...] -----END CERTIFICATE-----'
      properties.ssl.truststore.type: PEM
      properties.tls.pemChainIncluded: 'false'
      scan.startup.mode: earliest-offset
      topic: input-topic
  sink_1:
    sink_1:
      properties.sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule
        required username="<username_redacted>" password="<password_redacted>";
      connector: kafka
      format: json
      properties.bootstrap.servers: kafka.01:9002
      properties.sasl.mechanism: SCRAM-SHA-512
      properties.security.protocol: SASL_SSL
      properties.ssl.endpoint.identification.algorithm: ''
      properties.ssl.truststore.certificates: '-----BEGIN CERTIFICATE----- [...] -----END CERTIFICATE-----'
      properties.ssl.truststore.type: PEM
      properties.tls.pemChainIncluded: 'false'
      topic: output-topic

The config.yaml file contains information about the nodes, tables, and their connector properties where:

  • config: Lists the names of the nodes that use Kafka, JDBC, and HTTP connectors. In the example snippet, the source and sink_1 nodes are listed.
  • config.source: Lists the tables present in the SQL generated for the source node. In the example snippet, there is only one table for the source node: source___TABLE.
  • config.source.source___TABLE: Lists the connector properties of the source___TABLE table.
  • config.sink_1: Lists the tables present in the SQL generated for the sink_1 node. In the example snippet, there is only one table for the sink_1 node: sink_1 (the table name is the same as the node name).
  • config.sink_1.sink_1: Lists the connector properties of the sink_1 table.

Note: Sensitive properties, which have redacted values, are listed first, in alphabetical order of the connector property name. Unredacted properties are then listed, also in alphabetical order. Their values are based on what you configured in the Event Processing UI.

You can edit the config.yaml file as follows:

  • An actual value must be provided for each property with a redacted value; otherwise, the deployment fails. For security reasons, values containing sensitive credentials, such as usernames and passwords, are removed when the flow is exported from the Event Processing UI, so you must restore them.
  • Do not modify node names or table names. They must match the values in the flow.json file.
  • Optional: you can modify the values of unredacted properties.
    • Important: The scan.startup.mode property always has the value earliest-offset in the exported config.yaml file. You can change it, for example to latest-offset, as required.
  • Optional: you can add or remove connector properties.

Also, the exported config.yaml file contains the connector configuration that applies to the environment targeted in the Event Processing UI. When deploying to a different target environment, you might need to adapt the connector properties to that environment, as shown in the following example.
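
For example, a config.yaml prepared for a production environment might restore the redacted credentials, point the bootstrap address at the production Kafka cluster, and start from the latest offset. The host name, user name, and password below are placeholders for illustration; properties that are unchanged from the export are omitted here for brevity:

config:
  source:
    source___TABLE:
      properties.sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule
        required username="<production-username>" password="<production-password>";
      connector: kafka
      format: json
      properties.bootstrap.servers: prod-kafka.example.com:9092
      scan.startup.mode: latest-offset
      topic: input-topic

Apply the same kind of changes to the sink_1 table.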

See the following table for credentials and connector properties information about supported Flink SQL connectors:

Flink SQL connector   Used by nodes            Sensitive credentials   Connector properties
Kafka                 Source and destination   About Kafka connector   Kafka connector properties
JDBC                  Database                 About JDBC connector    JDBC connector properties
HTTP                  API                      About HTTP connector    HTTP connector properties

Note: When configuring SCRAM authentication for the Kafka connector, ensure you use double quotes only. Do not use a backslash character (\) to escape the double quotes. The valid format is: username="<username>" password="<password>"

Note: The Kafka connector value must be kafka. For more information about how events can be consumed from Kafka topics, see the Flink documentation.

Note: The HTTP connector version for Event Processing versions 1.2.3 and later is 0.16.0. For earlier Event Processing 1.2.x releases, see the 0.15.0 connector documentation.

Important:

  • The value of the format property of the Kafka connector, which lets you select between Avro, Avro (schema registry), and JSON, should not be changed in the config.yaml file. Instead, change it by editing the flow in the Event Processing UI, test the flow, and then export the flow again.
  • The flow.json file must not be modified.

Use of templating

The values of the connector properties in the config.yaml file can be turned into template variables by manually editing the config.yaml file.

The actual values can then be provided in a CI/CD pipeline, by using any templating engine.

For example (the exact syntax might vary based on the templating engine):

properties.sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule
  required username="{{ kafka.username }}" password="{{ kafka.password }}";

Deploying the Flink job

You can use a Kubernetes FlinkDeployment custom resource in application mode to deploy a Flink job.

  1. Log in to your Kubernetes cluster as a cluster administrator by setting your kubectl context.

  2. Switch to the namespace where the IBM Operator for Apache Flink is installed:

    kubectl config set-context --current --namespace=<namespace>
    
  3. Deploy a Kubernetes secret holding the flow.json and config.yaml files:

    kubectl create secret generic <k8s_secret_name> \
      --from-file=flow.json=./flow.json \
      --from-file=config.yaml=./config.yaml
    

    Where <k8s_secret_name> must match the name specified for the secret in the FlinkDeployment custom resource, for instance:

    volumes:
      - name: flow-volume
        secret:
          secretName: application-cluster-prod
    

    A simple way to allow multiple Flink instances to coexist in the same namespace is to give the Kubernetes secret the same name as the Flink instance. This keeps the mapping between Flink instances and Kubernetes secrets clear, and avoids name clashes.

  4. Create the IBM Operator for Apache Flink FlinkDeployment custom resource.

    a. Choose the Production - Flink Application cluster sample, or a Flink custom resource in application mode that is configured with persistent storage. If you prefer not to use a provided sample, add the following parameter, which sets the timeout period after which an event source is marked idle so that downstream tasks can advance their watermarks. Idleness is not detected by default. The parameter is included in all the provided samples (see the sketch after these steps for how it fits into a FlinkDeployment custom resource).

    spec:
      flinkConfiguration:
        table.exec.source.idle-timeout: '30 s'
    

    For more information about table.exec.source.idle-timeout, see the Flink documentation.

    b. Prepare the FlinkDeployment custom resource as described in step 1 of installing a Flink instance.

  5. Apply the modified FlinkDeployment custom resource by using the UI or the CLI.
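
For illustration, the following sketch shows how the Kubernetes secret from step 3 and the idle-timeout setting from step 4 might fit together in a FlinkDeployment custom resource in application mode. It is not a complete, supported sample: the metadata, container, and mount path values shown here are assumptions, so start from a provided sample and the installation documentation for the authoritative values.

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: application-cluster-prod
spec:
  flinkConfiguration:
    table.exec.source.idle-timeout: '30 s'
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          volumeMounts:
            - name: flow-volume
              mountPath: /opt/flink/flow   # illustrative mount path
      volumes:
        - name: flow-volume
          secret:
            secretName: application-cluster-prod   # same name as the secret created in step 3
  job:
    jarURI: local:///opt/flink/ibm-flow/ibm-ep-flow-deployer.jar
    args: []
    state: running
    upgradeMode: savepoint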

Triggering a savepoint for a running Flink job

  1. Check the FlinkDeployment custom resource, and ensure that the status section indicates that the Job Manager is in READY status and that the Flink job is in RUNNING status.

    status:
      jobManagerDeploymentStatus: READY
      jobStatus:
        state: RUNNING
    
  2. Set the following values in the FlinkDeployment custom resource:

    a. Set the value of spec.job.upgradeMode to savepoint.

    b. Set the value of spec.job.state to running.

    c. Set the value of spec.job.savepointTriggerNonce to an integer that has never been used before for that option.

    For example:

    job:
      jarURI: local:///opt/flink/ibm-flow/ibm-ep-flow-deployer.jar
      args: []
      savepointTriggerNonce: <integer value>
      state: running
      upgradeMode: savepoint
    

    d. Save the changes in the FlinkDeployment custom resource.

    A savepoint is triggered and written to a location in the PVC, which is indicated in the status.jobStatus.savepointInfo.lastSavepoint.location field of the FlinkDeployment custom resource.

    For example:

    status:
      [...]
      jobStatus:
        [...]
        savepointInfo:
          [...]
          lastSavepoint:
            formatType: CANONICAL
            location: 'file:/opt/flink/volume/flink-sp/savepoint-e372fa-9069a1c0563e'
            timeStamp: 1733957991559
            triggerNonce: 1
            triggerType: MANUAL
    
  3. Keep the FlinkDeployment custom resource and the PVC to make them available later for restoring your deployment.
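
To read the savepoint location without opening the full custom resource, you can query the status field directly with kubectl; the instance name is a placeholder:

kubectl get flinkdeployment <flink-instance-name> \
  -o jsonpath='{.status.jobStatus.savepointInfo.lastSavepoint.location}'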

Suspending a Flink job

  1. Edit the FlinkDeployment custom resource.

  2. Make the following modifications:

    a. Ensure that the value of spec.job.upgradeMode is savepoint.

    b. Ensure that the value of spec.job.state is suspended to stop the Flink job.

    spec:
      job:
        jarURI: local:///opt/flink/ibm-flow/ibm-ep-flow-deployer.jar
        args: []
        state: suspended
        upgradeMode: savepoint
    
  3. Save the changes in the FlinkDeployment custom resource.

    A savepoint is triggered and written to a location in the PVC, which is indicated in the status.jobStatus.savepointInfo.lastSavepoint.location field of the FlinkDeployment custom resource.

    For example:

    status:
      [...]
      jobStatus:
        [...]
        savepointInfo:
          [...]
          lastSavepoint:
            formatType: CANONICAL
            location: 'file:/opt/flink/volume/flink-sp/savepoint-e372fa-9069a1c0563e'
            timeStamp: 1733957991559
            triggerNonce: 1
            triggerType: UPGRADE
    
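Instead of editing the custom resource interactively, you can apply the same suspension declaratively, for example with kubectl patch (the instance name is a placeholder):

kubectl patch flinkdeployment <flink-instance-name> --type merge \
  -p '{"spec":{"job":{"state":"suspended","upgradeMode":"savepoint"}}}'
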
Restoring a Flink job from a savepoint

  1. Edit the FlinkDeployment custom resource that you saved earlier when you triggered a savepoint, or the custom resource of a Flink job that you suspended earlier:

    a. Ensure that the value of spec.job.upgradeMode is savepoint.

    b. Ensure that the value of spec.job.state is running to resume the Flink job.

    c. Remove spec.job.savepointTriggerNonce and its value.

    d. Set the value of spec.job.initialSavepointPath to the savepoint location that you noted when triggering a savepoint (step 2.d) or when suspending the job (step 3), with the suffix /_metadata appended.

    For example:

    job:
      jarURI: local:///opt/flink/ibm-flow/ibm-ep-flow-deployer.jar
      args: []
      state: running
      upgradeMode: savepoint
      initialSavepointPath: file:/opt/flink/volume/flink-sp/savepoint-e372fa-9069a1c0563e/_metadata
      allowNonRestoredState: true
    
  2. Apply the modified FlinkDeployment custom resource.

Enable SSL connection for your database

To securely connect Flink jobs to a database such as PostgreSQL, MySQL, or Oracle, enable an SSL connection with the database as follows:

  1. Ensure that you added the CA certificate for your database to a truststore, and that you created a Kubernetes secret containing the truststore (a sketch is provided after these steps).

  2. Edit the FlinkDeployment custom resource.

  3. Complete the following modifications:

    • In the spec.flinkConfiguration section, add:

      env.java.opts.taskmanager: >-
         -Djavax.net.ssl.trustStore=/certs/truststore.<keystore-extension>
         -Djavax.net.ssl.trustStorePassword=<chosen password>
      env.java.opts.jobmanager: >-
         -Djavax.net.ssl.trustStore=/certs/truststore.<keystore-extension>
         -Djavax.net.ssl.trustStorePassword=<chosen password>
      

    Where:

    • <keystore-extension> is the extension for your keystore format. For example, jks for Java Keystore and p12 for Public-Key Cryptography Standards.

    • In the spec.podTemplate.spec.containers.volumeMounts section, add:

      - mountPath: /certs
        name: truststore
        readOnly: true
      
    • In the spec.podTemplate.spec.volumes section, add:

      - name: truststore
        secret:
          items:
            - key: truststore.<keystore-extension>
              path: truststore.<keystore-extension>
          secretName: ssl-truststore
      
  4. Apply the modified FlinkDeployment custom resource:

    kubectl apply -f <custom-resource-file-path>
    

    For example:

    kubectl apply -f flinkdeployment_demo.yaml
    

A secure SSL connection is enabled between Flink and the database.
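
As a sketch for step 1, assuming a PKCS12 truststore and the ssl-truststore secret name used in the example above (the certificate file name, alias, and password are placeholders):

# Import the database CA certificate into a PKCS12 truststore
keytool -importcert -noprompt \
  -alias database-ca \
  -file db-ca.crt \
  -keystore truststore.p12 \
  -storetype PKCS12 \
  -storepass '<chosen password>'

# Create the Kubernetes secret that the FlinkDeployment custom resource references
kubectl create secret generic ssl-truststore \
  --from-file=truststore.p12=./truststore.p12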

Troubleshooting

Whether the Flink job runs successfully after deployment depends on the correctness of the actual connector configuration.

For troubleshooting:

  • Check the logs of the active Flink Job Manager. For errors caused by an invalid config.yaml file, search for com.ibm.ei.streamproc.model.deploy.imports.ConfigModelException (see the example after this list).
  • Errors due to incorrect configuration can also lead to Flink exceptions, which are logged in the Flink Task Manager pods.
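
For example, assuming the Job Manager Deployment has the same name as your FlinkDeployment instance (the default for operator-managed deployments), you can search its logs as follows:

kubectl logs deployment/<flink-instance-name> \
  | grep com.ibm.ei.streamproc.model.deploy.imports.ConfigModelException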