Support for this connector is provided by IBM.
To use the Oracle (Debezium) source connector, complete the following steps:
- Ensure that an Oracle database is set up for use with the LogMiner adapter.
- Create a `KafkaConnect` custom resource to define your Kafka Connect runtime and include the Oracle (Debezium) source connector by following the instructions in setting up and running connectors. When adding the connector to your Kafka Connect runtime, click **Get connector** to obtain the pre-built connector JAR file, or click **Source code** if you want to build the connector JAR file yourself. To obtain the connector JAR file, go to the connector Maven repository, open the directory for the latest version, and download the file ending with `-plugin.tar.gz`. A sketch of such a `KafkaConnect` resource is shown below.
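  For reference, the following is a minimal sketch of a `KafkaConnect` custom resource that uses the Kafka Connect build mechanism to pull the connector archive straight from the Maven repository. The resource name, image reference, and `<version>` placeholder are assumptions for illustration, and your runtime will typically need further settings (for example, authentication and TLS for the Connect workers):

  ```yaml
  apiVersion: eventstreams.ibm.com/v1beta2
  kind: KafkaConnect
  metadata:
    name: <kafka_connect_name>
    annotations:
      eventstreams.ibm.com/use-connector-resources: "true"
  spec:
    replicas: 1
    bootstrapServers: <bootstrap_server_address>
    build:
      output:
        type: docker
        # Hypothetical target image for the built Connect container
        image: <registry>/<namespace>/connect-with-debezium-oracle:latest
      plugins:
        - name: debezium-connector-oracle
          artifacts:
            - type: tgz
              # Substitute the latest version from the connector Maven repository
              url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-oracle/<version>/debezium-connector-oracle-<version>-plugin.tar.gz
  ```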
- Apply the configured `KafkaConnect` custom resource to start the Kafka Connect runtime and verify that the connector is available for use.

  **Note:** For the Oracle (Debezium) source connector to function correctly in environments where ACLs are enabled, the Kafka user configured for Kafka Connect must be granted cluster-level configuration access. When creating the Kafka user, ensure that the following operations are included under `spec.authorization.acls` (a fuller `KafkaUser` sketch follows this note):

  ```yaml
  - resource:
      type: cluster
    operations:
      - DescribeConfigs
      - Describe
  ```
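  For example, a minimal `KafkaUser` granting this access could look like the following sketch. It assumes SCRAM-SHA-512 authentication and the `eventstreams.ibm.com/v1beta2` API version used elsewhere on this page; any topic and group ACLs that Kafka Connect itself requires are omitted:

  ```yaml
  apiVersion: eventstreams.ibm.com/v1beta2
  kind: KafkaUser
  metadata:
    name: <kafka_user_name>
    labels:
      eventstreams.ibm.com/cluster: <event_streams_instance_name>
  spec:
    authentication:
      type: scram-sha-512
    authorization:
      type: simple
      acls:
        # Cluster-level configuration access needed by the connector
        - resource:
            type: cluster
          operations:
            - DescribeConfigs
            - Describe
  ```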
- Create a `KafkaConnector` custom resource to define your connector configuration. Specify the class name and configuration properties for the connector in the `KafkaConnector` custom resource as described in the connector documentation.

  If the Kafka cluster uses SCRAM-SHA-512 authentication, add the following configuration properties to your `KafkaConnector` custom resource under `spec.config`:
  ```yaml
  apiVersion: eventstreams.ibm.com/v1beta2
  kind: KafkaConnector
  metadata:
    name: <connector_name>
    labels:
      eventstreams.ibm.com/cluster: <kafka_connect_name>
  spec:
    class: io.debezium.connector.oracle.OracleConnector
    tasksMax: 1
    config:
      # In Debezium 2.x, topic.prefix replaces the earlier database.server.name property
      topic.prefix: <name_of_the_oracle_server_or_cluster>
      database.hostname: <ip_address_or_hostname_of_the_oracle_database_server>
      database.port: <port_number_for_database_server>
      database.dbname: <database_name>
      database.user: <database_user_name>
      database.password: <database_user_password>
      schema.history.internal.kafka.topic: <name_of_the_kafka_topic>
      schema.history.internal.kafka.bootstrap.servers: <bootstrap_server_address>
      # Schema-history producer (DDL writer)
      schema.history.internal.producer.security.protocol: SASL_SSL
      schema.history.internal.producer.sasl.mechanism: SCRAM-SHA-512
      schema.history.internal.producer.sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="${file:/opt/kafka/connect-password/cp-kafka-user:password}";
      schema.history.internal.producer.ssl.truststore.location: /opt/kafka/connect-certs/<event_streams_name_cluster-ca-cert>/ca.p12
      schema.history.internal.producer.ssl.truststore.password: /opt/kafka/connect-certs/<event_streams_name_cluster-ca-cert>/ca.password
      schema.history.internal.producer.ssl.truststore.type: PKCS12
      # Schema-history consumer (DDL reader on startup/recovery)
      schema.history.internal.consumer.security.protocol: SASL_SSL
      schema.history.internal.consumer.sasl.mechanism: SCRAM-SHA-512
      schema.history.internal.consumer.sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="${file:/opt/kafka/connect-password/cp-kafka-user:password}";
      schema.history.internal.consumer.ssl.truststore.location: /opt/kafka/connect-certs/<event_streams_name_cluster-ca-cert>/ca.p12
      schema.history.internal.consumer.ssl.truststore.password: /opt/kafka/connect-certs/<event_streams_name_cluster-ca-cert>/ca.password
      schema.history.internal.consumer.ssl.truststore.type: PKCS12
  ```
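  The `${file:...}` placeholders in this example are resolved by Kafka's `FileConfigProvider`. If your Kafka Connect runtime does not already enable it, the standard way to do so is through the Connect worker configuration, sketched here as `KafkaConnect` `spec.config` entries:

  ```yaml
  spec:
    config:
      config.providers: file
      config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
  ```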
  If the Kafka cluster uses mutual TLS authentication, add the following configuration properties to your `KafkaConnector` custom resource under `spec.config`:
  ```yaml
  # Schema-history producer (DDL writer)
  schema.history.internal.producer.security.protocol: SSL
  schema.history.internal.producer.ssl.keystore.location: /opt/kafka/connect-certs/<kafka-user>/user.p12
  schema.history.internal.producer.ssl.keystore.password: '${file:/opt/kafka/connect-certs/<kafka-user>:user.password}'
  schema.history.internal.producer.ssl.keystore.type: PKCS12
  schema.history.internal.producer.ssl.truststore.location: /opt/kafka/connect-certs/<event_streams_name_cluster-ca-cert>/ca.p12
  schema.history.internal.producer.ssl.truststore.password: /opt/kafka/connect-certs/<event_streams_name_cluster-ca-cert>/ca.password
  schema.history.internal.producer.ssl.truststore.type: PKCS12
  # Schema-history consumer (DDL reader)
  schema.history.internal.consumer.security.protocol: SSL
  schema.history.internal.consumer.ssl.keystore.location: /opt/kafka/connect-certs/<kafka-user>/user.p12
  schema.history.internal.consumer.ssl.keystore.password: '${file:/opt/kafka/connect-certs/<kafka-user>:user.password}'
  schema.history.internal.consumer.ssl.keystore.type: PKCS12
  schema.history.internal.consumer.ssl.truststore.location: /opt/kafka/connect-certs/<event_streams_name_cluster-ca-cert>/ca.p12
  schema.history.internal.consumer.ssl.truststore.password: /opt/kafka/connect-certs/<event_streams_name_cluster-ca-cert>/ca.password
  schema.history.internal.consumer.ssl.truststore.type: PKCS12
  ```
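  In this case, the `<kafka-user>` keystore files are expected to come from the secret that is created for a `KafkaUser` with `tls` authentication, which contains the `user.p12` keystore and a `user.password` key holding its password.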
- Apply the configured `KafkaConnector` custom resource to start the connector and verify that it is running. An illustrative status excerpt is shown after this list.
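When the connector starts successfully, the `KafkaConnector` custom resource reports this in its status. The following excerpt is illustrative only (field names follow the `KafkaConnector` schema; the values shown are examples):

```yaml
status:
  conditions:
    - type: Ready
      status: "True"
  connectorStatus:
    connector:
      state: RUNNING
    tasks:
      - id: 0
        state: RUNNING
```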