Deploying customized jobs

Find out how to deploy your Flink jobs that are customized for production or test environments in an application mode Flink cluster.

Benefits of customized Flink jobs include:

  • Compatibility when working on an automation in a continuous integration and continuous delivery (CI/CD) pipeline.
  • The ability to reuse and easily customize the connector configuration across different targeted environments, such as test, staging, pre-production, and production (with the JSON and configuration YAML export option).
  • Can be used for flows that contain the detect patterns node or the deduplicate node.
  • Support for the automatic upgrade of your IBM Operator for Apache Flink version.

Important: This deployment cannot be used with the Event Processing UI.

Prerequisites

  • You exported your flow from the Event Processing UI in the JSON and configuration YAML format, and saved it to a file, for example, flow.zip.

  • You downloaded the exported flow file (for example, flow.zip), and extracted it to have the flow.json and config.yaml files.

Preparing the config.yaml file to match the target environment

Before deploying the Flink job, edit the config.yaml file. The following is an example config.yaml file exported from the Event Processing UI:

config:
  source:
    source___TABLE:
      properties.sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule
        required username="<username_redacted>" password="<password_redacted>";
      connector: kafka
      format: json
      json.ignore-parse-errors: 'true'
      properties.bootstrap.servers: kafka.01:9002
      properties.isolation.level: read_committed
      properties.sasl.mechanism: SCRAM-SHA-512
      properties.security.protocol: SASL_SSL
      properties.ssl.endpoint.identification.algorithm: ''
      properties.ssl.truststore.certificates: '-----BEGIN CERTIFICATE----- [...] -----END CERTIFICATE-----'
      properties.ssl.truststore.type: PEM
      properties.tls.pemChainIncluded: 'false'
      scan.startup.mode: earliest-offset
      topic: input-topic
  sink_1:
    sink_1:
      properties.sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule
        required username="<username_redacted>" password="<password_redacted>";
      connector: kafka
      format: json
      properties.bootstrap.servers: kafka.01:9002
      properties.sasl.mechanism: SCRAM-SHA-512
      properties.security.protocol: SASL_SSL
      properties.ssl.endpoint.identification.algorithm: ''
      properties.ssl.truststore.certificates: '-----BEGIN CERTIFICATE----- [...] -----END CERTIFICATE-----'
      properties.ssl.truststore.type: PEM
      properties.tls.pemChainIncluded: 'false'
      topic: output-topic

The config.yaml file contains information about the nodes, tables, and their connector properties where:

  • config: Lists the names of the nodes that use Kafka, JDBC, and HTTP connectors. In the example snippet, the source and sink_1 nodes are listed.
  • config.source: Lists the tables present in the SQL generated for the source node. In the example snippet, there is only one table for the source node: source___TABLE
  • config.source.source___TABLE: Lists the connector properties of the source___TABLE table.
  • config.sink_1: Lists the tables present in the SQL generated for the sink_1 node. In the example snippet, there is only one table for the sink_1 node: sink_1 (table name is same as the node name).
  • config.sink_1.sink_1: Lists the connector properties of the sink_1 table.

Note: Sensitive properties, which have redacted values, are listed first, in alphabetical order of the connector property name. Unredacted properties are then listed also in the alphabetical order. Their value is based on what you configured in the Event Processing UI.

You can edit the config.yaml file as follows:

  • An actual value must be provided for the properties with redacted values. Otherwise, the deployment fails.
  • Do not modify node names and table names. It must match the values in the flow.json file.
  • Optional: values of unredacted properties can be modified.

    Important: The property scan.startup.mode has always the value earliest-offset in the exported config.yaml file. It can be changed for instance to latest-offset, as required.

  • Optional: connector properties can be added or removed.
  • For security reasons, the values containing sensitive credentials, such as username and password, are removed when exporting the flow from the Event Processing UI, so you must restore them.

    Important: In the config.yaml file, if you see a warning message about enabling SSL connections for Kafka brokers in production environments, see configuring SSL for Kafka brokers in production environments.

Also, the exported config.yaml file contains connector configuration that is applicable to the environment targeted in the Event Processing UI. When deploying to a different target environment, you might need to adapt the connector properties to the target environment.

