You can use the MQ sink connector to copy data from Event Streams or Apache Kafka into IBM MQ. The connector copies messages from a Kafka topic into a target MQ queue.
Kafka Connect can be run in standalone or distributed mode. This document contains steps for running the connector in distributed mode on a Kubernetes platform. In this mode, work balancing is automatic, scaling is dynamic, and tasks and data are fault-tolerant. For more details on the difference between standalone and distributed mode, see the explanation of Kafka Connect workers.
Prerequisites
To follow these instructions, ensure you have IBM MQ v8 or later installed.
Note: These instructions are for IBM MQ v9 running on Linux. If you are using a different version or platform, you might have to adjust some steps slightly.
Setting up the queue manager
You can set up a queue manager by using the local operating system to authenticate, or by using the IBM MQ Operator.
By using the local operating system to authenticate
These sample instructions set up an IBM MQ queue manager that uses its local operating system to authenticate the user ID and password. The user ID and password you provide must already be created on the operating system where IBM MQ is running.
- Log in as a user authorized to administer IBM MQ, and ensure the MQ commands are on the path.
- Create a queue manager with a TCP/IP listener on port 1414:
crtmqm -p 1414 <queue_manager_name>
For example, to create a queue manager called QM1, use:
crtmqm -p 1414 QM1
- Start the queue manager:
strmqm <queue_manager_name>
- Start the runmqsc tool to configure the queue manager:
runmqsc <queue_manager_name>
- In runmqsc, create a server-connection channel:
DEFINE CHANNEL(<channel_name>) CHLTYPE(SVRCONN)
- Set the channel authentication rules to accept connections requiring a user ID and password:
    - SET CHLAUTH(<channel_name>) TYPE(BLOCKUSER) USERLIST('nobody')
    - SET CHLAUTH('*') TYPE(ADDRESSMAP) ADDRESS('*') USERSRC(NOACCESS)
    - SET CHLAUTH(<channel_name>) TYPE(ADDRESSMAP) ADDRESS('*') USERSRC(CHANNEL) CHCKCLNT(REQUIRED)
- Set the identity of the client connections based on the supplied context (the user ID):
ALTER AUTHINFO(SYSTEM.DEFAULT.AUTHINFO.IDPWOS) AUTHTYPE(IDPWOS) ADOPTCTX(YES)
- Refresh the connection authentication information:
REFRESH SECURITY TYPE(CONNAUTH)
- Create a queue for the Kafka Connect connector to use:
DEFINE QLOCAL(<queue_name>)
- Authorize the IBM MQ user ID to connect to and inquire the queue manager:
   SET AUTHREC OBJTYPE(QMGR) PRINCIPAL('<user_id>') AUTHADD(CONNECT,INQ)
- Authorize the IBM MQ user ID to use the queue:
   SET AUTHREC PROFILE(<queue_name>) OBJTYPE(QUEUE) PRINCIPAL('<user_id>') AUTHADD(ALLMQI)
- Stop the runmqsc tool by typing END.
For example, for a queue manager called QM1, with user ID alice, creating a server-connection channel called MYSVRCONN and a queue called MYQSINK, you run the following commands in runmqsc:
DEFINE CHANNEL(MYSVRCONN) CHLTYPE(SVRCONN)
SET CHLAUTH(MYSVRCONN) TYPE(BLOCKUSER) USERLIST('nobody')
SET CHLAUTH('*') TYPE(ADDRESSMAP) ADDRESS('*') USERSRC(NOACCESS)
SET CHLAUTH(MYSVRCONN) TYPE(ADDRESSMAP) ADDRESS('*') USERSRC(CHANNEL) CHCKCLNT(REQUIRED)
ALTER AUTHINFO(SYSTEM.DEFAULT.AUTHINFO.IDPWOS) AUTHTYPE(IDPWOS) ADOPTCTX(YES)
REFRESH SECURITY TYPE(CONNAUTH)
DEFINE QLOCAL(MYQSINK)
SET AUTHREC OBJTYPE(QMGR) PRINCIPAL('alice') AUTHADD(CONNECT,INQ)
SET AUTHREC PROFILE(MYQSINK) OBJTYPE(QUEUE) PRINCIPAL('alice') AUTHADD(ALLMQI)
END
The queue manager is now ready to accept connections from the connector and put messages on a queue.
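The same command sequence applies to other channel, queue, and user names. As a minimal sketch, the following Java helper renders the MQSC commands from the steps above for given names. The class `MqscSetup` and its method are hypothetical and not part of IBM MQ or the connector; the code only builds text and does not connect to a queue manager.

```java
import java.util.List;

// Hypothetical helper (not part of IBM MQ): renders the MQSC commands from
// the steps above for a given channel, queue, and user ID.
final class MqscSetup {

    static List<String> commands(String channel, String queue, String userId) {
        return List.of(
            "DEFINE CHANNEL(" + channel + ") CHLTYPE(SVRCONN)",
            "SET CHLAUTH(" + channel + ") TYPE(BLOCKUSER) USERLIST('nobody')",
            "SET CHLAUTH('*') TYPE(ADDRESSMAP) ADDRESS('*') USERSRC(NOACCESS)",
            "SET CHLAUTH(" + channel + ") TYPE(ADDRESSMAP) ADDRESS('*') USERSRC(CHANNEL) CHCKCLNT(REQUIRED)",
            "ALTER AUTHINFO(SYSTEM.DEFAULT.AUTHINFO.IDPWOS) AUTHTYPE(IDPWOS) ADOPTCTX(YES)",
            "REFRESH SECURITY TYPE(CONNAUTH)",
            "DEFINE QLOCAL(" + queue + ")",
            "SET AUTHREC OBJTYPE(QMGR) PRINCIPAL('" + userId + "') AUTHADD(CONNECT,INQ)",
            "SET AUTHREC PROFILE(" + queue + ") OBJTYPE(QUEUE) PRINCIPAL('" + userId + "') AUTHADD(ALLMQI)",
            "END");
    }

    public static void main(String[] args) {
        // Prints the script for the QM1 example, one command per line.
        commands("MYSVRCONN", "MYQSINK", "alice").forEach(System.out::println);
    }
}
```

The printed script can be saved to a file and supplied to runmqsc on standard input, for example `runmqsc QM1 < sink.mqsc`, where the file name is an assumption.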
By using the IBM MQ Operator
You can also use the IBM MQ Operator to set up a queue manager. For more information about installing the IBM MQ Operator and setting up a queue manager, see the IBM MQ documentation.
If you are using the IBM MQ Operator to set up a queue manager, you can use the following YAML to create a queue manager with the required configuration:
- Create a file called custom-sink-mqsc-configmap.yaml and copy the following YAML content into it. The ConfigMap holds the MQSC commands that create a server-connection channel called MYSVRCONN and a queue called MYQSINK:
apiVersion: v1
kind: ConfigMap
metadata:
  name: custom-sink-mqsc
data:
  sink.mqsc: |
    DEFINE CHANNEL(MYSVRCONN) CHLTYPE(SVRCONN)
    SET CHLAUTH(MYSVRCONN) TYPE(BLOCKUSER) USERLIST('nobody')
    SET CHLAUTH('*') TYPE(ADDRESSMAP) ADDRESS('*') USERSRC(NOACCESS)
    SET CHLAUTH(MYSVRCONN) TYPE(ADDRESSMAP) ADDRESS('*') USERSRC(CHANNEL) CHCKCLNT(REQUIRED)
    ALTER AUTHINFO(SYSTEM.DEFAULT.AUTHINFO.IDPWOS) AUTHTYPE(IDPWOS) ADOPTCTX(YES)
    REFRESH SECURITY TYPE(CONNAUTH)
    DEFINE QLOCAL(MYQSINK)
    SET AUTHREC OBJTYPE(QMGR) PRINCIPAL('alice') AUTHADD(CONNECT,INQ)
    SET AUTHREC PROFILE(MYQSINK) OBJTYPE(QUEUE) PRINCIPAL('alice') AUTHADD(ALLMQI)
- Create the ConfigMap by using the following command:
oc apply -f custom-sink-mqsc-configmap.yaml
- To create a queue manager with the required configuration, update the spec.queueManager section of the QueueManager custom resource YAML file:
# ...
queueManager:
  # ...
  mqsc:
    - configMap:
        name: custom-sink-mqsc
        items:
          - sink.mqsc
The queue manager is now ready to accept connections from the connector and put messages on a queue.
Configuring Kafka Connect
Set up your Kafka Connect environment as described in setting up connectors:
- The MQ sink connector JAR file is available as a GitHub release along with all the dependencies that are required to run the connector. To start Kafka Connect with the MQ sink connector, follow the guidance in setting up connectors to add the connector JAR file by using the Event Streams operator or manually.
- Start Kafka Connect by using the KafkaConnect custom resource. You can use the following sample KafkaConnect custom resource to get started:
apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaConnect
metadata:
  name: mq-sink-connector
  namespace: es
  annotations:
    eventstreams.ibm.com/use-connector-resources: "true"
  labels:
    backup.eventstreams.ibm.com/component: kafkaconnect
spec:
  authentication:
    certificateAndKey:
      certificate: user.crt
      key: user.key
      secretName: my-kafka-user
    type: tls
  bootstrapServers: mtls-listener.my-cluster:443  
  build:
    output:
      image: my-image-registry.my-kafka-connect-image:latest  
      type: docker
    plugins:
      - artifacts:
          - type: jar
            url: https://github.com/ibm-messaging/kafka-connect-mq-sink/releases/download/v2.2.0/kafka-connect-mq-sink-2.2.0-jar-with-dependencies.jar 
        name: mq-sink
  template:
    buildConfig:
      pullSecret: ibm-entitlement-key
    pod:
      imagePullSecrets:
        - name: default-dockercfg-abcde
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
              - matchExpressions:
                  - key: kubernetes.io/arch
                    operator: In
                    values:
                      - amd64
                      - s390x
                      - ppc64le
    connectContainer:
      securityContext:
        allowPrivilegeEscalation: false
        capabilities:
          drop:
            - ALL
        privileged: false
        readOnlyRootFilesystem: true
        runAsNonRoot: true
  tls:
    trustedCertificates:
      - certificate: ca.crt
        secretName: <eventstreams-instance>-cluster-ca-cert
Configuring the connector to connect to MQ
To connect to IBM MQ and to your Event Streams or Apache Kafka cluster, the connector requires configuration settings added to a KafkaConnector custom resource that represents the connector.
For IBM MQ connectors, you can generate the KafkaConnector custom resource YAML file from either the Event Streams UI or the CLI. You can also use the CLI to generate a JSON file, which you can use in distributed mode where you supply the connector configuration through REST API calls.
The connector connects to IBM MQ using a client connection. You must provide the following connection information for your queue manager (these configuration settings are added to the spec.config section of the KafkaConnector custom resource YAML):
- Comma-separated list of Kafka topics to pull events from.
- The name of the IBM MQ queue manager.
- The connection name (one or more host and port pairs).
- The channel name.
- The name of the sink IBM MQ queue.
- The user name and password if the queue manager is configured to require them for client connections.
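As a rough illustration of the list above, the following Java sketch checks a configuration map for these settings before it is copied into spec.config. The class `SinkConfigCheck` is hypothetical and not part of the connector. The key names are the ones used in the sample KafkaConnector resource in this document, and treating a user name without a password as a problem is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical pre-flight check, not part of the connector: reports which of
// the connection settings listed above are absent from a config map.
final class SinkConfigCheck {

    // Keys as they appear under spec.config in the KafkaConnector resource.
    static final List<String> REQUIRED = List.of(
        "topics",
        "mq.queue.manager",
        "mq.connection.name.list",
        "mq.channel.name",
        "mq.queue");

    static List<String> missing(Map<String, String> config) {
        List<String> absent = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = config.get(key);
            if (value == null || value.isBlank()) {
                absent.add(key);
            }
        }
        // Credentials are optional, but (as an assumption of this sketch)
        // a user name and a password are expected to be supplied together.
        boolean hasUser = config.containsKey("mq.user.name");
        boolean hasPassword = config.containsKey("mq.password");
        if (hasUser && !hasPassword) absent.add("mq.password");
        if (hasPassword && !hasUser) absent.add("mq.user.name");
        return absent;
    }

    public static void main(String[] args) {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("topics", "TSINK");
        config.put("mq.queue.manager", "QM1");
        config.put("mq.connection.name.list", "localhost(1414)");
        config.put("mq.channel.name", "MYSVRCONN");
        // mq.queue is deliberately left out to show the check at work.
        System.out.println("Missing settings: " + missing(config));
    }
}
```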
Using the UI
Use the Event Streams UI to generate and download the KafkaConnector custom resource YAML file for your IBM MQ sink connector.
- Log in to the Event Streams UI from a supported web browser (see how to determine the login URL for your Event Streams UI).
- Click Toolbox in the primary navigation, and scroll to the Connectors section.
- Go to the Set Up IBM MQ Connector tile, and click Connecting to IBM MQ?.
- Ensure the MQ Sink tab is selected, and click Generate.
- In the dialog, enter the configuration of the MQ Sink connector.
- Click Download to generate and download the configuration file with the supplied fields.
- Open the downloaded configuration file and change the values of mq.user.name and mq.password to the username and password that you used to configure your instance of MQ. Also set the label eventstreams.ibm.com/cluster to the name of your Kafka Connect instance.
Using the CLI
Use the Event Streams CLI to generate and download the KafkaConnector custom resource YAML file for your IBM MQ sink connector. You can also use the CLI to generate a JSON file for distributed mode.
- Initialize the Event Streams CLI by following the instructions in logging in.
- Run the connector-config-mq-sink command to generate the configuration file for the MQ Sink connector.
For example, to generate a configuration file for an instance of MQ with the following information: a queue manager called QM1, with a connection point of localhost(1414), a channel name of MYSVRCONN, a queue of MYQSINK, and connecting to the topic TSINK, run the following command:
kubectl es connector-config-mq-sink --mq-queue-manager="QM1" --mq-connection-name-list="localhost(1414)" --mq-channel="MYSVRCONN" --mq-queue="MYQSINK" --topics="TSINK" --file="mq-sink" --format yaml
Note: Omitting the --format yaml flag generates an mq-sink.properties file, which can be used for standalone mode. Specifying --format json generates an mq-sink.json file, which can be used for distributed mode outside OpenShift Container Platform.
- Change the values of mq.user.name and mq.password to the username and password that you used to configure your instance of MQ. Also set the label eventstreams.ibm.com/cluster to the name of your Kafka Connect instance.
The final configuration file will resemble the following:
apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaConnector
metadata:
  name: mq-sink
  labels:
    # The 'eventstreams.ibm.com/cluster' label identifies the KafkaConnect instance in which to create this connector.
    # That KafkaConnect instance must have the 'eventstreams.ibm.com/use-connector-resources' annotation set to 'true'.
    eventstreams.ibm.com/cluster: <kafka_connect_name>
    backup.eventstreams.ibm.com/component: kafkaconnector
spec:
  class: com.ibm.eventstreams.connect.mqsink.MQSinkConnector
  tasksMax: 1
  config:
    topics: TSINK
    mq.queue.manager: QM1
    mq.connection.name.list: localhost(1414)
    mq.channel.name: MYSVRCONN
    mq.queue: MYQSINK
    mq.user.name: alice
    mq.password: passw0rd
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter
    mq.message.builder: com.ibm.eventstreams.connect.mqsink.builders.DefaultMessageBuilder
Run the following command to find a list of all the possible flags: kubectl es connector-config-mq-sink --help. 
For all available configuration options for IBM MQ sink connector, see connecting to IBM MQ.
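For distributed mode outside the operator, the configuration is supplied through the Kafka Connect REST API. The following Java sketch builds, but does not send, a `PUT /connectors/{name}/config` request carrying the sample settings as JSON. The class `ConnectRestRequest` is hypothetical, the address `http://localhost:8083` is an assumption about where the Connect REST endpoint listens, and the JSON body is assembled by hand here, so its layout might differ from the mq-sink.json file that the CLI generates.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper: prepares a Kafka Connect REST request for the
// connector configuration. Nothing is sent over the network.
final class ConnectRestRequest {

    // Minimal JSON string escaping, enough for the plain values used here.
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Renders a flat string-to-string map as a JSON object.
    static String toJson(Map<String, String> config) {
        return config.entrySet().stream()
            .map(e -> quote(e.getKey()) + ": " + quote(e.getValue()))
            .collect(Collectors.joining(", ", "{", "}"));
    }

    // Builds a PUT /connectors/{name}/config request for the given settings.
    static HttpRequest putConfig(String connectUrl, String name, Map<String, String> config) {
        return HttpRequest.newBuilder(URI.create(connectUrl + "/connectors/" + name + "/config"))
            .header("Content-Type", "application/json")
            .PUT(HttpRequest.BodyPublishers.ofString(toJson(config)))
            .build();
    }

    public static void main(String[] args) {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("connector.class", "com.ibm.eventstreams.connect.mqsink.MQSinkConnector");
        config.put("tasks.max", "1");
        config.put("topics", "TSINK");
        config.put("mq.queue.manager", "QM1");
        config.put("mq.connection.name.list", "localhost(1414)");
        config.put("mq.channel.name", "MYSVRCONN");
        config.put("mq.queue", "MYQSINK");

        // Assumed REST endpoint; replace with the address of your Connect worker.
        HttpRequest request = putConfig("http://localhost:8083", "mq-sink", config);
        System.out.println(request.method() + " " + request.uri());
        System.out.println(toJson(config));
    }
}
```

To actually submit the request, it could be passed to `java.net.http.HttpClient.send`; credentials such as mq.user.name and mq.password are omitted from the sketch on purpose.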
Verifying the log output
Verify the log output of Kafka Connect includes the following messages that indicate the connector task has started and successfully connected to IBM MQ:
$ kubectl logs <kafka_connect_pod_name>
...
INFO Created connector mq-sink
...
INFO Connection to MQ established
...
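When the log is long, the two messages can be searched for programmatically. The following Java sketch is one way to do that, assuming the log lines contain the message text exactly as shown above. The class `ConnectLogCheck` is hypothetical, and the sample lines in `main` stand in for real `kubectl logs` output.

```java
import java.util.List;

// Hypothetical helper: scans Kafka Connect log lines for the two messages
// that indicate the connector was created and connected to IBM MQ.
final class ConnectLogCheck {

    static boolean connectorReady(List<String> logLines, String connectorName) {
        boolean created = false;
        boolean connected = false;
        for (String line : logLines) {
            if (line.contains("Created connector " + connectorName)) {
                created = true;
            }
            if (line.contains("Connection to MQ established")) {
                connected = true;
            }
        }
        // Both messages must be present; their order is not checked.
        return created && connected;
    }

    public static void main(String[] args) {
        // Sample lines standing in for the output of kubectl logs.
        List<String> sample = List.of(
            "INFO Created connector mq-sink",
            "INFO Connection to MQ established");
        System.out.println(connectorReady(sample, "mq-sink")
            ? "connector started"
            : "connector not started yet");
    }
}
```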
Send a test message
To test the connector, you need an application that produces events to your topic.
- Log in to the Event Streams UI from a supported web browser (see how to determine the login URL for your Event Streams UI).
- Click Toolbox in the primary navigation.
- Go to the Starter application tile under Applications, and click Get started.
- Click Download JAR from GitHub. Download the JAR file from the list of assets for the latest release.
- Click Generate properties.
- Enter a name for the application.
- Go to the Existing topic tab and select the topic you provided in the MQ connector configuration.
- Click Generate and download .zip.
- Follow the instructions in the UI to get the application running.
Verify the message is on the queue:
- Navigate to the UI of the sample application you generated earlier and start producing messages to Event Streams.
- Use the amqsget sample to get messages from the MQ queue:
/opt/mqm/samp/bin/amqsget <queue_name> <queue_manager_name>
After a short delay, the messages are printed.
Exactly-once message delivery semantics in IBM MQ sink connector
The IBM MQ sink connector offers exactly-once message delivery semantics. An additional IBM MQ queue is used to store the state of message deliveries. When exactly-once delivery is enabled, Kafka messages are delivered to IBM MQ with no duplicated messages.
Follow the instructions to enable exactly-once delivery in the IBM MQ sink connector.
Note:
- Exactly-once support for IBM MQ connectors is only available in distributed mode; standalone Kafka Connect workers cannot provide exactly-once delivery semantics.
- Enabling exactly-once delivery in the IBM MQ connector results in extra interactions with IBM MQ and Event Streams, which reduces the throughput.
Prerequisites
Ensure the following values are set in your environment before you enable the exactly-once behavior:
- Configure the consumer group of the sink connector to ignore records in aborted transactions. You can find detailed instructions in the Kafka documentation. Notably, this configuration does not have any additional Access Control List (ACL) requirements.
- The IBM MQ sink connector is only supported on Kafka Connect version 2.6.0 or later.
- On the server-connection channel (SVRCONN) used for Kafka Connect, set HBINT to 30 seconds to allow IBM MQ transaction rollbacks to occur more quickly in failure scenarios.
- On the state queue (the queue where the state messages are stored), set DEFSOPT to EXCL to ensure the state queue share option is exclusive.
- Ensure that the messages sent through the MQ sink connector do not expire and that all the messages on the state queue are persistent.
Enabling exactly-once delivery
Configure the following properties to enable exactly-once delivery:
- The IBM MQ sink connector must be configured with the mq.exactly.once.state.queue property set to the name of a pre-configured IBM MQ queue on the same queue manager as the sink IBM MQ queue.
- Only a single connector task can be run. As a consequence, the tasks.max property must be left unset, or set to 1.
- Ensure that the state queue is empty each time exactly-once delivery is enabled (especially when re-enabling the exactly-once feature). Otherwise, the connector behaves in the same way as when recovering from a failure state, and attempts to get undelivered messages recorded in the out-of-date state message.
After enabling exactly-once delivery, Kafka messages are delivered to IBM MQ with no duplicated messages.
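The property rules above can be checked before the connector is deployed. The following Java sketch flags a configuration that does not satisfy them. The class `ExactlyOnceCheck` and the queue name `MYQSINK.STATE` are hypothetical, and the check that the state queue differs from the sink queue follows from the state queue being described as an additional queue. The sketch does not inspect IBM MQ itself, so it cannot tell whether the state queue exists or is empty.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical check, not part of the connector: validates the properties
// that the exactly-once instructions above place constraints on.
final class ExactlyOnceCheck {

    static List<String> problems(Map<String, String> config) {
        List<String> problems = new ArrayList<>();

        String stateQueue = config.get("mq.exactly.once.state.queue");
        if (stateQueue == null || stateQueue.isBlank()) {
            problems.add("mq.exactly.once.state.queue is not set");
        } else if (stateQueue.equals(config.get("mq.queue"))) {
            // The state queue is an additional queue, separate from the sink queue.
            problems.add("state queue must differ from the sink queue");
        }

        // Only a single task is allowed: tasks.max is either unset or 1.
        String tasksMax = config.get("tasks.max");
        if (tasksMax != null && !tasksMax.trim().equals("1")) {
            problems.add("tasks.max must be unset or 1, found " + tasksMax);
        }
        return problems;
    }

    public static void main(String[] args) {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("mq.queue", "MYQSINK");
        config.put("mq.exactly.once.state.queue", "MYQSINK.STATE"); // hypothetical name
        config.put("tasks.max", "2");
        problems(config).forEach(System.out::println);
    }
}
```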
Creating a state queue in IBM MQ by using the runmqsc tool
A state queue is a queue in IBM MQ that stores the last offset of the messages transferred from Kafka to MQ. A state message is the message stored in the state queue, and contains that last offset information. The last offset information is used to resume the transfer after a failure: the connector reads the state message and continues to transfer messages from the point where it was interrupted.
Create a state queue as follows.
Important: Each connector instance must have its own state queue.
- Start the runmqsc tool by running the following command:
runmqsc <QMGR_NAME>
Replace <QMGR_NAME> with the name of the queue manager you want to work with.
- Enter the following command to create a queue:
DEFINE QLOCAL(<STATE_QUEUE_NAME>) DEFSOPT(EXCL)
Where:
    - <STATE_QUEUE_NAME> is the name of the state queue.
    - Set DEFSOPT to EXCL to ensure the state queue share option is exclusive.
Wait for the queue to be created.
- Stop the runmqsc tool by entering the command END.
Note: If the sink connector is delivering messages to an IBM MQ for z/OS shared queue, then for performance reasons, the state queue should be placed on the same coupling facility structure.
Enabling MQMD in IBM MQ sink connector
You can configure the IBM MQ sink connector to add values to the MQ message descriptor (MQMD). The MQMD is used to add additional control information to accompany the message data before sending to MQ.
Prerequisites
Adding values to the MQMD is only supported in the IBM MQ sink connector, which is only supported on Kafka Connect version 2.6.0 or later.
Enabling MQMD
Configure the following properties in the KafkaConnector custom resource to enable adding values to the MQMD:
- Set the mq.message.mqmd.write property to true in the IBM MQ sink connector. By default, it is set to false.
- Configure the mq.message.mqmd.context property according to the message context. Options include:
    - ALL, which corresponds to WMQ_MDCTX_SET_ALL_CONTEXT
    - IDENTITY, mapped to WMQ_MDCTX_SET_IDENTITY_CONTEXT
Important: If your message contains any of the following properties, you must ensure that WMQ_MQMD_MESSAGE_CONTEXT is set to either WMQ_MDCTX_SET_IDENTITY_CONTEXT or WMQ_MDCTX_SET_ALL_CONTEXT:
    - JMS_IBM_MQMD_UserIdentifier
    - JMS_IBM_MQMD_AccountingToken
    - JMS_IBM_MQMD_ApplIdentityData
Similarly, if your message includes any of the following properties, set the WMQ_MQMD_MESSAGE_CONTEXT field to WMQ_MDCTX_SET_ALL_CONTEXT:
    - JMS_IBM_MQMD_PutApplType
    - JMS_IBM_MQMD_PutApplName
    - JMS_IBM_MQMD_PutDate
    - JMS_IBM_MQMD_PutTime
    - JMS_IBM_MQMD_ApplOriginData
Other message properties do not require the mq.message.mqmd.context property.
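The context rules above reduce to a small lookup. As an illustrative sketch, the following Java helper returns the least permissive mq.message.mqmd.context value that covers a given set of message properties. The class `MqmdContextRule` is hypothetical and not part of the connector; it only encodes the two property lists from this section.

```java
import java.util.Optional;
import java.util.Set;

// Hypothetical helper: encodes the rule above for choosing a value of
// mq.message.mqmd.context from the MQMD properties a message sets.
final class MqmdContextRule {

    // Properties that need IDENTITY (or ALL) context.
    static final Set<String> IDENTITY_PROPS = Set.of(
        "JMS_IBM_MQMD_UserIdentifier",
        "JMS_IBM_MQMD_AccountingToken",
        "JMS_IBM_MQMD_ApplIdentityData");

    // Properties that need ALL context.
    static final Set<String> ALL_PROPS = Set.of(
        "JMS_IBM_MQMD_PutApplType",
        "JMS_IBM_MQMD_PutApplName",
        "JMS_IBM_MQMD_PutDate",
        "JMS_IBM_MQMD_PutTime",
        "JMS_IBM_MQMD_ApplOriginData");

    // Returns "ALL", "IDENTITY", or empty when no context setting is required.
    static Optional<String> requiredContext(Set<String> messageProperties) {
        if (messageProperties.stream().anyMatch(ALL_PROPS::contains)) {
            return Optional.of("ALL");
        }
        if (messageProperties.stream().anyMatch(IDENTITY_PROPS::contains)) {
            return Optional.of("IDENTITY");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Set<String> props = Set.of("JMS_IBM_MQMD_UserIdentifier", "JMS_IBM_MQMD_PutDate");
        // PutDate needs ALL, which also covers UserIdentifier.
        System.out.println(requiredContext(props).orElse("none"));
    }
}
```

Setting ALL is always sufficient for the identity properties too, so the helper only returns IDENTITY when none of the properties in the second list is present.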
Creating a custom message builder
The MQ sink connector uses the default message builder to write messages to MQ. You can also create a custom message builder to tailor message properties based on your specific requirements.
For example, complete the following steps to create a custom message builder that overrides values of MQMD fields such as JMS_IBM_MQMD_USERIDENTIFIER, JMS_IBM_MQMD_APPLIDENTITYDATA, and JMS_IBM_MQMD_PERSISTENCE for outgoing messages:
- Create a Java class called CustomMessageDescriptorBuilder that extends the DefaultMessageBuilder provided by the MQ sink connector. The CustomMessageDescriptorBuilder class is the custom message builder.
// CustomMessageDescriptorBuilder.java
import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.Message;

import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;

import com.ibm.eventstreams.connect.mqsink.builders.DefaultMessageBuilder;
import com.ibm.mq.constants.MQConstants;
import com.ibm.msg.client.jms.JmsConstants;

public class CustomMessageDescriptorBuilder extends DefaultMessageBuilder {

    @Override
    public Message getJMSMessage(final JMSContext jmsCtxt, final SinkRecord record) {
        // Start from the message that the default builder produces
        final Message message = super.getJMSMessage(jmsCtxt, record);
        try {
            // Override properties
            message.setStringProperty(JmsConstants.JMS_IBM_MQMD_USERIDENTIFIER, "Sample User");
            // String.format takes %-style specifiers: topic-offset-partition
            message.setStringProperty(JmsConstants.JMS_IBM_MQMD_APPLIDENTITYDATA,
                    String.format("%s-%d-%d", record.topic(), record.kafkaOffset(), record.kafkaPartition()));

            // Set persistence based on message content
            if (message.getBody(String.class).startsWith("PRIORITY")) {
                message.setIntProperty(JmsConstants.JMS_IBM_MQMD_PERSISTENCE, MQConstants.MQPER_PERSISTENT);
            } else {
                message.setIntProperty(JmsConstants.JMS_IBM_MQMD_PERSISTENCE, MQConstants.MQPER_NOT_PERSISTENT);
            }
        } catch (final JMSException e) {
            throw new ConnectException("Failed to write property", e);
        }
        return message;
    }
}
- In the getJMSMessage() method, override the values of MQMD properties based on your requirements. In the earlier example, JMS_IBM_MQMD_USERIDENTIFIER and JMS_IBM_MQMD_APPLIDENTITYDATA properties are set with sample values, and the JMS_IBM_MQMD_PERSISTENCE property is set based on the message content.
- Package your custom message builder into a JAR file along with any dependencies it might have. Ensure that this JAR file is placed in a folder alongside the connector JAR file:
mq-sink-connector/
|-- ibm-mq-sink-connector.jar
|-- custom-message-builder.jar
- Configure the IBM MQ sink connector to use your custom message builder class. Specify the class name of your custom message builder in the connector configuration:
mq.message.builder=<name-of-custom-message-builder>
mq.message.body.jms=true
mq.message.mqmd.write=true
mq.message.mqmd.context=ALL
# ...
Where <name-of-custom-message-builder> is the name of your custom message builder.