Event Streams helps you integrate Kafka with external systems by setting up and managing Kafka Connect and connectors as custom resources.
## Using Kafka Connect
- To start Kafka Connect, define a `KafkaConnect` custom resource and configure it for your environment. A `KafkaConnect` custom resource represents a Kafka Connect distributed cluster and contains information about your Kafka cluster, the connectors that you want to use, and how you want to distribute the workload.
- Each connector is started and managed by configuring and creating a `KafkaConnector` custom resource.
Complete the following sections to set up your Kafka Connect environment.
## Configure connection to Kafka
Configure the `KafkaConnect` custom resource with information about your Kafka cluster:
- `authentication`: If your Event Streams instance is secured with authentication, configure the `spec.authentication` field of the `KafkaConnect` custom resource with credentials, which allow the connectors to read and write to Kafka Connect topics. For more information, see authentication and authorization.

  - If your Event Streams instance is secured with SCRAM authentication, specify a username and a secret containing a password, as follows:

    ```yaml
    authentication:
      type: scram-sha-512
      username: <my-kafka-user-name>
      passwordSecret:
        password: password
        secretName: <my-kafka-user-name>
    ```

    Where `<my-kafka-user-name>` is the name of the `KafkaUser` that will authenticate with the Event Streams instance.

  - If your Event Streams instance is secured with Transport Layer Security (TLS) certificates, specify the required client certificate and key, as follows:

    ```yaml
    authentication:
      type: tls
      certificateAndKey:
        certificate: user.crt
        secretName: kafka-connect-user
        key: user.key
    ```
- `bootstrapServers`: Kafka bootstrap servers to connect to. Use a comma-separated list of `<hostname>:<port>` pairs. For example:

  ```yaml
  bootstrapServers: 'my-es-kafka-bootstrap-es.hostname:443'
  ```

  To get the `bootstrapServers` for your Event Streams instance, run the following command:

  ```shell
  kubectl get eventstreams <instance-name> -n <namespace> -o=jsonpath='{.status.kafkaListeners[?(@.name=="<external_listener_name>")].bootstrapServers}{"\n"}'
  ```

  Where `<external_listener_name>` is the name of your external Kafka listener.

- `tls`: If your Event Streams instance endpoints require TLS-encrypted connections, configure the `spec.tls` field of the `KafkaConnect` custom resource with references to the certificates required for connecting securely to the Kafka listeners:

  ```yaml
  tls:
    trustedCertificates:
      - secretName: <instance_name>-cluster-ca-cert
        certificate: ca.crt
  ```
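For reference, the following sketch shows how these connection settings fit together in a single `KafkaConnect` custom resource, using the SCRAM example from above (all bracketed values are placeholders for your own names):

```yaml
spec:
  bootstrapServers: 'my-es-kafka-bootstrap-es.hostname:443'
  authentication:
    type: scram-sha-512
    username: <my-kafka-user-name>
    passwordSecret:
      password: password
      secretName: <my-kafka-user-name>
  tls:
    trustedCertificates:
      - secretName: <instance_name>-cluster-ca-cert
        certificate: ca.crt
```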
## Add connectors you want to use
Prepare Kafka Connect for connecting to external systems by adding the required connectors using one of the methods described below:
- Use the Event Streams operator to add the connectors by specifying them within a `KafkaConnect` custom resource. The Event Streams operator will create a container image with Kafka Connect and all the specified connector artifacts, and use it to start Kafka Connect.
- Manually add the required connectors and Kafka Connect to a container image and specify the image in the `KafkaConnect` custom resource. The Event Streams operator will use the specified image to start Kafka Connect.
### Use the Event Streams operator
The Event Streams operator can create a container image with Kafka Connect and all the specified connector artifacts, tag and push the image to the specified container registry, and use the built image to start Kafka Connect.
To build the connector image by using the Event Streams operator, configure the `spec.build` section of the `KafkaConnect` custom resource as follows:
- `spec.build.plugins`: Specify the connectors you want to use in the `spec.build.plugins` section.

  Each connector can be specified as a plug-in with a name and a list of artifacts that represent the connector and any other dependencies you want to use with that connector.

  Each artifact has a type, and additional fields that define how the artifact can be downloaded and used. Event Streams supports the following types of artifacts:

  - JAR files, which are downloaded and used directly
  - TGZ and ZIP archives, which are downloaded and unpacked
  - Maven artifacts, which are downloaded by using Maven coordinates
  - Other artifacts, which are downloaded and used directly

  If the artifact is of type `jar`, `tgz`, `zip`, or `other`, the plug-in can be defined as follows:

  ```yaml
  plugins:
    - name: plugin-1
      artifacts:
        - type: jar
          url: <url>
          sha512sum: <sha512sum>
  ```

  Where:

  - `type` is the file format of the connector artifact that will be downloaded.
  - `<url>` is the location, accessible from your cluster, to download the connector from.
  - `<sha512sum>` is the checksum used to verify that the downloaded artifact is correct before it is added to your Kafka Connect environment.

  If the artifact type is `maven`, the plug-in can be defined as follows:

  ```yaml
  plugins:
    - name: mq-sink
      artifacts:
        - type: maven
          group: <group name>
          artifact: <artifact name>
          version: <version>
  ```

  Note: If you encounter issues while retrieving Maven artifacts, consider encoding the configuration values. For example, to retrieve the `com.ibm.mq.allclient` artifact, configure your value as `artifact: com%2Eibm%2Emq%2Eallclient`.
- `spec.build.output`: Configure this section to specify the output that you want from the `KafkaConnect` build process. For example:

  ```yaml
  output:
    image: my-image-registry.my-kafka-connect-image:latest
    type: docker
    pushSecret: my-registry-credentials
  ```

  Where:

  - `type` can be `docker` if you want to create a container image, or `imagestream` if you are using OpenShift ImageStream.
  - `image` is the full name of the new container image, including the registry address, port number (if listening on a non-standard port), and tag. For example, in `my-registry.io/my-connect-cluster-image:my-tag`, `my-registry.io/my-connect-cluster-image` is the address of the registry and `my-tag` is the tag of the new container image.
  - `pushSecret`: If the image registry is secured, you can optionally provide the name of the secret that contains credentials with write permission.
If the build process needs to pull or push images from or to a secured container registry, specify the secret containing the credentials in the `spec.template` section. For example, to retrieve the Event Streams Kafka Connect image, which is used as the base image for the build, provide your `ibm-entitlement-key` secret:
- If running on OpenShift, specify the secret in the `spec.template.buildConfig.pullSecret` section:

  ```yaml
  template:
    buildConfig:
      pullSecret: ibm-entitlement-key
  ```

- If running on other Kubernetes platforms, specify the secret in the `spec.template.buildPod.imagePullSecrets` section:

  ```yaml
  template:
    buildPod:
      imagePullSecrets:
        - name: ibm-entitlement-key
  ```

- To provide the secret for pulling any of the images that are used by the specific pod where Kafka Connect is running, specify the secret in the `spec.template.pod.imagePullSecrets` section:

  ```yaml
  template:
    pod:
      imagePullSecrets:
        - name: <pull-secret-name>
  ```

Important: The secrets that are referenced in the `KafkaConnect` custom resource must be present in the same namespace as the `KafkaConnect` instance.
#### Rebuild the Kafka Connect image
Rebuild the Kafka Connect image regularly to ensure that your Kafka Connect environment is up-to-date with changes to Kafka Connect and any new releases of connectors.
When you change the image (`spec.image`) or the connector plug-in artifacts configuration (`spec.build.plugins`) in the `KafkaConnect` custom resource, the container image is built automatically.
To pull an upgraded base image or to download the latest connector plug-in artifacts without changing the `KafkaConnect` resource, you can trigger a rebuild of the container image that is used by your Kafka Connect cluster by applying the following annotation to the `StrimziPodSet` resource named `<kafka-connect-instance-name>-connect`:

```yaml
eventstreams.ibm.com/force-rebuild: true
```
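For example, a sketch of applying the annotation from the command line, assuming the `StrimziPodSet` custom resource is addressable by `kubectl` under that kind name:

```shell
kubectl annotate strimzipodset <kafka-connect-instance-name>-connect eventstreams.ibm.com/force-rebuild="true" -n <namespace>
```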
### Manually add required plug-ins
You can create a container image yourself with Kafka Connect and all the required plug-ins (connectors, converters, and transformations), and use it to start Kafka Connect. This approach is useful when the required plug-ins cannot be included by using the operator, for example, when authentication is required to access the plug-in artifacts.
- The Event Streams Kafka image is a convenient starting point as it includes everything you need to start Kafka Connect. You can prepare a directory containing all the required plug-ins and use the sample `Dockerfile` to add them to the Event Streams Kafka image:

  ```dockerfile
  FROM cp.icr.io/cp/ibm-eventstreams-kafka:<version>
  COPY ./my-plugins/ /opt/kafka/plugins/
  USER 1001
  ```

  Where:

  - `<version>` is the version of Event Streams that you are using.
  - `my-plugins` is the folder containing all the plug-ins.

  Note: Both components must be in a single directory. For example, the following snippet shows the directory structure:

  ```
  +-- KafkaConnectComponents
  |   +-- Dockerfile
  |   +-- my-plugins
  ```
- If your plug-in (connector, converter, or transformation) consists of a single JAR file, you can copy the JAR file directly into the `my-plugins` directory. If it consists of multiple JAR files or requires additional dependencies, create a directory for the plug-in inside the `my-plugins` directory, and copy all the JAR files into that directory. For example, the following snippet shows the directory structure with multiple plug-ins:

  ```
  +-- my-plugins
  |   +-- connector1.jar
  |   +-- connector2
  |   |   +-- connector2.jar
  |   |   +-- connector2-lib.jar
  |   +-- transformation2.jar
  |   +-- transformation1
  |   |   +-- transformation1.jar
  |   |   +-- transformation1-lib.jar
  |   +-- converter1.jar
  |   +-- converter2
  |   |   +-- converter2.jar
  |   |   +-- converter2-lib.jar
  ```
- Run the following commands to build the container image and push it to an image registry that is accessible to your Event Streams instance:

  ```shell
  docker build -t <registry>/<image>:<tag> .
  docker push <registry>/<image>:<tag>
  ```

  Note: You might need to log in to the IBM Container software library before building the image to allow the base image that is specified in the `Dockerfile` to be pulled successfully.
- Specify the image in the `spec.image` field of the `KafkaConnect` custom resource to start Kafka Connect with the image you have built, including all your plug-ins.
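For example, a minimal sketch of the relevant fragment of the `KafkaConnect` custom resource, where the image name matches whatever you pushed in the previous step:

```yaml
spec:
  image: <registry>/<image>:<tag>
```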
#### Rebuild the Kafka Connect image

Rebuild the Kafka Connect image regularly with a new unique tag and update the `KafkaConnect` resource to use the new image. This ensures that your Kafka Connect environment is up-to-date with changes to Kafka Connect and any new releases of connectors.
## Configure workers
Event Streams runs Kafka Connect in distributed mode, distributing data streaming tasks across one or more worker pods.
- You can configure the number of workers in the `replicas` section of the `KafkaConnect` resource:

  ```yaml
  replicas: 1
  ```
- Worker configuration can optionally be specified in the `spec.config` section of the `KafkaConnect` resource. For example:

  ```yaml
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
  ```

  Where:

  - `group.id` is the unique name used by the Kafka Connect cluster to identify the workers in the cluster.
  - `offset.storage.topic` is the topic that Kafka Connect uses to store source connector offsets.
  - `config.storage.topic` is the topic that Kafka Connect uses to store connector configurations.
  - `status.storage.topic` is the topic that Kafka Connect uses to store the status of connectors.

  Note: Set the following replication factors to `1` if you have fewer than three brokers in your Event Streams cluster:

  - `config.storage.replication.factor`
  - `offset.storage.replication.factor`
  - `status.storage.replication.factor`
## Starting Kafka Connect
- Start Kafka Connect by creating the `KafkaConnect` custom resource with the required configuration. For example, if you are using the MQ source and sink connectors, the `KafkaConnect` custom resource might be similar to the following YAML:

  ```yaml
  apiVersion: eventstreams.ibm.com/v1beta2
  kind: KafkaConnect
  metadata:
    name: <name>
    namespace: <namespace>
    annotations:
      eventstreams.ibm.com/use-connector-resources: 'true'
    labels:
      backup.eventstreams.ibm.com/component: kafkaconnect
  spec:
    authentication:
      certificateAndKey:
        certificate: user.crt
        key: user.key
        secretName: <kafka-user-name>
      type: tls
    bootstrapServers: <kafka-bootstrap-address>
    build:
      output:
        image: <registry>/<name>:<tag>
        type: docker
      plugins:
        - artifacts:
            - type: jar
              url: https://github.com/ibm-messaging/kafka-connect-mq-source/releases/download/v2.1.0/kafka-connect-mq-source-2.1.0-jar-with-dependencies.jar
          name: mq-source
        - artifacts:
            - type: jar
              url: https://github.com/ibm-messaging/kafka-connect-mq-sink/releases/download/v2.2.0/kafka-connect-mq-sink-2.2.0-jar-with-dependencies.jar
          name: mq-sink
    template:
      buildConfig:
        pullSecret: ibm-entitlement-key
      pod:
        imagePullSecrets:
          - name: <pull-secret-name>
        metadata:
          annotations:
            # These annotations are license-specific. You can adjust them based on your use case.
            # For Cloud Pak for Integration Production license:
            eventstreams.production.type: CloudPakForIntegrationProduction
            productID: 2cba508800504d0abfa48a0e2c4ecbe2
            productName: IBM Event Streams
            productVersion: 11.5.2
            productMetric: VIRTUAL_PROCESSOR_CORE
            productChargedContainers: <add-name-of-kafka-connect-cr>-connect
            cloudpakId: c8b82d189e7545f0892db9ef2731b90d
            cloudpakName: IBM Cloud Pak for Integration
            productCloudpakRatio: '1:1'
      connectContainer:
        securityContext:
          allowPrivilegeEscalation: false
          capabilities:
            drop:
              - ALL
          privileged: false
          readOnlyRootFilesystem: true
          runAsNonRoot: true
    tls:
      trustedCertificates:
        - certificate: ca.crt
          secretName: <eventstreams-instance-name>-cluster-ca-cert
  ```
- The Event Streams operator will populate the `status` section of the `KafkaConnect` custom resource. Use the following command to verify that your Kafka Connect cluster is running and the configured connectors are available for use:

  ```shell
  kubectl describe kafkaconnect my-connect-cluster
  ```

  The following snippet is an example output for the previous command:

  ```
  Status:
    Conditions:
      Last Transition Time:  2024-06-25T07:56:40.943007974Z
      Status:                True
      Type:                  Ready
    Connector Plugins:
      Class:    com.ibm.eventstreams.connect.mqsource.MQSourceConnector
      Type:     source
      Version:  1.3.2
      Class:    com.ibm.eventstreams.connect.mqsink.MQSinkConnector
      Type:     sink
      Version:  1.5.0
  ```
Note: If Kafka Connect fails to connect to Kafka with timeout errors, ensure that all the connection details are correct. If the problem persists, try duplicating the following connection properties in your `KafkaConnect` custom resource, adding the `producer` prefix for source connectors, the `consumer` prefix for sink connectors, or both if both sink and source connectors are in use.
| Connection property | Required for TLS | Required for SCRAM | Source connector property | Sink connector property |
|---|---|---|---|---|
| `bootstrap.servers` | Yes | Yes | `producer.bootstrap.servers` | `consumer.bootstrap.servers` |
| `ssl.protocol` | Yes | No | `producer.ssl.protocol` | `consumer.ssl.protocol` |
| `ssl.truststore.location` | Yes | No | `producer.ssl.truststore.location` | `consumer.ssl.truststore.location` |
| `ssl.truststore.password` | Yes | No | `producer.ssl.truststore.password` | `consumer.ssl.truststore.password` |
| `ssl.truststore.type` | Yes | No | `producer.ssl.truststore.type` | `consumer.ssl.truststore.type` |
| `security.protocol` | No | Yes | `producer.security.protocol` | `consumer.security.protocol` |
| `sasl.mechanism` | No | Yes | `producer.sasl.mechanism` | `consumer.sasl.mechanism` |
| `sasl.jaas.config` | No | Yes | `producer.sasl.jaas.config` | `consumer.sasl.jaas.config` |
These values can also be set on a per-connector level by using the `producer.override` and `consumer.override` prefixes.
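For example, the following is a sketch of duplicated producer properties for a source connector on a SCRAM-secured cluster, set in the `spec.config` section of the `KafkaConnect` custom resource. The `SASL_SSL` and `ScramLoginModule` values shown are the standard Kafka client settings for SCRAM over TLS; substitute your own addresses and credentials:

```yaml
spec:
  config:
    producer.bootstrap.servers: <kafka-bootstrap-address>
    producer.security.protocol: SASL_SSL
    producer.sasl.mechanism: SCRAM-SHA-512
    producer.sasl.jaas.config: >-
      org.apache.kafka.common.security.scram.ScramLoginModule required
      username="<username>" password="<password>";
```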
## Set up a Kafka connector
Set up a connector by defining a `KafkaConnector` custom resource with the required connector configuration.
Note: To use `KafkaConnector` resources for managing each connector rather than using the Kafka Connect REST API directly, set `eventstreams.ibm.com/use-connector-resources` to `true` in the `metadata.annotations` section of the `KafkaConnect` custom resource.
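For example, the annotation as it appears in the `KafkaConnect` custom resource:

```yaml
apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaConnect
metadata:
  annotations:
    eventstreams.ibm.com/use-connector-resources: 'true'
```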
### Configure the connector
- Kafka Connect cluster: Add the label `eventstreams.ibm.com/cluster: <kafka_connect_name>` to the `KafkaConnector` custom resource to specify the Kafka Connect cluster where the connector must be started. The value of this label must be set to the name of the corresponding Kafka Connect instance.

- `class`: Specifies the complete class name for starting the connector. For example, to set up an MQ sink connector, the name of the connector class is as follows:

  ```yaml
  spec:
    class: com.ibm.eventstreams.connect.mqsink.MQSinkConnector
  ```

- `tasksMax`: Specify the maximum number of tasks that must be used to run this connector.

- `autoRestart`: Specify whether Kafka Connect must automatically restart the connector to recover from failures, and optionally specify the maximum number of attempts that must be made before stopping the connector:

  ```yaml
  autoRestart:
    enabled: true
    maxRestarts: 10
  ```

- `config`: Each connector documents the supported configuration options that allow users to specify details about the external system and control the connector behavior during the data transfer. These configuration properties can be specified as a set of key-value pairs in the `spec.config` section of the `KafkaConnector` custom resource. For example, if you are trying to connect to a database by using a connector, the configuration might include parameters such as the database URL, credentials to connect to the database, and table names. See the documentation of your connector to view the full list of supported configuration properties.

- `state`: Optionally, specify the state you want the connector to be in (see the example command after this list). Valid states are:

  - To start or resume the connector: `running` (default)
  - To pause the connector: `paused`
  - To stop the connector: `stopped`
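For example, to pause a running connector without editing the resource file, you can patch the `state` field directly. The following is a sketch that assumes your connector is named `mq-source` and that the `KafkaConnector` custom resource is addressable by `kubectl` as `kafkaconnector`:

```shell
kubectl patch kafkaconnector mq-source --type=merge -p '{"spec":{"state":"paused"}}'
```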
### Start the connector
Start the connector by creating a `KafkaConnector` custom resource with the required configuration. For example, if you are using the MQ source connector, the `KafkaConnector` custom resource might be similar to the following snippet:
```yaml
apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaConnector
metadata:
  name: mq-source
  labels:
    # The eventstreams.ibm.com/cluster label identifies the KafkaConnect instance
    # in which to create this connector. That KafkaConnect instance
    # must have the eventstreams.ibm.com/use-connector-resources annotation
    # set to true.
    eventstreams.ibm.com/cluster: <kafka_connect_name>
    backup.eventstreams.ibm.com/component: kafkaconnector
spec:
  class: com.ibm.eventstreams.connect.mqsource.MQSourceConnector
  tasksMax: 1
  config:
    topic: TSOURCE
    mq.queue.manager: QM1
    mq.connection.name.list: localhost(1414)
    mq.channel.name: MYSVRCONN
    mq.queue: MYQSOURCE
    mq.user.name: alice
    mq.password: passw0rd
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter
    mq.record.builder: com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder
```
The Event Streams operator will populate the `status` section of the `KafkaConnector` resource. Use the following command to verify that your connector is running as expected:

```shell
kubectl describe kafkaconnector <connector_name>
```
This command provides the complete description of the connector that you created. You can verify the current status of the connector from the status section. For example:
```
Status:
  Conditions:
    Last Transition Time:  2024-07-13T07:56:40.943007974Z
    Status:                True
    Type:                  Ready
  Connector Status:
    Connector:
      State:      RUNNING
      worker_id:  mq-connectors-connect-0.mq-connectors-connect-0.es.svc:8083
    Name:         mq-sink
    Tasks:
      Id:         0
      State:      RUNNING
      worker_id:  mq-connectors-connect-0.mq-connectors-connect-0.es.svc:8083
    Type:         sink
  Observed Generation:  1
  Tasks Max:            1
```
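If you only need the connector state rather than the full description, a `jsonpath` query can extract it. The following is a sketch that assumes the `Connector Status` layout shown above maps to a `status.connectorStatus.connector.state` field in the resource JSON:

```shell
kubectl get kafkaconnector <connector_name> -o jsonpath='{.status.connectorStatus.connector.state}{"\n"}'
```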
## Enabling topic creation for connectors
To enable connectors to dynamically create topics in Kafka when they do not already exist, you must configure specific settings in both Kafka Connect and the connector. This enables Kafka Connect to create topics that are required by connectors without manual intervention or automatic topic creation in the Kafka brokers.
To enable topic creation, follow these steps:
- In the `KafkaConnect` custom resource, ensure that `topic.creation.enable` is set to `true` to enable topic creation (the default setting is `true`):

  ```yaml
  topic.creation.enable: true
  ```

  When set to `true`, Kafka Connect will create topics that are required by the connector.

- In the `KafkaConnector` custom resource, provide default topic settings by adding the following configurations (see the fragment after this list):

  ```yaml
  topic.creation.default.partitions: 3
  topic.creation.default.replication.factor: 1
  ```

  Where:

  - `topic.creation.default.partitions` specifies the default number of partitions for newly created topics.
  - `topic.creation.default.replication.factor` specifies the replication factor for newly created topics.
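The topic creation settings sit alongside the connector's other configuration, typically in the `spec.config` section. The following is a minimal sketch of a `KafkaConnector` fragment (the connector class and other settings are omitted for brevity):

```yaml
spec:
  config:
    topic.creation.default.partitions: 3
    topic.creation.default.replication.factor: 1
```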
Important: You must specify the default number of partitions and the replication factor for newly created topics. If these values are not provided, Kafka Connect will not create topics, causing connectors that require new topics to fail with errors similar to the following example:
```
Error while fetching metadata with correlation id x, UNKNOWN_TOPIC_OR_PARTITION
```

For more information about setting partitions and replicas, see managing topics.