See the following table for credentials and connector properties information about supported Flink SQL connectors:

Flink SQL connector Used by nodes Sensitive credentials Connector properties
Kafka Source and destination About Kafka connector

Note: When configuring SCRAM authentication for the Kafka connector, ensure you use double quotes only. Do not use a backslash character (\) to escape the double quotes. The valid format is: username="<username>" password="<password>"
Kafka connector properties

For more information about how events can be consumed from Kafka topics, see the Flink documentation.

Note: The Kafka connector value must be kafka. The Kafka connector version for Event Processing versions 1.5.0 and later is 3.4.0-1.20.
JDBC Database About JDBC connector JDBC connector properties

Note: The JDBC connector version for Event Processing versions 1.5.0 and later is 3.3.0-1.20.
HTTP API About HTTP connector HTTP connector properties.

Note: The HTTP connector version in Event Processing 1.5.0 and later is 0.25.0.

Important:

  • The value of the format property of the Kafka connector, which allows to select between Avro, Avro (schema registry), and JSON, should not be changed in the config.yaml. Instead, change it by editing the flow in the Event Processing UI, test the flow, then export the flow again.
  • The flow.json file must not be modified.

Use of templating

The values of the connector properties in the config.yaml file can be turned into template variables by manually editing the config.yaml file.

The actual values can then be provided in a CI/CD pipeline, by using any templating engine.

For example (the exact syntax might vary based on the templating engine):

properties.sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule
  required username="{{ kafka.username }}" password="{{ kafka.password }}";

You can use a Kubernetes FlinkDeployment custom resource in application mode to deploy a Flink job.

  1. Log in to your Kubernetes cluster as a cluster administrator by setting your kubectl context.

  2. Switch to the namespace where the IBM Operator for Apache Flink is installed:

    kubectl config set-context --current --namespace=<namespace>
    
  3. Deploy a Kubernetes secret holding the flow.json and config.yaml files:

    kubectl create secret generic <k8s_secret_name> \
      --from-file=flow.json=./flow.json \
      --from-file=config.yaml=./config.yaml
    

    For example, to create a secret named application-cluster-prod:

    kubectl create secret generic application-cluster-prod \
      --from-file=flow.json=./flow.json \
      --from-file=config.yaml=./config.yaml
    

    Ensure that you make a note of the secret name that you provide in the <k8s_secret_name> field to use in the next step.

    A simple way for allowing the coexistence of multiple Flink instances in the same namespace is to use for the Kubernetes secret the same name as for the Flink instance. This way, the mapping between Flink instances and Kubernetes secrets is clear, and there is no clash.

  4. Create the IBM Operator for Apache Flink FlinkDeployment custom resource.

    a. Choose the Production - Flink Application cluster sample, or a Flink custom resource in application mode, configured with persistent storage.

    If you prefer to not use a provided sample, add the following parameters:

    • Set a timeout period for event sources when they are marked idle. This allows downstream tasks to advance their watermark. Idleness is not detected by default. The parameter is included in all the provided samples.

      spec:
        flinkConfiguration:
          table.exec.source.idle-timeout: '30 s'
      

      For more information about table.exec.source.idle-timeout, see the Flink documentation.

    • Add the following volume declaration, setting the secret.secretName field to the secret name that you created in step 3 earlier, for example:

      volumes:
        - name: flow-volume
          secret:
            secretName: application-cluster-prod
      
    • Add the volume mount declaration:

      spec:
        # [...]
        podTemplate:
         # [...]
          spec:
            # [...]
            containers:
              - name: flink-main-container
                volumeMounts:
                  # [...]
                  - name: flow-volume
                    mountPath: /opt/flink/ibm-flow/flow.json
                    subPath: flow.json
                    readOnly: true
                  - name: flow-volume
                    mountPath: /opt/flink/ibm-flow/config.yaml
                    subPath: config.yaml
                    readOnly: true
      
    • Configure the spec.job field:

      spec:
        # [...]
        job:
          jarURI: 'local:///opt/flink/ibm-flow/ibm-ep-flow-deployer.jar'
          args: []
          parallelism: 1
          state: running
          upgradeMode: savepoint
          allowNonRestoredState: true
      
    • Event Processing 1.5.2 icon Optional: To use user-defined functions (UDFs) in your Flink job, reference the JAR file that contains the UDF classes by using an init container.

    b. Prepare the FlinkDeployment custom resource as described in step 1 of installing a Flink instance.

  5. Optional: If your flow connects to databases or API servers, ensure that you have configured the SSL connection.

  6. Apply the modified FlinkDeployment custom resource by using the UI or the CLI.

