You can use the MQ sink connector to copy data from IBM Event Streams or Apache Kafka into IBM MQ. The connector copies messages from a Kafka topic into a target MQ queue.
This document contains steps for running the connector in standalone mode for development and test purposes.
The Kafka Connect sink connector for IBM MQ is supported in IBM Event Streams 2018.3.1 and later.
The connector runs inside the Kafka Connect runtime, which is part of the Apache Kafka distribution. IBM Event Streams does not run connectors as part of its deployment, so you need an Apache Kafka distribution to get the Kafka Connect runtime environment.
Ensure you have the following available:
- IBM MQ v8 or later installed. Note: These instructions are for IBM MQ v9 running on Linux. If you’re using a different version or platform, you might have to adjust some steps slightly.
- The Kafka Connect runtime environment that comes as part of an Apache Kafka distribution. These instructions are for Apache Kafka 2.0.0 or later.
Downloading the connector
You can obtain the Kafka Connect sink connector for IBM MQ as follows:
- Log in to your IBM Event Streams UI.
- Click the Toolbox tab, and click Kafka Connect sink connector for IBM MQ.
- Download both the connector JAR and the sample connector properties files from the page.
Setting up the queue manager
These sample instructions set up an IBM MQ queue manager that uses its local operating system to authenticate the user ID and password. The user ID and password you provide must already be created on the operating system where IBM MQ is running.
- Log in as a user authorized to administer IBM MQ, and ensure the MQ commands are on the path.
- Create a queue manager with a TCP/IP listener on port 1414:
crtmqm -p 1414 <queue_manager_name>
For example, to create a queue manager called QM1, run:
crtmqm -p 1414 QM1
- Start the queue manager:
strmqm <queue_manager_name>
- Start the runmqsc tool to configure the queue manager:
runmqsc <queue_manager_name>
- In runmqsc, create a server-connection channel:
DEFINE CHANNEL(<channel_name>) CHLTYPE(SVRCONN)
- Set the channel authentication rules to accept connections that require a user ID and password:
SET CHLAUTH(<channel_name>) TYPE(BLOCKUSER) USERLIST('nobody')
SET CHLAUTH('*') TYPE(ADDRESSMAP) ADDRESS('*') USERSRC(NOACCESS)
SET CHLAUTH(<channel_name>) TYPE(ADDRESSMAP) ADDRESS('*') USERSRC(CHANNEL) CHCKCLNT(REQUIRED)
- Set the identity of the client connections based on the supplied context (the user ID):
ALTER AUTHINFO(SYSTEM.DEFAULT.AUTHINFO.IDPWOS) AUTHTYPE(IDPWOS) ADOPTCTX(YES)
- Refresh the connection authentication information:
REFRESH SECURITY TYPE(CONNAUTH)
- Create a queue for the Kafka Connect connector to use:
DEFINE QLOCAL(<queue_name>)
- Authorize the IBM MQ user ID to connect to and inquire the queue manager:
SET AUTHREC OBJTYPE(QMGR) PRINCIPAL('<user_id>') AUTHADD(CONNECT,INQ)
- Authorize the IBM MQ user ID to use the queue:
SET AUTHREC PROFILE(<queue_name>) OBJTYPE(QUEUE) PRINCIPAL('<user_id>') AUTHADD(ALLMQI)
- Stop the runmqsc tool by typing END.
For example, for a queue manager called QM1, with user ID alice, creating a server-connection channel called MYSVRCONN and a queue called MYQSINK, you run the following commands in runmqsc:
DEFINE CHANNEL(MYSVRCONN) CHLTYPE(SVRCONN)
SET CHLAUTH(MYSVRCONN) TYPE(BLOCKUSER) USERLIST('nobody')
SET CHLAUTH('*') TYPE(ADDRESSMAP) ADDRESS('*') USERSRC(NOACCESS)
SET CHLAUTH(MYSVRCONN) TYPE(ADDRESSMAP) ADDRESS('*') USERSRC(CHANNEL) CHCKCLNT(REQUIRED)
ALTER AUTHINFO(SYSTEM.DEFAULT.AUTHINFO.IDPWOS) AUTHTYPE(IDPWOS) ADOPTCTX(YES)
REFRESH SECURITY TYPE(CONNAUTH)
DEFINE QLOCAL(MYQSINK)
SET AUTHREC OBJTYPE(QMGR) PRINCIPAL('alice') AUTHADD(CONNECT,INQ)
SET AUTHREC PROFILE(MYQSINK) OBJTYPE(QUEUE) PRINCIPAL('alice') AUTHADD(ALLMQI)
END
The queue manager is now ready to accept connections from the connector, which puts messages on the queue.
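The runmqsc commands in this section can also be kept in a file and passed to runmqsc on standard input instead of being typed interactively. The following is a minimal sketch that writes such a file, assuming the example names used above (QM1, MYSVRCONN, MYQSINK, alice). The file name mq-sink-setup.mqsc and the shell variable names are illustrative and not part of the product.

```shell
#!/bin/sh
# Sketch: write the MQSC commands from this section to a file.
# QMGR, CHANNEL, QUEUE, MQ_USER and MQSC_FILE are illustrative variable names;
# the defaults are the example values used in this document.
QMGR="${QMGR:-QM1}"
CHANNEL="${CHANNEL:-MYSVRCONN}"
QUEUE="${QUEUE:-MYQSINK}"
MQ_USER="${MQ_USER:-alice}"
MQSC_FILE="${MQSC_FILE:-mq-sink-setup.mqsc}"

# The heredoc is unquoted so that the shell variables are expanded.
cat > "$MQSC_FILE" <<EOF
DEFINE CHANNEL($CHANNEL) CHLTYPE(SVRCONN)
SET CHLAUTH($CHANNEL) TYPE(BLOCKUSER) USERLIST('nobody')
SET CHLAUTH('*') TYPE(ADDRESSMAP) ADDRESS('*') USERSRC(NOACCESS)
SET CHLAUTH($CHANNEL) TYPE(ADDRESSMAP) ADDRESS('*') USERSRC(CHANNEL) CHCKCLNT(REQUIRED)
ALTER AUTHINFO(SYSTEM.DEFAULT.AUTHINFO.IDPWOS) AUTHTYPE(IDPWOS) ADOPTCTX(YES)
REFRESH SECURITY TYPE(CONNAUTH)
DEFINE QLOCAL($QUEUE)
SET AUTHREC OBJTYPE(QMGR) PRINCIPAL('$MQ_USER') AUTHADD(CONNECT,INQ)
SET AUTHREC PROFILE($QUEUE) OBJTYPE(QUEUE) PRINCIPAL('$MQ_USER') AUTHADD(ALLMQI)
END
EOF

echo "Wrote $MQSC_FILE for queue manager $QMGR"
# To apply the file on the MQ host (the queue manager must be running):
#   runmqsc "$QMGR" < "$MQSC_FILE"
```

Keeping the commands in a file makes the setup repeatable across test queue managers. The script only writes the file; applying it is left as a separate, deliberate step.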
Setting up Apache Kafka
To send messages from IBM Event Streams to IBM MQ, create a topic and obtain security information for your Event Streams installation. You then use this information later to configure the connection to your Event Streams instance.
You can also use the connector with Apache Kafka running locally on your machine; see the public GitHub repository for more details.
- Log in to your IBM Event Streams UI.
- Click the Topics tab.
- If you have not previously created the topic to use with the connector, create it now by clicking Create topic.
- Select the topic in the list of topics.
- Click Connect to this topic on the right.
- On the Connect a client tab, copy the address from the Bootstrap server section. This gives the bootstrap address for Kafka clients.
- From the Certificates section, download the server certificate from the Java truststore section, and choose a location for the downloaded file that can be accessed by the Kafka Connect worker.
- Go to the API key section and follow the instructions to generate an API key authorized to connect to the cluster, and to produce to and consume from the topic.
Note: For the distributed worker, the API key will also need to be able to write to the Kafka Connect framework’s internal topics.
Configuring the connector to connect to MQ
The connector requires details to connect to IBM MQ and to your IBM Event Streams or Apache Kafka cluster.
To provide connection details for IBM MQ, use the sample connector properties file you downloaded (mq-sink.properties). Create a copy of it and save it to the location where you have the connector JAR file.
The connector connects to IBM MQ using a client connection. You must provide the following connection information for your queue manager:
- Comma-separated list of Kafka topics to pull events from.
- The name of the IBM MQ queue manager.
- The connection name (one or more host and port pairs).
- The channel name.
- The name of the sink IBM MQ queue.
- The user name and password if the queue manager is configured to require them for client connections.
mq.queue.manager=QM1
mq.connection.name.list=localhost(1414)
mq.channel.name=MYSVRCONN
mq.queue=MYQSINK
mq.user.name=alice
mq.password=passw0rd
topics=TSINK
See the sample properties file for a full list of properties you can configure, and also see the GitHub README for all available configuration options.
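Before starting the worker, it can help to confirm that your copy of the properties file sets every connection property listed above. The following is a minimal sketch of such a check. The function name check_props is hypothetical, and the list of keys you pass to it is your choice; the user name and password are left out of the usage example because they are needed only when the queue manager requires them.

```shell
#!/bin/sh
# Sketch: report which of the given property keys are missing from a
# properties file. check_props is a hypothetical helper name.
check_props() {
    file="$1"
    shift
    missing=0
    for key in "$@"; do
        # A key counts as set when the text before the first "=" on some
        # line is exactly the key name.
        if ! awk -F= -v k="$key" '$1 == k { found = 1 } END { exit !found }' "$file"; then
            echo "missing: $key"
            missing=1
        fi
    done
    # Returns 0 when every key was found, 1 otherwise.
    return "$missing"
}

# Usage, assuming mq-sink.properties is in the current directory:
#   check_props mq-sink.properties topics mq.queue.manager \
#       mq.connection.name.list mq.channel.name mq.queue
```

The check looks only at key names, not at whether the values are valid for your queue manager.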
Configuring the connector to connect to IBM Event Streams or Apache Kafka
The Kafka distribution includes a file called connect-standalone.properties, which holds the connection details for your Kafka cluster. Edit the file to include the following connection information:
- A list of one or more Kafka brokers for bootstrapping connections.
- Whether the cluster requires connections to use SSL/TLS.
- Authentication credentials if the cluster requires clients to authenticate.
To connect to IBM Event Streams, you will need the broker URL and security details you collected earlier when you configured IBM Event Streams.
The following example shows the required properties for the Kafka Connect standalone properties file:
bootstrap.servers=<broker_url>
security.protocol=SASL_SSL
ssl.protocol=TLSv1.2
ssl.truststore.location=<certs.jks_file_location>
ssl.truststore.password=<truststore_password>
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="token" password="<api_key>";
consumer.security.protocol=SASL_SSL
consumer.ssl.protocol=TLSv1.2
consumer.ssl.truststore.location=<certs.jks_file_location>
consumer.ssl.truststore.password=<truststore_password>
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="token" password="<api_key>";
Replace <broker_url> with your cluster’s broker URL, <certs.jks_file_location> with the path of your downloaded truststore file, and <api_key> with the API key.
Note: If you are running Apache Kafka locally, you can use the default connect-standalone.properties file.
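A placeholder such as <broker_url> that is left in the worker properties file is easy to miss. The following is a minimal sketch of a check that lists any lines still containing text in angle brackets. The function name find_placeholders is hypothetical, and the pattern assumes placeholders contain only letters, underscores, dots, and hyphens.

```shell
#!/bin/sh
# Sketch: print the numbered lines of a properties file that still contain
# an unreplaced <placeholder>. find_placeholders is a hypothetical helper name.
find_placeholders() {
    # Matches text such as <broker_url> or <certs.jks_file_location>.
    # grep exits with 0 when at least one line matches, 1 when none do.
    grep -n '<[A-Za-z_.-]*>' "$1"
}

# Usage, run from the Kafka root directory:
#   if find_placeholders config/connect-standalone.properties; then
#       echo "Replace the placeholders listed above before starting the worker" >&2
#   fi
```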
Generate a producer application
To test the connector you will need an application to produce events to your topic.
- Log in to your IBM Event Streams UI.
- Click the Toolbox tab.
- Click Generate application under Starter application
- Enter a name for the application
- Select only Produce messages
- Select Choose existing topic and choose the topic you provided in the MQ connector configuration
- Click Generate
- Once the application has been generated, click Download and follow the instructions in the UI to get the application running
Running the connector
- Open a terminal window and change to the Kafka root directory. Start the connector worker as follows, replacing the placeholders in angle brackets with your own values:
CLASSPATH=<path-to-connector-jar>/kafka-connect-mq-sink-<jar-version>-jar-with-dependencies.jar bin/connect-standalone.sh config/connect-standalone.properties <path-to-mq-properties>/mq-sink.properties
The log output will include the following messages that indicate the connector worker has started and successfully connected to IBM MQ:
INFO Created connector mq-sink
INFO Connection to MQ established
- Navigate to the UI of the sample application you generated earlier and start producing messages to IBM Event Streams.
- Use the amqsget sample to get messages from the MQ queue:
/opt/mqm/samp/bin/amqsget <queue_name> <queue_manager_name>
After a short delay, the messages are printed.
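If you script these steps, the check for the worker's startup messages can be automated. The following is a minimal sketch that polls a log file until a message appears or a timeout expires. It assumes you redirected the worker's output to a file, which the steps above do not do; the file name connect.log and the function name wait_for_log are hypothetical.

```shell
#!/bin/sh
# Sketch: wait until a log file contains a given message, or give up after
# a number of seconds. wait_for_log is a hypothetical helper name.
wait_for_log() {
    log_file="$1"
    message="$2"
    timeout="${3:-60}"   # seconds to wait; defaults to 60
    elapsed=0
    while [ "$elapsed" -lt "$timeout" ]; do
        # -F treats the message as a fixed string, not a regular expression.
        # Errors are suppressed so a log file that does not exist yet is
        # treated as "message not seen so far".
        if grep -q -F -- "$message" "$log_file" 2>/dev/null; then
            return 0
        fi
        sleep 1
        elapsed=$((elapsed + 1))
    done
    return 1
}

# Usage, assuming the worker output was redirected to connect.log:
#   wait_for_log connect.log "Connection to MQ established" 120
```

The function returns 0 as soon as the message is found and 1 if the timeout passes first, so it can gate later steps such as producing test messages.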
For more details about the connector and to see all configuration options, see the GitHub README.