Processing IBM MQ messages

Scenario

IBM MQ queues and topics are a valuable source of events for processing. In this tutorial, you will see how MQ messages can be surfaced on Kafka topics, from where they can be used as a source of events for Event Processing.

Before you begin

The instructions in this tutorial use the Tutorial environment, which includes a selection of topics each with a live stream of events, created to allow you to explore features in IBM Event Automation. Following the setup instructions to deploy the demo environment gives you a complete instance of IBM Event Automation that you can use to follow this tutorial for yourself.

You will also need to run the optional instructions for deploying an MQ queue manager. In addition to setting up the MQ queue manager, it will also start the Kafka Connect connector to flow messages from MQ into Kafka.

Tip: The MQ Connector is just one of many connectors available for bringing data into Kafka. Connectors are an effective way to enable processing events from a wide variety of systems and technologies.

Versions

This tutorial uses the following versions of Event Automation capabilities. Screenshots can differ from the current interface if you are using a newer version.

  • Event Streams 11.5.0
  • Event Processing 1.2.0
  • MQ 2.4.0

Instructions

Step 1 : Discover the MQ queue

Messages in this scenario will start life on an MQ queue called COMMANDS. Start by accessing the queue in the MQ Console.

  1. Go to the MQ web console.

    You can get the URL for the web console from the queuemanager-ibm-mq-web route, and the password from the platform-auth-idp-credentials secret.

    If you have oc CLI access to your Red Hat OpenShift Container Platform cluster, you can use the following commands:

    # URL
    oc get route \
        queuemanager-ibm-mq-web \
        -n event-automation \
        -o jsonpath='https://{.spec.host}'
    
    # password for 'admin' user
    oc get secret \
        platform-auth-idp-credentials \
        -n ibm-common-services \
        -o jsonpath='{.data.admin_password}' | base64 -d
    
  2. Navigate to the COMMANDS queue.

    screenshot

Step 2 : Verify the MQ connector

The next step is to verify that MQ messages are surfaced on the Kafka topic as a stream of events.

  1. Go to the Event Streams topics list, and find the MQ.COMMANDS topic.

    screenshot

    If you need a reminder of how to access the Event Streams web UI, you can review Accessing the tutorial environment.

  2. Use the Create button in the MQ Console to PUT a JSON message to the MQ queue.

    {
        "id": "cbeecfcb-27da-4e59-bbcd-8a974fe22917",
        "customer": {
            "id": "79df63d7-7522-4972-8546-1f1c33531e44",
            "name": "Lelia Langworth"
        },
        "creditcard": {
            "number": "5532169298805994",
            "expiry": "04/25"
        },
        "product": {
            "description": "L Denim Ripped Jeans",
            "price": 45.48
        },
        "order": {
            "quantity": "2"
        },
        "ordertime": "2023-06-28 22:38:35.089"
    }
    

    screenshot

  3. Use the Event Streams topic page to verify that the message appears as an event on the Kafka topic.

    screenshot

  4. Confirm that the message remains available on the COMMANDS queue.

    screenshot

    The use of a streaming queue means that a copy of messages can be made available for transferring to Kafka without disrupting any existing MQ application that is getting the messages.

Step 3 : Flattening the MQ messages

To process the messages in Event Processing, you first need to flatten the nested JSON payloads.

  1. Open the specification for the Kafka MQ Source Connector in a text editor.

    screenshot

    It is the install/supporting-demo-resources/mq/templates/06-connector.yaml file, included in the folder for the files you used to set up the MQ queue manager for this tutorial.

  2. Add a transform definition to flatten the message value.

    screenshot

    transforms.flatten.type: org.apache.kafka.connect.transforms.Flatten$Value
    transforms.flatten.delimiter: "_"
    

    Add this definition to the .spec.config section of the connector definition.

    In the screenshot it is added to the end of the config, but the order is not significant, as long as it is within .spec.config.

  3. Add your new transform to the list of transformations that are to be applied to messages.

    screenshot

    transforms: flatten
    

    Add this line to the .spec.config section of the connector definition.

  4. Apply your changes to the connector definition.

    oc apply -n event-automation \
        -f install/supporting-demo-resources/mq/templates/06-connector.yaml
    

    You need to be logged in to run this command.

    Log in to your Red Hat OpenShift Container Platform as a cluster administrator by using the oc CLI (oc login).

  5. Test the transform by putting a new test message to the COMMANDS queue in the MQ queue manager.

    screenshot

    {
        "id": "6446ef47-79a1-4b8a-a441-a58de8a90188",
        "customer": {
            "id": "a20533e6-88ee-478e-b42f-7a1a028b0b12",
            "name": "Roseanna Cremin"
        },
        "creditcard": {
            "number": "5532144297701443",
            "expiry": "09/24"
        },
        "product": {
            "description": "XS Acid-washed Low-rise Jeans",
            "price": 33.88
        },
        "order": {
            "quantity": "1"
        },
        "ordertime": "2023-07-01 10:51:48.125"
    }
    
  6. Verify that the transform is working by checking the Event Streams topic page.

    screenshot

    You should see the message that is produced to the Kafka topic:

    {
        "product_price": 33.88,
        "product_description": "XS Acid-washed Low-rise Jeans",
        "id": "6446ef47-79a1-4b8a-a441-a58de8a90188",
        "ordertime": "2023-07-01 10:51:48.125",
        "creditcard_number": "5532144297701443",
        "creditcard_expiry": "09/24",
        "customer_name": "Roseanna Cremin",
        "customer_id": "a20533e6-88ee-478e-b42f-7a1a028b0b12",
        "order_quantity": "1"
    }
    

    Changes to connector specifications can sometimes take a moment to apply. If the message produced to the Kafka topic is not flattened, try waiting for 30 seconds, and then put the MQ message again.