Optionally, user-defined functions (UDFs) can be used to complement the built-in functions. UDFs can be used in the SQL of SQL processor nodes authored in the Event Processing UI.

Event Processing 1.5.2 icon To use UDFs in your deployed jobs, the JAR file that contains the UDF classes must be made available. You can do this by using an init container to fetch the JAR file at runtime, or by building a custom Flink image that includes the JAR file.

Event Processing 1.5.2 icon By using an init container

Complete the following steps to use an init container to fetch the UDF JAR file from a remote location at runtime:

  1. Package your UDF as a custom Java class into a JAR file. For more information, see the Flink documentation.

  2. Make the JAR file available at a URL that is accessible from your cluster, for example, by hosting it on a web server.

  3. Modify the FlinkDeployment custom resource to add an init container that fetches the JAR file and mounts it into the Flink containers:

    apiVersion: events.ibm.com/v1beta1
    kind: FlinkDeployment
    metadata:
      name: my-flink-deployment
    spec:
     #...
      podTemplate:
      #...
        spec:
        #...
          initContainers:
          - name: udf-fetcher
            image: registry.access.redhat.com/ubi8/toolbox:latest
            volumeMounts:
              - mountPath: <mount-path>
                name: udf-volume
            command:
              - /bin/sh
              - -c
              - "wget -O <mount-path>/<jar-filename> <url-of-jar>"
          containers:
          - name: flink-main-container
            volumeMounts:
            - mountPath: <mount-path>
              name: udf-volume
          volumes:
          - name: udf-volume
            emptyDir: {}
    

    Where:

    • <mount-path> is the path in the container where the JAR file is mounted, matching the path referenced in your SQL, for example, /udf.
    • <jar-filename> is the name of your UDF JAR file, for example, my-udf.jar.
    • <url-of-jar> is the URL from where the JAR file is downloaded, for example, https://my.web.server/my-udf.jar.
  4. Apply the modified FlinkDeployment custom resource by using the UI or the CLI.

The user-defined functions are now available for use in your deployed Flink jobs.

Alternatively, complete the following steps to build a custom Flink image that includes the UDF JAR file:

  1. Package your UDF as a custom Java class into a JAR file. For more information, see the Flink documentation.

  2. Run the following command to extract the Flink image name (including its SHA digest) from the ClusterServiceVersion (CSV). For example, if you are running Flink version 1.5.2:

    kubectl get deployment \
    -l app.kubernetes.io/name=flink-kubernetes-operator \
    -o jsonpath='{.items[0].spec.template.spec.containers[?(@.name=="flink-kubernetes-operator")].env[?(@.name=="IBM_FLINK_IMAGE")].value}' 
    

    Alternatively, you can obtain the image name from the Flink operator pod’s environment variable:

    kubectl set env pod/<flink_operator_pod_name> --list -n <flink_operator_namespace> | grep IBM_FLINK_IMAGE
    
  3. Create a Dockerfile with a FROM clause to use the IBM Flink image with its SHA digest, as determined in the previous step, and add the UDF JAR file at the same path that is referenced in your SQL.

    FROM --platform=<platform> <IBM Flink image with digest>
    COPY --chown=flink:root <path-of-the-udf-jar> <path-of-the-udf-jar>
    

    Where:

    • <platform> is linux/amd64 or linux/s390x depending on your deployment target.
    • <path-of-the-udf-jar> is the path of the UDF JAR file as referenced in your SQL, for example, /udf/my-udf.jar.
  4. Build the image and push it to a registry accessible from your Kubernetes cluster. If your registry requires authentication, configure the image pull secret. For example, in OpenShift, you can use the global cluster pull secret.

  5. Specify this image in the spec.image field of the FlinkDeployment custom resource:

    spec:
      image: <image built at step 3>
    
  6. Apply the modified FlinkDeployment custom resource by using the UI or the CLI.

