Deduplicating repeated events

Note: To follow the step-by-step instructions in this tutorial, you can watch the video or read the instructions on the page.

Scenario

The operations team needs to remove duplicate events from the stock movements topic, for processing by systems that cannot behave idempotently.

Before you begin

The instructions in this tutorial use the Tutorial environment, which includes a selection of topics each with a live stream of events, created to allow you to explore features in IBM Event Automation. Following the setup instructions to deploy the demo environment gives you a complete instance of IBM Event Automation that you can use to follow this tutorial for yourself.

Versions

This tutorial uses the following versions of Event Automation capabilities. Screenshots may differ from the current interface if you are using a newer version.

  • Event Streams 11.3.0
  • Event Endpoint Management 11.1.1
  • Event Processing 1.1.1

Instructions

Step 1 : Discover the source topic to use

For this scenario, you are processing an existing stream of events. You will start by identifying the topic.

  1. Go to the Event Endpoint Management catalog.

    screenshot

    If you need a reminder about how to access the Event Endpoint Management catalog, you can review Accessing the tutorial environment.

    If there are no topics in the catalog, you may need to complete the tutorial setup step to populate the catalog.

  2. The STOCK.MOVEMENT topic contains the events used in this tutorial.

    screenshot

    Tip: Notice that the topic information describes the issue that we are addressing in this tutorial. Documenting potential issues and considerations for using topics is essential for enabling effective reuse.

Step 2 : Create a topic for the deduplicated events

The pre-processing job will write the deduplicated events to a different topic, which can be used as a source for the systems that are unable to process events idempotently.

The next step is to create this topic.

  1. Go to the Event Streams topics manager.

    screenshot

    If you need a reminder about how to access the Event Streams web UI, you can review Accessing the tutorial environment.

  2. Create a new topic called STOCK.MOVEMENT.UNIQUE.

    screenshot

  3. Create the topic with 3 partitions to match the STOCK.MOVEMENT source topic.

    screenshot

Step 3 : Create a skeleton processing flow

The Event Processing authoring UI makes it easy to start new projects. Before we start writing the deduplication filter query, we can set up the new job by using the low-code UI.

  1. Go to the Event Processing home page.

    screenshot

    If you need a reminder about how to access the Event Processing home page, you can review Accessing the tutorial environment.

  2. Create a flow, and give it a name and description to explain that you will use it to deduplicate the events on the stock movements topic.

  3. Create an Event source node.

    screenshot

    Use the server address information and Generate access credentials button on the topic page in the catalog from Step 1 to configure the event source node.

    Tip: If you need a reminder about how to create an event source node, you can follow the Identify orders from a specific region tutorial.

    Tip: You will need the access credentials that you create here again in Step 4. Saving the credentials JSON from the catalog makes this easier.

  4. Create an Event destination node.

    screenshot

  5. Configure the event destination node by using the internal server address from Event Streams.

    screenshot

    screenshot

  6. Use the username and password for the kafka-demo-apps user for accessing the new topic.

    screenshot

    If you need a reminder of the password for the kafka-demo-apps user, you can review the Accessing Kafka topics section of the Tutorial Setup instructions.

  7. Choose the STOCK.MOVEMENT.UNIQUE topic created in Step 2.

Step 4 : Export and prepare the pre-processing SQL

The skeleton processing flow is now ready to export. The next step is to use it as the basis for writing your deduplication job.

  1. Go to the Event Processing home page.

  2. Use the menu for your deduplication job to select Export.

    screenshot

  3. Choose the SQL export option.

    screenshot

  4. Open the exported SQL file in a text editor.

    Tip: Customizing the SQL exported from Event Processing can enable a wide range of additional processing scenarios.

  5. Edit the SQL to add Event Endpoint Management credentials.

    The properties.sasl.jaas.config attribute in your Stock movements event source will have empty values for username and password. You need to fill these in using the access credentials that you created in the Event Endpoint Management catalog.

    Tip: If you no longer have the password available, you can create a new set of credentials from the catalog for use in your SQL job.

  6. Edit the SQL to add Event Streams credentials.

    The properties.sasl.jaas.config attribute in your Unique stock movements event destination will have empty values for username and password. You need to fill these in using the access credentials from Event Streams.

  7. Insert the following line at the start of the SQL file to give your Flink job a recognizable name.

    SET 'pipeline.name' = 'stock-movements-deduplication';
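
If you would rather script this edit than make it in a text editor, the line can be prepended from a shell. This is a minimal sketch rather than part of the Event Processing tooling: the function name prepend_pipeline_name and the file name in the usage comment are hypothetical.

```shell
# Sketch: add a SET 'pipeline.name' line to the top of a SQL file.
# The function name and file names are illustrative assumptions.
prepend_pipeline_name() {
    sql_file="$1"
    job_name="$2"
    set_line="SET 'pipeline.name' = '${job_name}';"

    # Leave the file alone if the first line is already the SET statement.
    if [ "$(head -n 1 "$sql_file")" = "$set_line" ]; then
        return 0
    fi

    # Write the new first line plus the original contents to a temporary
    # file, then replace the original file with it.
    tmp_file="$(mktemp)"
    { printf '%s\n' "$set_line"; cat "$sql_file"; } > "$tmp_file" &&
        mv "$tmp_file" "$sql_file"
}

# Example usage (hypothetical file name):
#   prepend_pipeline_name deduplication.sql stock-movements-deduplication
```

Running the function a second time leaves the file unchanged, so it is safe to repeat.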
    

Step 5 : Write the custom SQL step

The outline of your SQL is now ready. The next step is to prepare the deduplication step by adding it to your template SQL.

  1. Find the last line of the SQL. It will look something like this:

    INSERT INTO `Unique stock movements` SELECT * FROM `Stock movements`;
    

    The names will vary depending on what you named the nodes in your skeleton flow.

  2. Modify the SQL to look like this:

    INSERT INTO `Unique stock movements`
         SELECT
             movementid,
             warehouse,
             product,
             quantity,
             updatetime,
             event_time
         FROM
         (
             SELECT *,
                 ROW_NUMBER() OVER (PARTITION BY movementid ORDER BY event_time ASC) AS rownum
             FROM
                 `Stock movements`
         )
         WHERE
             rownum = 1;
    

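    The inner query numbers the events that share a movementid in order of event_time, and the outer WHERE clause keeps only the first of them, so later duplicates are dropped.

    Modify Stock movements and Unique stock movements to match the names that you gave your event source and event destination nodes. You can find those names in the two CREATE TABLE commands in the SQL file.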

    Tip: You can learn more about deduplication in the Apache Flink documentation if you would like to understand how this works.
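
To see which rows the ROW_NUMBER() filter keeps, the same rule can be mimicked on a few rows outside Flink. The sketch below is only an illustration and not how Flink evaluates the query: the sample rows (comma-separated movementid, event_time, quantity) are made up, and keep_first_per_id is a hypothetical helper name. It orders rows by movementid and then event_time, and prints only the first row for each movementid.

```shell
# Sketch: keep the earliest row per movementid, mirroring
# "ROW_NUMBER() ... ORDER BY event_time ASC" combined with "rownum = 1".
# Columns (hypothetical sample data): movementid,event_time,quantity
keep_first_per_id() {
    # Sort by movementid, then by event_time, and print a row only the
    # first time its movementid is seen.
    LC_ALL=C sort -t, -k1,1 -k2,2 | awk -F, '!seen[$1]++'
}

# The third row repeats movementid m-001 with a later event_time,
# so it is the one that gets dropped.
printf '%s\n' \
    'm-001,2024-01-01T10:00:00,5' \
    'm-002,2024-01-01T10:00:05,3' \
    'm-001,2024-01-01T10:00:09,5' |
    keep_first_per_id
```

Unlike this batch-style sketch, the Flink job processes events continuously as they arrive and does not sort its output by movementid.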

Step 6 : Modify the event destination

The next step is to configure the destination where you will write the deduplicated events.

Find the CREATE TABLE definition for the destination topic. It will start with something like this:

CREATE TABLE `Unique stock movements`
(
    `movementid`                   STRING,
    `warehouse`                    STRING,
    `product`                      STRING,
    `quantity`                     BIGINT,
    `updatetime`                   STRING,
    `event_time`                   TIMESTAMP(6)
)

(As before, your table name may be different.)

You need to make a few modifications to this definition to prepare it for use by your query. All of the following modifications are to this table definition.

  1. Modify the event_time column to look like this:

    CREATE TABLE `Unique stock movements`
    (
        `movementid`                   STRING,
        `warehouse`                    STRING,
        `product`                      STRING,
        `quantity`                     BIGINT,
        `updatetime`                   STRING,
        `event_time`                   TIMESTAMP(6) METADATA FROM 'timestamp'
    )
    

    This will mean that messages produced to STOCK.MOVEMENT.UNIQUE will have the metadata timestamp from the original message, rather than a new timestamp for when the message was produced.

  2. Add a PRIMARY KEY constraint to the table. The upsert-kafka connector that you will configure in the following step requires one.

    CREATE TABLE `Unique stock movements`
    (
        `movementid`                   STRING,
        `warehouse`                    STRING,
        `product`                      STRING,
        `quantity`                     BIGINT,
        `updatetime`                   STRING,
        `event_time`                   TIMESTAMP(6) METADATA FROM 'timestamp',
        PRIMARY KEY (`movementid`) NOT ENFORCED
    )
    
  3. Modify the connector name to use upsert (instead of append) mode.

        PRIMARY KEY (`movementid`) NOT ENFORCED
    )
    WITH (
        'connector' = 'upsert-kafka',
        'topic' = 'STOCK.MOVEMENT.UNIQUE',
    
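  4. Remove the line with the scan.startup.mode property, because the upsert-kafka connector does not support it.

  5. Remove 'format' = 'json' and replace it with the following, because the upsert-kafka connector needs separate formats for the message key and value: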

        'key.format' = 'raw',
        'value.format' = 'json',
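
Before submitting the job, it can help to confirm that all five modifications were made to the destination table only. The following is a rough sketch under stated assumptions: the helper names table_definition and check_destination_table are hypothetical, each statement in the file is assumed to end with a semicolon at the end of a line, and the property spacing is assumed to match the snippets shown above.

```shell
# Sketch: check the edited destination table in an exported SQL file.
# Helper names are illustrative; statements are assumed to end with ";"
# at the end of a line.

# Print only the CREATE TABLE statement for the named table.
table_definition() {
    awk -v name="$2" '
        index($0, "CREATE TABLE `" name "`") { printing = 1 }
        printing { print }
        printing && /;[[:space:]]*$/ { printing = 0 }
    ' "$1"
}

# Report missing or leftover settings; returns non-zero if any are found.
check_destination_table() {
    definition="$(table_definition "$1" "$2")"
    status=0
    for expected in "METADATA FROM 'timestamp'" "PRIMARY KEY" \
        "'connector' = 'upsert-kafka'" \
        "'key.format' = 'raw'" "'value.format' = 'json'"; do
        if ! printf '%s\n' "$definition" | grep -qF -- "$expected"; then
            echo "missing: $expected"
            status=1
        fi
    done
    for unexpected in "scan.startup.mode" "'format' = 'json'"; do
        if printf '%s\n' "$definition" | grep -qF -- "$unexpected"; then
            echo "still present: $unexpected"
            status=1
        fi
    done
    return "$status"
}

# Example usage (hypothetical file name):
#   check_destination_table deduplication.sql "Unique stock movements"
```

The check looks only at the destination table because the source table definition keeps its own format and scan.startup.mode properties.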
    

Step 7 : Submit your SQL job

The final step is to submit your finished deduplication job to Flink.

  1. Log in to your Red Hat OpenShift Container Platform as a cluster administrator by using the oc CLI (oc login).

  2. Get the name of the pod for your Flink job manager:

    POD_NAME=$(oc get pods \
        -l component=jobmanager,app=my-flink,app.kubernetes.io/instance=ibm-eventautomation-flink \
        -n event-automation \
        -o custom-columns=Name:.metadata.name \
        --no-headers=true)
    
  3. Copy your SQL file to the job manager pod.

    oc cp -n event-automation \
        <your-sql-file.sql> $POD_NAME:/tmp/deduplication.sql
    

    Replace <your-sql-file.sql> with the name of the SQL file that you created.

  4. Run the Flink SQL job.

    oc exec -n event-automation $POD_NAME -- \
        /opt/flink/bin/sql-client.sh -hist /dev/null -f /tmp/deduplication.sql
    

    The submitted job continues running, processing and deduplicating new events as they are produced to the STOCK.MOVEMENT topic.
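
One way to confirm that the job was accepted is to look for its name in the output of the Flink command-line client. The sketch below assumes that the flink list command prints each running job on a line ending with the job name followed by (RUNNING); the helper name job_is_running is hypothetical, and the job name is the pipeline.name value set earlier in this tutorial.

```shell
# Sketch: succeed if "flink list" output on stdin shows the named job
# as RUNNING. The helper name is illustrative, and the expected line
# format ("... : <job name> (RUNNING)") is an assumption.
job_is_running() {
    grep -qF -- ": $1 (RUNNING)"
}

# Example usage against the tutorial cluster (shown as a comment because
# it needs the oc login and POD_NAME from the previous steps):
#   oc exec -n event-automation "$POD_NAME" -- /opt/flink/bin/flink list |
#       job_is_running stock-movements-deduplication && echo "job is running"
```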

Step 8 : Confirm the results

You can verify the job by examining the destination topic.

  1. View the original events on the STOCK.MOVEMENT topic.

  2. Use the timestamp to identify a duplicate event.

    screenshot

    Approximately one in ten of the events on this topic are duplicated, so looking through ten or so messages should be enough to find an example.

  3. Examine the event with the same timestamp on the STOCK.MOVEMENT.UNIQUE topic.

    screenshot

    You should see that there is only a single event with that timestamp and contents on the destination topic, as the duplicate event was filtered out.
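
If you have saved a sample of messages from a topic to a file, the same check can be made without reading through them by eye. This sketch assumes one JSON message per line and a movementid string field as in the table definitions above; the helper name duplicate_ids and the file names are hypothetical.

```shell
# Sketch: list movementid values that occur more than once in a file of
# JSON messages (one message per line). No output means no duplicates.
# The helper name and input file layout are illustrative assumptions.
duplicate_ids() {
    grep -o '"movementid" *: *"[^"]*"' "$1" | sort | uniq -d
}

# Example usage (hypothetical file names):
#   duplicate_ids stock-movement.jsonl          # lists repeated ids
#   duplicate_ids stock-movement-unique.jsonl   # expected to print nothing
```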

Recap

You have written a custom Flink SQL job to preprocess the events on a Kafka topic. The results are written to a different Kafka topic.

You could use this second topic as the source for an Event Processing flow, or any other Kafka application.