This topic is now ready for use by Event Processing. Before trying that, we will add some additional transformations to see what is possible.

Step 4 : Transforming the MQ messages

The flatten transformation that you have applied is one of a wide range of transformations available.

In this step, you will apply a few more of these to see what transformations are possible:

  • Redact the credit card number from the events
  • Remove the customer name from the events
  • Insert a static property to identify where the event came from
  • Cast the quantity property from a string to an integer

Other available transformations are described in the Kafka Connect documentation.

  1. Add the following transformation definitions to the Connector specification.

    screenshot

    transforms.redact.type: org.apache.kafka.connect.transforms.MaskField$Value
    transforms.redact.fields: creditcard_number
    transforms.redact.replacement: XXXXXXXXXXXXXXXX
    
    transforms.drop.type: org.apache.kafka.connect.transforms.ReplaceField$Value
    transforms.drop.blacklist: customer_name
    
    transforms.origin.type: org.apache.kafka.connect.transforms.InsertField$Value
    transforms.origin.static.field: origin
    transforms.origin.static.value: mq-connector
    
    transforms.casts.type: org.apache.kafka.connect.transforms.Cast$Value
    transforms.casts.spec: order_quantity:int16
    
  2. Add your new transforms to the list of transformations that are to be applied to messages.

    screenshot

    transforms: flatten,redact,drop,origin,casts
    

    Tip: The order of these in the comma-separated list is the order that the transformations are applied in.

    In this case, it is important that the flatten transformation is applied first. This is because properties referred to in the later transformations (for example, creditcard_number and customer_name) do not exist until after the flatten transformation is complete.

  3. Apply your changes to the connector definition.

    oc apply -n event-automation \
        -f install/supporting-demo-resources/mq/templates/06-connector.yaml
    
  4. Test the transform by putting a new test message to the COMMANDS queue in the MQ queue manager.

    {
        "id": "d83bb1f5-933a-4251-b29b-0a1ec7d4e56e",
        "customer": {
            "id": "bd02c5f3-3246-4701-9c0b-159c7a7334b0",
            "name": "Fernanda Hermiston"
        },
        "creditcard": {
            "number": "5226589295805765",
            "expiry": "01/24"
        },
        "product": {
            "description": "L White Jogger Jeans",
            "price": 42.88
        },
        "order": {
            "quantity": "2"
        },
        "ordertime": "2023-07-01 11:10:48.124"
    }
    
  5. Verify that the transform is working by checking the Event Streams topic page.

    screenshot

    You should see the message that is produced to the Kafka topic:

    {
        "origin": "mq-connector",
        "order_quantity": 2,
        "product_price": 42.88,
        "id": "d83bb1f5-933a-4251-b29b-0a1ec7d4e56e",
        "creditcard_expiry": "01/24",
        "product_description": "L White Jogger Jeans",
        "ordertime": "2023-07-01 11:10:48.124",
        "customer_id": "bd02c5f3-3246-4701-9c0b-159c7a7334b0",
        "creditcard_number": "XXXXXXXXXXXXXXXX"
    }
    

    Notice that:

    • The message contains an origin property, which was inserted by the connector
    • The creditcard_number property has been masked out with X characters
    • The customer_name property has been removed
    • The string order_quantity property has been cast to an integer

This is now ready for use by Event Processing.

Step 5 : MQ messages as a source of events

