Configuring Flink
Before you begin
Consider the following resources before configuring your `FlinkDeployment` custom resource:
- Flink documentation
- Flink configuration options
- Flink Event Time and Watermark
- Kafka SQL connector
- Flink SQL CREATE statement
- Job and scheduling
Configuring Flink checkpointing
When Flink can’t complete writing a checkpoint during the checkpointing time interval, the throughput of Flink jobs decreases. This might happen when Flink jobs experience backpressure, meaning that they can’t process events as fast as they are ingested.
The following options can be used to adjust how checkpoints are taken within the checkpointing time interval:
- `execution.checkpointing.interval`
- `execution.checkpointing.min-pause`
In addition, two other Flink capabilities can help reduce the time taken to write checkpoints:
- Unaligned checkpoints:
  - Set the deployment option `execution.checkpointing.unaligned.enabled` to `true`.
  - Set the deployment option `execution.checkpointing.max-concurrent-checkpoints` to `1`.
  - Set the deployment option `execution.checkpointing.mode` to `EXACTLY_ONCE`.
- Buffer debloating:
  - Set the deployment option `taskmanager.network.memory.buffer-debloat.enabled` to `true`.
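In a `FlinkDeployment` custom resource, these options are set as string values under `spec.flinkConfiguration`. The following is a minimal sketch; the interval and pause values are illustrative, not recommendations:

```yaml
spec:
  flinkConfiguration:
    # Illustrative values: tune the interval and minimum pause for your workload.
    execution.checkpointing.interval: '60000'
    execution.checkpointing.min-pause: '10000'
    # Unaligned checkpoints, as described in the list above.
    execution.checkpointing.unaligned.enabled: 'true'
    execution.checkpointing.max-concurrent-checkpoints: '1'
    execution.checkpointing.mode: 'EXACTLY_ONCE'
    # Buffer debloating, as described in the list above.
    taskmanager.network.memory.buffer-debloat.enabled: 'true'
```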
For more information about checkpointing under backpressure, see the Flink documentation.
For more information about tuning checkpointing, see the Flink documentation.
For more information about checkpointing options, see the Flink documentation.
Configuring RocksDB
RocksDB is an embeddable persistent key-value store for fast storage that Flink provides as a state backend.
Most of the time, Flink operations such as aggregate, rolling aggregate, and interval join involve a large number of interactions with RocksDB to maintain state.
If the Flink job experiences a drop in throughput due to backpressure, consider enabling the incremental checkpointing Flink capability by setting the deployment option `state.backend.incremental` to `true`.
For more information about the RocksDB state backend, see the Flink documentation.
You can tune RocksDB operations to optimize the overall performance of your Flink jobs.
A good starting point is to adjust the way RocksDB keeps states in memory and transfers them to disk, using the following deployment options:
- `state.backend.rocksdb.memory.write-buffer-ratio` (the default value is 0.5)
- `state.backend.rocksdb.memory.high-prio-pool-ratio` (the default value is 0.1)
- `state.backend.rocksdb.checkpoint.transfer.thread.num` (the default value is 4)
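These options are also `spec.flinkConfiguration` entries in the `FlinkDeployment` custom resource. The following sketch shows the default values together with incremental checkpointing enabled; treat it as a starting point, not a recommendation:

```yaml
spec:
  flinkConfiguration:
    state.backend.incremental: 'true'
    # The values below are the defaults; adjust them for your workload.
    state.backend.rocksdb.memory.write-buffer-ratio: '0.5'
    state.backend.rocksdb.memory.high-prio-pool-ratio: '0.1'
    state.backend.rocksdb.checkpoint.transfer.thread.num: '4'
```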
For more information about tuning RocksDB, see the Flink documentation.
Configuring parallelism
A Flink program consists of multiple tasks. A task can be split into several parallel instances for execution and each parallel instance processes a subset of the task’s input data. The number of parallel instances of a task is called its parallelism.
To increase the throughput of your Flink job, you can perform the following actions:
- Increase the number of partitions in the Kafka ingress topics used by your Flink event sources.
- Set the Flink job parallelism to a value equal to this number of partitions (see the sketch that follows).
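For example, if the ingress topics have 6 partitions, an application-mode `FlinkDeployment` might set the job parallelism to match. A minimal sketch; the jar URI is a placeholder:

```yaml
spec:
  job:
    jarURI: local:///opt/flink/usrlib/my-job.jar  # placeholder: path to your job artifact
    parallelism: 6                                # matches the number of Kafka partitions
```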
For more information, see Deploying jobs for development purposes and Deploying jobs in a production environment.
Configuring persistent storage
Persistent storage is required for Flink to recover from transient failures, to create savepoints and be restored from them, and to configure High Availability for Flink Job Managers.
To configure persistent storage, complete the following steps:
- Deploy the Flink instance by using a `FlinkDeployment` custom resource with persistent storage configured. See the following example for the relevant persistent storage settings in the custom resource:

  ```yaml
  spec:
    # ...
    flinkConfiguration:
      # ...
      state.checkpoints.dir: 'file:///opt/flink/volume/flink-cp'
      state.checkpoints.num-retained: '3'
      state.savepoints.dir: 'file:///opt/flink/volume/flink-sp'
    podTemplate:
      apiVersion: v1
      kind: Pod
      metadata:
        name: pod-template
      spec:
        containers:
          - name: flink-main-container
            volumeMounts:
              - name: flink-volume
                mountPath: /opt/flink/volume
        volumes:
          - name: flink-volume
            persistentVolumeClaim:
              claimName: ibm-flink-pvc
  ```
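The example above assumes that a PersistentVolumeClaim named `ibm-flink-pvc` already exists in the namespace. A minimal sketch of such a claim; the access mode, size, and storage class are illustrative assumptions:

```yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: ibm-flink-pvc
spec:
  accessModes:
    - ReadWriteMany            # assumption: shared by Job Manager and Task Manager pods
  resources:
    requests:
      storage: 10Gi            # illustrative size
  storageClassName: <storage class>
```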
Note: All the Flink sample deployments are configured with persistent storage, except the Quick Start sample.
Configuring High Availability for Job Manager
- Configure persistent storage. Persistent storage is required to configure High Availability for the Flink Job Manager.
- Deploy the Flink instance by using a `FlinkDeployment` custom resource with High Availability configured. See the following example for the relevant High Availability settings in the custom resource:

  ```yaml
  spec:
    # ...
    flinkConfiguration:
      # ...
      high-availability.type: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
      high-availability.storageDir: 'file:///opt/flink/volume/flink-ha'
    podTemplate:
      apiVersion: v1
      kind: Pod
      metadata:
        name: pod-template
      spec:
        affinity:
          podAntiAffinity:
            preferredDuringSchedulingIgnoredDuringExecution:
              - weight: 80
                podAffinityTerm:
                  labelSelector:
                    matchExpressions:
                      - key: type
                        operator: In
                        values:
                          - flink-native-kubernetes
                  topologyKey: kubernetes.io/hostname
    jobManager:
      replicas: 2
  ```
Note: The Flink samples Production and Production - Flink Application cluster are configured with High Availability for the Flink Job Manager.
Important: To ensure the automatic restart of Flink jobs if the cluster restarts, the Flink instances in session cluster mode must be configured with High Availability for the Flink Job Manager.
Configuring Event Processing
Enabling persistent storage
To persist the data input into an Event Processing instance, configure persistent storage in your `EventProcessing` configuration.
To enable persistent storage for `EventProcessing`, set `spec.authoring.storage.type` to `persistent-claim`.
Then configure storage in one of the following ways:
- Dynamic provisioning
- Providing persistent volume
- Providing persistent volume and persistent volume claim.
Ensure that you have sufficient disk space for persistent storage.
Note: `spec.authoring.storage.type` can also be set to `ephemeral`, although no persistence is provisioned with this configuration. This is not recommended for production use because it results in lost data.
Dynamic provisioning
If there is a dynamic storage provisioner present on the system, Event Processing can use it to dynamically provision the persistence.
To configure this, set `spec.authoring.storage.storageClassName` to the name of the storage class provided by the provisioner.
```yaml
apiVersion: events.ibm.com/v1beta1
kind: EventProcessing
# ...
spec:
  license:
    # ...
  authoring:
    storage:
      type: persistent-claim
      storageClassName: csi-cephfs
# ...
```
- Optionally, specify the storage size in `storage.size` (for example, `"100Gi"`).
- Optionally, specify the root storage path where data is stored in `storage.root` (for example, `"/opt"`).
- Optionally, specify the retention setting for the storage if the instance is deleted in `storage.deleteClaim` (for example, `"true"`).
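Taken together, these options might look like the following sketch; the values reuse the examples above and are illustrative:

```yaml
apiVersion: events.ibm.com/v1beta1
kind: EventProcessing
# ...
spec:
  license:
    # ...
  authoring:
    storage:
      type: persistent-claim
      storageClassName: csi-cephfs
      size: 100Gi        # illustrative size
      root: /opt         # illustrative root path
      deleteClaim: true  # delete the claim when the instance is deleted
# ...
```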
Providing persistent volumes
Before installing Event Processing, you can create a persistent volume for it to use as storage.
To use a persistent volume that you created earlier, set `spec.authoring.storage.selectors` to match the labels on the persistent volume, and set `spec.authoring.storage.storageClassName` to match the `storageClassName` on the persistent volume.
The following example creates a persistent volume claim to bind to a persistent volume with the label `precreated-persistence: my-pv` and `storageClassName: manual`.
Multiple labels can be added as selectors, and the persistent volume must have all labels present to match.
```yaml
apiVersion: events.ibm.com/v1beta1
kind: EventProcessing
# ...
spec:
  license:
    # ...
  authoring:
    storage:
      type: persistent-claim
      selectors:
        precreated-persistence: my-pv
      storageClassName: manual
# ...
```
- Optionally, specify the storage size in `storage.size` (for example, `"100Gi"`).
- Optionally, specify the root storage path where data is stored in `storage.root` (for example, `"/opt"`).
- Optionally, specify the retention setting for the storage if the instance is deleted in `storage.deleteClaim` (for example, `"true"`).
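For reference, a minimal sketch of a persistent volume that the selectors above would match; the capacity, access mode, and hostPath are illustrative assumptions:

```yaml
apiVersion: v1
kind: PersistentVolume
metadata:
  name: my-pv
  labels:
    precreated-persistence: my-pv   # label matched by spec.authoring.storage.selectors
spec:
  storageClassName: manual
  capacity:
    storage: 10Gi                   # illustrative size
  accessModes:
    - ReadWriteOnce
  hostPath:
    path: /data/my-pv               # illustration only; use a production-grade volume type
```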
Providing persistent volume and persistent volume claim
A persistent volume and persistent volume claim can be precreated for Event Processing to use as storage.
To use this method, set `spec.authoring.storage.existingClaimName` to match the name of the precreated persistent volume claim.
```yaml
apiVersion: events.ibm.com/v1beta1
kind: EventProcessing
# ...
spec:
  license:
    # ...
  authoring:
    storage:
      type: persistent-claim
      existingClaimName:
# ...
```
Configuring TLS
TLS can be configured for the `EventProcessing` instance in one of the following ways:
- Operator configured CA certificate
- User-provided CA certificate
- User-provided certificates
- User-provided UI certificates
Operator configured CA certificate
By default, the operator configures its own TLS.
The operator uses the Cert Manager installed on the system to generate a CA certificate with a self-signed issuer. It then uses this self-signed CA certificate to sign the certificates used for secure communication by the Event Processing instance.
Cert Manager puts the CA certificate into a secret named `<my-instance>-ibm-ep-root-ca`.
Cert Manager and Event Processing create the following objects:
- Cert Manager Issuers:
  - `<my-instance>-ibm-eventprocessing`
  - `<my-instance>-ibm-eventprocessing-selfsigned`
The following snippet is an example of a configuration that uses a user-provided CA certificate:
```yaml
apiVersion: events.ibm.com/v1beta1
kind: EventProcessing
# ...
spec:
  license:
    # ...
  authoring:
    tls:
      caSecretName: myCASecret
# ...
```
User-provided CA certificate
You can provide a custom CA certificate to the Event Processing instance. The operator uses the Cert Manager installed on the system to create the certificates used for secure communication by the Event Processing instance. The certificates are signed by using the provided CA certificate.
The CA secret that is created and referenced in the Cert Manager must contain the keys `ca.crt`, `tls.crt`, and `tls.key`. The `ca.crt` key and the `tls.crt` key can have the same value.
See the following example to use the user-provided certificate files (`ca.crt`, `tls.crt`, and `tls.key`):
1. Set a variable for the `NAMESPACE` by running the following command:

   ```shell
   export NAMESPACE=<instance namespace>
   ```

2. Create the CA secret by running the following command:

   ```shell
   kubectl create secret generic ca-secret-cert --from-file=ca.crt=ca.crt --from-file=tls.crt=tls.crt --from-file=tls.key=tls.key -n ${NAMESPACE}
   ```

3. To provide a custom CA certificate secret, set the `spec.authoring.tls.caSecretName` field to the name of the secret that contains the CA certificate.
The following code snippet is an example of a configuration that uses the CA certificate secret that is created in the previous steps:
```yaml
apiVersion: events.ibm.com/v1beta1
kind: EventProcessing
# ...
spec:
  license:
    # ...
  authoring:
    tls:
      caSecretName: ca-secret-cert
# ...
```
Note: The secret that is referenced here must contain the keys `ca.crt`, `tls.crt`, and `tls.key`. The `ca.crt` key and the `tls.crt` key can have the same value.
User-provided certificates
You can use a custom certificate for secure communication by the Event Processing instance. You can use the OpenSSL tool to generate a CA and certificates that are required for an Event Processing instance.
Note: The `envsubst` utility is available on Linux and is installed by default as part of the `gettext` package.
See the following example for setting up the OpenSSL tool to generate a CA and certificate required for an Event Processing instance:
1. If you are using a Mac, the following packages are required and can be installed by using Homebrew:
   - `gettext`
   - `openssl@3`

   ```shell
   brew install gettext openssl@3
   ```

   Then run `alias openssl=$(brew --prefix)/opt/openssl@3/bin/openssl` to use OpenSSL 3.
to use Openssl3. -
Set the following variables on your workstation:
EMAIL = <email address> INSTANCE_NAME = <my_instance> CLUSTER_API = <cluster api> NAMESPACE = <eventprocessing installation namespace>
Where:
- INSTANCE_NAME is the name of the Event Processing instance
- CLUSTER_API is the cluster URL that can be obtained from the cluster. If the URL is
https://console-openshift-console.apps.clusterapi.com/
then the CLUSTER_API must be set toapps.clusterapi.com
.
3. Create a file called `csr_ca.txt` with the following data:

   ```
   [req]
   prompt = no
   default_bits = 4096
   default_md = sha256
   distinguished_name = dn
   x509_extensions = usr_cert
   [dn]
   C = US
   ST = New York
   L = New York
   O = MyOrg
   OU = MyOU
   emailAddress = me@working.me
   CN = server.example.com
   [usr_cert]
   basicConstraints = CA:TRUE
   subjectKeyIdentifier = hash
   authorityKeyIdentifier = keyid,issuer
   ```
4. Create a file called `my-eventprocessing_answer.txt` with the following data:

   ```
   [req]
   default_bits = 4096
   prompt = no
   default_md = sha256
   x509_extensions = req_ext
   req_extensions = req_ext
   distinguished_name = dn
   [dn]
   C = US
   ST = New York
   L = New York
   O = MyOrg
   OU = MyOrgUnit
   emailAddress = ${EMAIL}
   CN = ${INSTANCE_NAME}-ibm-eventprocessing-svc
   [req_ext]
   subjectAltName = @alt_names
   [alt_names]
   DNS.1 = ${INSTANCE_NAME}-ibm-eventprocessing-svc
   DNS.2 = ${INSTANCE_NAME}-ibm-eventprocessing-svc.${NAMESPACE}
   DNS.3 = ${INSTANCE_NAME}-ibm-eventprocessing-svc.${NAMESPACE}.svc
   DNS.4 = ${INSTANCE_NAME}-ibm-eventprocessing-svc.${NAMESPACE}.svc.cluster.local
   DNS.5 = ${INSTANCE_NAME}-ibm-eventprocessing-rt-${NAMESPACE}.${CLUSTER_API}
   DNS.7 = ${INSTANCE_NAME}-ibm-eventprocessing-${NAMESPACE}.${CLUSTER_API}
   ```

   Important: If you are planning to do any of the following for your deployment, ensure you modify the `[alt_names]` section in the previous example to include the Event Processing `ui` endpoint hostname:
   - You are planning to specify hostnames in the `eventprocessing` custom resource under `spec.authoring.endpoints`.
   - You are planning to create additional routes or ingress.
   - You are not running on the OpenShift Container Platform.
5. Generate the required certificates by running the following commands:
   - `ca.key`:

     ```shell
     openssl genrsa -out ca.key 4096
     ```

   - `ca.crt`:

     ```shell
     openssl req -new -x509 -key ca.key -days 730 -out ca.crt -config <( envsubst < csr_ca.txt )
     ```

   - eventprocessing key:

     ```shell
     openssl genrsa -out ${INSTANCE_NAME}.key 4096
     ```

   - eventprocessing csr:

     ```shell
     openssl req -new -key ${INSTANCE_NAME}.key -out ${INSTANCE_NAME}.csr -config <( envsubst < ${INSTANCE_NAME}_answer.txt )
     ```
6. Sign the `csr` to create the eventprocessing `crt` by running the following command:

   ```shell
   openssl x509 -req -in ${INSTANCE_NAME}.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out ${INSTANCE_NAME}.crt -days 730 -extensions 'req_ext' -extfile <( envsubst < ${INSTANCE_NAME}_answer.txt )
   ```
7. Verify the certificate by running the following command:

   ```shell
   openssl verify -CAfile ca.crt ${INSTANCE_NAME}.crt
   ```
8. Create the secret on the cluster by running the following command:

   Note: The secret must be added to the namespace that the Event Processing instance is intended to be created in.

   ```shell
   kubectl create secret generic ${INSTANCE_NAME}-cert --from-file=ca.crt=ca.crt --from-file=tls.crt=${INSTANCE_NAME}.crt --from-file=tls.key=${INSTANCE_NAME}.key -n ${NAMESPACE}
   ```
9. Create an Event Processing instance and set `spec.authoring.tls.secretName` to the name of the secret that you created:

   ```yaml
   apiVersion: events.ibm.com/v1beta1
   kind: EventProcessing
   # ...
   spec:
     license:
       # ...
     authoring:
       tls:
         secretName: my-eventprocessing-cert
   # ...
   ```
User-provided UI certificates
A separate custom certificate can be used for the UI. This certificate is presented to the browser when you navigate to the Event Processing user interface.
To supply a custom certificate to the UI, set `spec.authoring.tls.ui.secretName` to the name of the secret that contains the certificate.
The following snippet is an example of a configuration that uses a user-provided certificate:
```yaml
apiVersion: events.ibm.com/v1beta1
kind: EventProcessing
# ...
spec:
  license:
    # ...
  authoring:
    tls:
      ui:
        secretName: myUiSecret
# ...
```
If running on the Red Hat OpenShift Container Platform:
- Optionally, specify the key in the secret that points to the CA certificate in `ui.caCertificate` (default `ca.crt`).
- Optionally, specify the key in the secret that points to the server certificate in `ui.serverCertificate` (default `tls.crt`).
- Optionally, specify the key in the secret that points to the private key in `ui.key` (default `tls.key`).
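A secret of the following shape satisfies these defaults. A minimal sketch, assuming base64-encoded PEM files; the key names must match the defaults above unless you override them:

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: myUiSecret
type: Opaque
data:
  ca.crt: <base64-encoded CA certificate>
  tls.crt: <base64-encoded server certificate>
  tls.key: <base64-encoded private key>
```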
Deploy NetworkPolicies
By default, the operator deploys an instance-specific NetworkPolicy when an instance of `EventProcessing` is created.
The deployment of these network policies can be turned off with the property `spec.deployNetworkPolicies`.
The following snippet is an example of a configuration that turns off the deployment of the NetworkPolicy:
```yaml
apiVersion: events.ibm.com/v1beta1
kind: EventProcessing
# ...
spec:
  license:
    # ...
  deployNetworkPolicies: false
# ...
```
For information, see Network Policies.
Configuring ingress
If running on the Red Hat OpenShift Container Platform, routes are automatically configured to provide external access.
You can optionally set a host for each exposed route on the `EventProcessing` instance by setting values under `spec.authoring.endpoints[]`.
If you are not running on the Red Hat OpenShift Container Platform, the Event Processing operator creates ingress resources to provide external access.
No default hostnames are assigned to the ingress resources, and you must set hostnames for each exposed endpoint on the `EventProcessing` instance.
The following code snippet shows how to configure the host for the `EventProcessing` UI endpoint:
```yaml
apiVersion: events.ibm.com/v1beta1
kind: EventProcessing
# ...
spec:
  license:
    # ...
  authoring:
    endpoints:
      - name: ui
        host: my-ep-ui.mycompany.com
# ...
```
Ingress default settings
If you are not running on the Red Hat OpenShift Container Platform, the following ingress defaults are set unless overridden:
- `class`: The ingress class name is set by default to `nginx`. Set the `class` field on endpoints to use a different ingress class.
- `annotations`: The following annotations are set by default on generated ingress endpoints:

  ```yaml
  ingress.kubernetes.io/ssl-passthrough: 'true'
  nginx.ingress.kubernetes.io/backend-protocol: HTTPS
  nginx.ingress.kubernetes.io/ssl-passthrough: 'true'
  ```
If you specify a `spec.authoring.tls.ui.secretName` on an `EventProcessing` instance, the following re-encrypt annotations are set on the `ui` ingress. Other ingresses are configured for passthrough.
```yaml
nginx.ingress.kubernetes.io/backend-protocol: HTTPS
nginx.ingress.kubernetes.io/configuration-snippet: proxy_ssl_name "<HOSTNAME>";
nginx.ingress.kubernetes.io/proxy-ssl-protocols: TLSv1.3
nginx.ingress.kubernetes.io/proxy-ssl-secret: <NAMESPACE>/<SECRETNAME>
nginx.ingress.kubernetes.io/proxy-ssl-verify: 'on'
```
Ingress annotations can be overridden by specifying an alternative set of annotations on an endpoint. The following code snippet is an example of overriding the annotations set on a `ui` endpoint ingress:
```yaml
apiVersion: events.ibm.com/v1beta1
kind: EventProcessing
# ...
spec:
  license:
    # ...
  authoring:
    endpoints:
      - name: ui
        host: my-ui.mycompany.com
        annotations:
          some.annotation.foo: "true"
          some.other.annotation: value
# ...
```
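The ingress class can be overridden in the same way. A minimal sketch, assuming an ingress class named `my-ingress-class` exists on the cluster:

```yaml
apiVersion: events.ibm.com/v1beta1
kind: EventProcessing
# ...
spec:
  license:
    # ...
  authoring:
    endpoints:
      - name: ui
        host: my-ui.mycompany.com
        class: my-ingress-class   # assumption: this ingress class exists on the cluster
# ...
```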
Configuring PostgreSQL SSL in Event Processing and Flink
PostgreSQL offers built-in functionality to enhance security by using Secure Sockets Layer (SSL) connections.
If a PostgreSQL database exposes a TLS-encrypted endpoint where the certificate is self-signed or has been issued by an internal Certificate Authority (CA), you must configure the Event Processing and Flink instances to enable verification of the endpoint certificate.
To enable SSL connections to PostgreSQL from Event Processing and Flink:
- Add the CA certificate used to issue the certificate presented by a PostgreSQL database to a Java truststore.
- Create a secret with the truststore.
- Mount the secret through Event Processing and the IBM Operator for Apache Flink.
Add the CA certificate to the truststore
- Obtain the CA certificate of the PostgreSQL database from your database administrator.
- Add the certificate for the CA to the truststore by running the following command:

  ```shell
  keytool -keystore ./truststore.<keystore-extension> -alias <cert_name> -import -file ./<path_to_ca_cert> -noprompt -storepass <choose a password> -trustcacerts
  ```

  Where:
  - `<keystore-extension>` is the extension for your keystore format. For example, `jks` for Java Keystore and `p12` for Public-Key Cryptography Standards.
  - `<cert_name>` is the alias name you want to give to the certificate being imported into the keystore.
  - `<path_to_ca_cert>` is the path to the CA certificate file that you want to import into the keystore.
Create a secret with the truststore
- Log in to your Red Hat OpenShift Container Platform as a cluster administrator by using the `oc` CLI (`oc login`).
- Ensure you are in the project where your Event Processing instance is installed:

  ```shell
  oc project <project_name>
  ```

- Create a secret with the truststore that you created earlier by running the following command:

  ```shell
  oc create secret generic ssl-truststore --from-file=truststore.<keystore-extension>
  ```

  Where:
  - `<keystore-extension>` is the extension for your keystore format. For example, `jks` for Java Keystore and `p12` for Public-Key Cryptography Standards.
Mount the secret
Complete the following steps to mount the secret through Event Processing and the IBM Operator for Apache Flink by using the OpenShift web console:
- Log in to the OpenShift Container Platform web console using your login credentials.
- Expand the Operators dropdown and select Installed Operators to open the Installed Operators page.
- Expand the Project dropdown and select the project the instance is installed in. Click the operator called IBM Event Processing that manages the project.
- Select the Event Processing tab, search the Name column for the installed instance, and click it.
- Select the YAML tab.
- Add the following snippet into the Event Processing custom resource (`spec.authoring`):

  ```yaml
  template:
    pod:
      spec:
        containers:
          - env:
              - name: JAVA_TOOL_OPTIONS
                value: >-
                  -Djavax.net.ssl.trustStore=/opt/ibm/sp-backend/certs/truststore.<keystore-extension>
                  -Djavax.net.ssl.trustStorePassword=<chosen password>
            name: backend
            volumeMounts:
              - mountPath: /opt/ibm/sp-backend/certs
                name: truststore
                readOnly: true
        volumes:
          - name: truststore
            secret:
              items:
                - key: truststore.<keystore-extension>
                  path: truststore.<keystore-extension>
              secretName: ssl-truststore
  ```
- Save your changes and then click Reload.
- Expand the Operators dropdown and select Installed Operators to open the Installed Operators page.
- Expand the Project dropdown and select the project the instance is installed in. Click the operator called IBM Operator for Apache Flink.
- Click the IBM Operator for Apache Flink tab, search the Name column for the installed instance, and click the name.
- Select the YAML tab.
- Add the following snippets into the Flink (`FlinkDeployment`) custom resource:
  - In the `spec.flinkConfiguration` section, add:

    ```yaml
    env.java.opts.taskmanager: >-
      -Djavax.net.ssl.trustStore=/certs/truststore.<keystore-extension>
      -Djavax.net.ssl.trustStorePassword=<chosen password>
    env.java.opts.jobmanager: >-
      -Djavax.net.ssl.trustStore=/certs/truststore.<keystore-extension>
      -Djavax.net.ssl.trustStorePassword=<chosen password>
    ```

  - In the `spec.podTemplate.spec.containers.volumeMounts` section, add:

    ```yaml
    - mountPath: /certs
      name: truststore
      readOnly: true
    ```

  - In the `spec.podTemplate.spec.volumes` section, add:

    ```yaml
    - name: truststore
      secret:
        items:
          - key: truststore.<keystore-extension>
            path: truststore.<keystore-extension>
        secretName: ssl-truststore
    ```
- Save your changes and then click Reload.
Wait for the Event Processing and Flink pods to become ready.
SSL connections between IBM Operator for Apache Flink, Event Processing, and a secured PostgreSQL database are now enabled.