The custom Flink image with the UDF JAR file is now deployed, and the user-defined functions are available for use in your deployed Flink jobs.

You can temporarily stop a running Flink job while capturing its current state by creating a savepoint, and allowing you to restart the job from the exact point where it stopped by using the savepoint when required.

  1. Edit the FlinkDeployment custom resource.

  2. Make the following modifications:

    a. Ensure that the value of spec.job.upgradeMode is savepoint.

    b. Ensure that the value of spec.job.state is suspended to stop the Flink job.

    spec:
      job:
        jarURI: local:///opt/flink/ibm-flow/ibm-ep-flow-deployer.jar
        args: []
        state: suspended
        upgradeMode: savepoint
    
  3. Save the changes in the FlinkDeployment custom resource.

  4. The FlinkStateSnapshots custom resource is created by the operator with a name that starts with the name of your FlinkDeployment custom resource, and a savepoint is triggered and written to a location in the PVC, which is indicated in the status.path of FlinkStateSnapshots custom resource.

    For example:

    status:
      failures: 0
      path: 'file:/opt/flink/volume/flink-sp/savepoint-caf2b2-39d09a1c170c'
      state: COMPLETED
    

You can resume a suspended job from the exact point where it stopped by using the savepoint created during its suspension.

  1. Edit the FlinkDeployment custom resource of a Flink job that you suspended earlier:

    a. Set the value of spec.job.upgradeMode is savepoint.

    b. Set the value of spec.job.state is running to resume the Flink job.

    c. Set the value of spec.job.initialSavepointPath to the savepoint location found in the status.path field of the FlinkStateSnapshots custom resource from step 4 of stopping a Flink job with a savepoint.

    For example:

       job:
         jarURI: local:///opt/flink/ibm-flow/ibm-ep-flow-deployer.jar
         args: []
         state: running
         upgradeMode: savepoint
         initialSavepointPath: file:/opt/flink/volume/flink-sp/savepoint-caf2b2-39d09a1c170c
         allowNonRestoredState: true
    
  2. Apply the modified FlinkDeployment custom resource.

For more information on manually restoring a job, see manual recovery.

Enable SSL connection for your database and API server

To securely connect Flink jobs to an API server or a database such as PostgreSQL, MySQL, or Oracle, enable an SSL connection as follows:

  1. Ensure that you have added the CA certificate for your database or API server to the truststore, and created a secret that includes the truststore.

  2. Edit the FlinkDeployment custom resource.

  3. Complete the following modifications:

    • In the spec.flinkConfiguration section, add:

      env.java.opts.taskmanager: >-
         -Djavax.net.ssl.trustStore=/certs/truststore.<keystore-extension>
         -Djavax.net.ssl.trustStorePassword=<chosen password>
      env.java.opts.jobmanager: >-
         -Djavax.net.ssl.trustStore=/certs/truststore.<keystore-extension>
         -Djavax.net.ssl.trustStorePassword=<chosen password>
      

    Where:

    • <keystore-extension> is the extension for your keystore format. For example, jks for Java Keystore and p12 for Public-Key Cryptography Standards.

    • In spec.podTemplate.spec.containers.volumeMounts section, add:

      - mountPath: /certs
        name: truststore
        readOnly: true
      
    • In spec.podTemplate.spec.volumes section, add:

      - name: truststore
        secret:
          items:
            - key: truststore.<keystore-extension>
              path: truststore.<keystore-extension>
          secretName: ssl-truststore
      
  4. Apply the modified FlinkDeployment custom resource:

    kubectl apply -f <custom-resource-file-path>
    

    Where <custom-resource-file-path> is the location of the EventProcessing custom resource YAML file.

    For example:

    kubectl apply -f flinkdeployment_demo.yaml
    

A secure SSL connection is enabled between Flink and your API server or the database.

Troubleshooting

The successful execution of the Flink job when deploying depends on the correctness of the actual connector configuration.

For troubleshooting:

  • Check the logs of the active Flink Job Manager (for errors due to invalid config.yaml file, search for com.ibm.ei.streamproc.model.deploy.imports.ConfigModelException).
  • Errors due to incorrect configuration can also lead to Flink exceptions, which are logged in the Flink task manager pods.