The next step is to create an event source in Event Processing based on the source of events from the MQ queue.

  1. Go to the Event Processing home page.

    screenshot

    If you need a reminder of how to access the Event Processing home page, you can review Accessing the tutorial environment.

  2. Create a flow, and give it a name and description to explain that you will use it to process events originating from MQ.

  3. Update the Event source node.

    screenshot

    Hover over the node and click Edit icon Edit.

  4. Add a new event source.

    screenshot

  5. Put the Kafka listener address from Event Streams into the server address for the event source node.

    screenshot

    screenshot

  6. In the Access credentials pane, use the username and password for the kafka-demo-apps user for accessing the new topic.

    screenshot

    If you need a reminder of the password for the kafka-demo-apps user, you can review the Accessing Kafka topics section of the Tutorial Setup instructions.

  7. Select the MQ.COMMANDS topic, and click Next.

    screenshot

  8. The format JSON is auto-selected in the Message format drop-down and the sample message is auto-populated in the JSON sample message field.

    screenshot

  9. In the Key and headers pane, click Next.

    screenshot

    Note: The key and headers are displayed automatically if they are available in the selected topic message.

  10. In the Event details pane, enter the node name as Commands in the Node name field.

    screenshot

  11. Verify that the ordertime property in the message contents has been automatically detected as a timestamp.

    screenshot

  12. Identify the ordertime property as the timestamp to use for events.

    screenshot

    In the Event time options, choose ordertime as the source of event time.

    This means that any delay introduced by the connector transferring the message from MQ to the Kafka topic will not impact any time-based processing you perform.

    The timestamp in the message payload will be treated as the canonical timestamp for the message, rather than when it was produced to the Kafka topic.

  13. Click Configure to finalize the event source.

Step 6 : Aggregate the events

  1. Create an Aggregate node.

    screenshot

    Create an aggregate node by dragging one onto the canvas. You can find this in the Processors section of the left panel.

    Click and drag from the small gray dot on the event source to the matching dot on the aggregate node.

    Did you know? Instead of dragging the node, you can add a node onto the canvas and automatically connect it to the last added node by double-clicking a node within the palette. For example, after configuring an event source node, double-click any processor node to add and connect the processor node to your previously configured event source node.

  2. Hover over the aggregate node and click Edit icon Edit to configure the node.

    Name the aggregate node Order quantities.

    screenshot

  3. Aggregate the order events in 1-minute windows.

    screenshot

    Making the window very small is useful for the tutorial as it means you will see results quickly.

  4. Sum the order_quantity properties.

    screenshot

    Select SUM as the aggregate function, and order_quantity as the property to aggregate.

    This configures the aggregate node to add up the quantity in each of the order events, emitting a total for all of the events in each 1-minute window.

  5. Rename the output to display the total number of ordered items in the time window, and the start and end time for the window.

    screenshot

  6. Click Configure to finalize the aggregate.

Step 7 : Test the flow

The final step is to run your event processing flow and view the results.

  1. Go to the Run menu, and select Events from now to run your flow on the new messages you are about to put to the MQ queue.

    screenshot

  2. Click the aggregate node to see a live view of results from your flow. It will be updated as new events are emitted onto the commands topic.

  3. Put these JSON messages to the MQ queue by using the MQ console.

    {
        "id": "37169553-1b97-49c2-b16d-924257f4e888",
        "customer": {
            "id": "3ba7c289-6d02-40fd-9925-837d9573d5f6",
            "name": "Darin Becker"
        },
        "creditcard": {
            "number": "5434297065054394",
            "expiry": "12/29"
        },
        "product": {
            "description": "XXS Blue Crochet Jeans",
            "price": 30.59
        },
        "order": {
            "quantity": "2"
        },
        "ordertime": "2023-07-01 11:26:18.124"
    }
    
    {
        "id": "87193fcd-0ca5-437f-b377-d2beb7e2fca4",
        "customer": {
            "id": "0c6767e4-3eee-4016-91a8-1c0b1699076e",
            "name": "Sharie Nolan"
        },
        "creditcard": {
           "number": "5300726992175816",
            "expiry": "07/23"
        },
        "product": {
            "description": "XL Blue Skinny Jeans",
            "price": 40.99
        },
        "order": {
            "quantity": "3"
        },
        "ordertime": "2023-07-01 11:31:18.124"
    }
    
    {
        "id": "6e6885f0-8655-4ba3-bb1f-6834a7d5059c",
        "customer": {
            "id": "003f8d4e-5488-4923-beb7-0846463e2b54",
            "name": "Brandi Lubowitz"
        },
        "creditcard": {
            "number": "5505432597412091",
            "expiry": "07/26"
        },
        "product": {
            "description": "L Retro Flare Jeans",
            "price": 28.99
        },
        "order": {
            "quantity": "1"
        },
        "ordertime": "2023-07-01 11:33:48.125"
    }
    
  4. Verify that the results are displayed in the Event Processing flow.

    screenshot

  5. When you have finished reviewing the results, you can stop this flow.

Recap

Connectors enable you to bring streams of events from a wide variety of external systems into Kafka topics, from where you can analyze them by using Event Processing.

Transformations are a helpful way to prepare the events to be in a format suitable for use with Event Processing.