Event Streams producer API

Event Streams provides a REST API to help connect your existing systems to your Event Streams Kafka cluster. Using the API, you can integrate Event Streams with any system that supports RESTful APIs.

The REST producer API is a scalable REST interface for producing messages to Event Streams over a secure HTTP endpoint. Send event data to Event Streams, utilize Kafka technology to handle data feeds, and take advantage of Event Streams features to manage your data.

Use the API to connect existing systems to Event Streams, such as IBM Z mainframe systems with IBM z/OS Connect, systems using IBM DataPower Gateway, and so on.

Figure: Event Streams producer API architecture.

About authorization

By default, Event Streams requires clients to be authorized to write to topics. The available authentication mechanisms for use with the REST producer API are Mutual TLS (tls) and SCRAM-SHA-512 (scram-sha-512). For more information about these authentication mechanisms, see the information about managing access.

The REST producer API requires authentication credentials to be provided with each REST call to grant access to the requested topic. You can provide them in one of the following ways:

  1. In an HTTP authorization header:

    You can use this method when you have control over what HTTP headers are sent with each call to the REST producer API. For example, this is the case when the API calls are made by code you control.

  2. Mutual TLS authentication (also referred to as SSL client authentication or SSL mutual authentication):

    You can use this method when you cannot control what HTTP headers are sent with each REST call. For example, this is the case when you are using third-party software or systems such as CICS events over HTTP.

Note: You must have Event Streams version 2019.1.1 or later to use the REST API. In addition, you must have Event Streams version 2019.4.1 or later to use the REST API with SSL client authentication.

Content types

The following values are supported for the Content-Type header:

  • application/octet-stream
  • text/plain
  • application/json
  • text/xml
  • application/xml

For each content type, the message body is copied “as is” into the Kafka record value. Requests that use the application/octet-stream or text/plain content types must include a Content-Length header to avoid accidental truncation if the HTTP connection drops prematurely. The payload of a request that uses the application/json content type must parse as valid JSON; otherwise, the request is rejected.
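
For example, the wire-level request that a client must produce for a text/plain payload might look like the following (a sketch; the topic name my-topic is illustrative, and authorization headers are omitted for brevity):

POST /topics/my-topic/records HTTP/1.1
Host: <api_endpoint_host>
Content-Type: text/plain
Content-Length: 12
Accept: application/json

test message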

The Event Streams REST Producer API also supports the following vendor content types:

  • vnd.ibm-event-streams.json.v1 as a synonym for application/json
  • vnd.ibm-event-streams.binary.v1 as a synonym for application/octet-stream
  • vnd.ibm-event-streams.text.v1 as a synonym for text/plain

These content types can be used to pin applications at the version 1 API level, as shown in the example below.
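
For instance, a request pinned to the version 1 JSON content type might look like the following sketch, where the endpoint, topic name, and authentication token are placeholders that are explained in the sections later on this page:

curl -v -X POST -H "Authorization: Basic <auth_token>" -H "Content-Type: vnd.ibm-event-streams.json.v1" -H "Accept: application/json" -d '{"hello":"world"}' --cacert es-cert.pem "https://<api_endpoint>/topics/<topic_name>/records"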

Prerequisites

To be able to produce to a topic, ensure you have the following available:

  • The URL of the Event Streams REST Producer API endpoint.
  • The topic you want to produce to.
  • If using a REST Producer API endpoint that requires HTTPS, the Event Streams certificate.

To retrieve the full URL for the Event Streams API endpoint, you can use the Kubernetes command-line tool (kubectl) or the Event Streams UI.

Using the Kubernetes command-line tool (kubectl):

  1. Log in to your Kubernetes cluster as a cluster administrator by setting your kubectl context.
  2. Run the following command to list available Event Streams REST Producer API endpoints:

    kubectl get eventstreams <instance-name> -n <namespace> -o=jsonpath='{.status.endpoints[?(@.name=="restproducer")].uri}{"\n"}'
    
  3. Copy the full URL of the required endpoint from the command output (see the example below).
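
The command prints the endpoint URI directly. For example, the output might look like the following hypothetical value; the actual host depends on your cluster, instance name, and namespace:

https://my-instance-ibm-es-recapi-external-myproject.apps.example.com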

Using the UI:

  1. Log in to the Event Streams UI from a supported web browser (see how to determine the login URL for your Event Streams UI).
  2. Click Connect to this cluster on the right.
  3. Go to the Resources tab.
  4. Scroll down to the Producer endpoint and credentials section.
  5. Click Copy Producer API endpoint.

By default, the Event Streams REST producer API endpoint requires an HTTPS connection. If this has not been disabled for the endpoint, you must retrieve the Event Streams certificate. You can use the Event Streams CLI or UI.

Using the CLI:

  1. Ensure you have the Event Streams CLI installed.

  2. Initialize the Event Streams CLI by following the instructions in logging in. If you have more than one Event Streams instance installed, select the instance that contains the topic you want to produce to.

    Details of your Event Streams installation are displayed.

  3. Download the server certificate for Event Streams:

    kubectl es certificates --format pem
    

    By default, the certificate is written to a file called es-cert.pem.
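
Optionally, you can check the downloaded certificate with standard OpenSSL tooling, for example:

openssl x509 -in es-cert.pem -noout -subject -dates

This prints the certificate subject and validity dates, which is a quick way to confirm that the file was downloaded correctly.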

Using the UI:

  1. Log in to the Event Streams UI from a supported web browser (see how to determine the login URL for your Event Streams UI).
  2. Click Connect to this cluster on the right.
  3. Go to the Resources tab.
  4. Scroll down to the Certificates section.
  5. In the PEM certificate section, click Download certificate.

For information on how to create a topic to produce to, see the information about creating topics.

Key and message size limits

The REST producer API has configured limits for the key size (default: 4096 bytes) and the message size (default: 65536 bytes). If a request exceeds either limit, the request is rejected.

You can configure the key and message size limits at installation time or later, as described in modifying installation settings. The limits are configured by setting environment variables on the REST Producer component:

spec:
  restProducer:
    env:
      - name: MAX_KEY_SIZE
        value: "4096"
      - name: MAX_MESSAGE_SIZE
        value: "65536"

Important: Do not set the MAX_MESSAGE_SIZE to a higher value than the maximum message size that can be received by the Kafka broker or the individual topic (max.message.bytes). By default, the maximum message size for Kafka brokers is 1000012 bytes. If the limit is set for an individual topic, then that setting overrides the broker setting. Any message larger than the maximum limit will be rejected by Kafka.
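
For reference, a topic-level limit is set through the topic's max.message.bytes configuration. As a sketch, the relevant fragment of a KafkaTopic custom resource might look like the following (the value shown is illustrative):

spec:
  config:
    max.message.bytes: "1048576"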

Note: Sending large requests to the REST producer increases latency, as it will take the REST producer longer to process the requests.

Producing messages using REST with HTTP authorization

Ensure you have gathered all the details required to use the producer API, as described in the prerequisites. Before producing, you must also create authentication credentials.

To create authentication credentials to use in an HTTP authorization header, you can use the Event Streams CLI or UI.

Using the CLI:

  1. Ensure you have the Event Streams CLI installed.

  2. Initialize the Event Streams CLI by following the instructions in logging in. If you have more than one Event Streams instance installed, select the instance that contains the topic you want to produce to.

    Details of your Event Streams installation are displayed.

  3. Use the kafka-user-create command to create a KafkaUser that can produce to your topic:

    kubectl es kafka-user-create --topic <topic_name> --name <user_name> --producer --auth-type scram-sha-512
    

    Note: The Event Streams CLI kafka-user commands can be run only when the Event Streams CLI has access to Kubernetes resources, which can be provided by IAM authentication. If your Event Streams instance uses SCRAM-SHA-512 authentication, you can create a KafkaUser by using the OpenShift web console or the Kubernetes CLI. From Event Streams 11.2.4, you can also use the Event Streams UI to generate Kafka users.

  4. Follow the steps in managing access to retrieve the SCRAM-SHA-512 username and password.

Using the UI:

  1. Log in to the Event Streams UI from a supported web browser (see how to determine the login URL for your Event Streams UI).
  2. Click Connect to this cluster on the right.
  3. Go to the Resources tab.
  4. Scroll down to the Producer endpoint and credentials section, then click Generate credentials.
  5. Select SCRAM username and password, then click Next.
  6. Fill in a Credential Name; this name must be unique.
  7. Select Produce messages, consume messages and create topics and schemas, then click Next.
  8. Select A specific topic and fill in the topic name, then click Next.
  9. Click Next on the consumer group panel, then Generate credentials on the transactional IDs panel using the default settings.
  10. Take a copy of either the username and password or the Basic authentication token.

You can make the API call from any language or tool that supports HTTP. For example, to use cURL to produce messages to a topic with the producer API using a Basic authentication header, run the curl command as follows:

curl -v -X POST -H "Authorization: Basic <auth_token>" -H "Content-Type: text/plain" -H "Accept: application/json" -d 'test message' --cacert es-cert.pem "https://<api_endpoint>/topics/<topic_name>/records"

Where:

  • <auth_token> is the Basic authentication token you generated earlier.
  • <api_endpoint> is the full URL copied from the Producer API endpoint field earlier. Use http instead of https if the provided Producer API endpoint has TLS disabled.
  • <topic_name> is the name of the topic you want to produce messages to.
  • --cacert es-cert.pem can be omitted if the provided Producer API endpoint has TLS disabled.
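
If you copied a username and password rather than the Basic authentication token in the earlier step, you can construct the token yourself with standard tooling; it is the Base64 encoding of <user>:<password>. For example:

echo -n '<user>:<password>' | base64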

To use cURL to produce messages to a topic with the producer API using a SCRAM username and password, run the curl command as follows:

curl -v -X POST -u <user>:<password> -H "Content-Type: text/plain" -H "Accept: application/json" -d 'test message' --cacert es-cert.pem "https://<api_endpoint>/topics/<topic_name>/records"

Where:

  • <user> is the SCRAM username provided when generating credentials.
  • <password> is the SCRAM password retrieved earlier.
  • <api_endpoint> is the full URL copied from the Producer API endpoint field earlier. Use http instead of https if the provided Producer API endpoint has TLS disabled.
  • <topic_name> is the name of the topic you want to produce messages to.
  • --cacert es-cert.pem can be omitted if the provided Producer API endpoint has TLS disabled.

For full details of the API, see the API reference.

Producing messages using REST with Mutual TLS authentication

Ensure you have gathered all the details required to use the producer API, as described in the prerequisites. Before producing, you must also create TLS credentials.

To create authentication credentials to use with Mutual TLS authentication, you can use the Event Streams CLI or UI.

Using the CLI:

  1. Ensure you have the Event Streams CLI installed.
  2. Initialize the Event Streams CLI by following the instructions in logging in. If you have more than one Event Streams instance installed, select the instance that contains the topic you want to produce to.

    Details of your Event Streams installation are displayed.

  3. Use the kafka-user-create command to create a KafkaUser that can produce to your topic:

    kubectl es kafka-user-create --topic <topic_name> --name <user_name> --producer --auth-type tls
    

    Note: The Event Streams CLI kafka-user commands can be run only when the Event Streams CLI has access to Kubernetes resources, which can be provided by IAM authentication. If your Event Streams instance uses SCRAM-SHA-512 authentication, you can create a KafkaUser by using the OpenShift web console or the Kubernetes CLI. From Event Streams 11.2.4, you can also use the Event Streams UI to generate Kafka users.

  4. Follow the steps in managing access to retrieve the TLS certificates and keys.

Using the UI:

  1. Log in to the Event Streams UI from a supported web browser (see how to determine the login URL for your Event Streams UI).
  2. Click Connect to this cluster on the right.
  3. Go to the Resources tab.
  4. Scroll down to the Producer endpoint and credentials section, then click Generate credentials.
  5. Select Mutual TLS certificate, then click Next.
  6. Fill in a Credential Name; this name must be unique.
  7. Select Produce messages, consume messages and create topics and schemas, then click Next.
  8. Select A specific topic and fill in the topic name, then click Next.
  9. Click Next on the consumer group panel, then Generate credentials on the transactional IDs panel using the default settings.
  10. Click Download certificates and unzip the downloaded ZIP archive containing the TLS certificates and keys.

Each KafkaUser has a related secret that stores all the credentials needed for mutual TLS authentication. The secret contains the following entries:

  • ca.crt - the client CA certificate
  • user.key - the client private key
  • user.crt - the client certificate
  • user.password - password for accessing the client private key
  • user.p12 - PKCS #12 archive containing the client certificate and private key

By default, the name of the secret is the same as the name of the KafkaUser. For more information about secrets, see the Strimzi documentation.
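
For example, assuming kubectl access and that the secret name matches the KafkaUser name, you can extract the client certificate and private key as follows:

kubectl get secret <KafkaUser-name> -o jsonpath='{.data.user\.crt}' | base64 -d > user.crt
kubectl get secret <KafkaUser-name> -o jsonpath='{.data.user\.key}' | base64 -d > user.key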

For some systems (for example, CICS), you need to download and import the client CA certificate into your truststore. The client CA certificate can be downloaded by using the Kubernetes command-line tool (kubectl) or the OpenShift Container Platform web console.

Using the Kubernetes command-line tool (kubectl):

  1. Log in to your Kubernetes cluster as a cluster administrator by setting your kubectl context.
  2. Run the following command to find the secret associated with the KafkaUser you want the client CA certificate for:

    kubectl get ku/<kafka-user> -o jsonpath='{.status.secret}'
    
  3. Note down the name of the secret associated with the KafkaUser.
  4. Run the following kubectl command to get the client CA certificate from the secret found in the previous step:

    kubectl get secret <KafkaUser-name> -o jsonpath='{.data.ca\.crt}' | base64 -d > ca.crt
    

    Where <KafkaUser-name> is the name of your KafkaUser.

Using the OpenShift web console:

  1. Log in to the OpenShift Container Platform web console using your login credentials.
  2. Expand the Workloads dropdown and select Secrets to open the Secrets dashboard.
  3. Find and click the secret created for your Kafka User.
  4. Go to the Data section in the Secret details, which lists the available certificates.
  5. Click Reveal values and copy the ca.crt certificate.

If you are using Java keystores, the client certificate can be imported by using the keytool -importcert ... command as described in the IBM SDK, Java Technology Edition documentation.
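
A minimal sketch of that import, where the alias, truststore file name, and password are illustrative:

keytool -importcert -alias es-client-ca -file ca.crt -keystore truststore.jks -storepass <password>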

Some systems require the client certificate and private key to be combined into one PKCS #12 file (with extension .p12 or .pfx). A PKCS #12 file and an associated password file are included in the KafkaUser secret and in the ZIP file downloaded from the Event Streams UI.
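
You can check the contents of the archive with standard OpenSSL tooling; the following command prompts for the password stored in user.password:

openssl pkcs12 -info -in user.p12 -noout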

You can make the API call from any language or tool that supports HTTP. Consult the documentation for your system to understand how to specify the client certificate and private key for the outgoing REST calls to Event Streams. For example, to use cURL to produce messages to a topic with the producer API, run the curl command as follows:

curl -v -X POST -H "Content-Type: text/plain" -H "Accept: application/json" -d 'test message' --cacert es-cert.pem --key user.key --cert user.crt "https://<api_endpoint>/topics/<topic_name>/records"

Where:

  • <api_endpoint> is the full URL copied from the Producer API endpoint field earlier.
  • <topic_name> is the name of the topic you want to produce messages to.
  • es-cert.pem is the Event Streams server certificate downloaded as part of the prerequisites.
  • user.key is the private key of the user, downloaded from the UI or read from the KafkaUser secret.
  • user.crt is the user certificate that contains the public key of the user, downloaded from the UI or read from the KafkaUser secret.

For example, the steps to configure a CICS URIMAP as an HTTP client are described in the CICS Transaction Server documentation. In this case, load the client certificate and private key, together with the Event Streams server certificate, into your RACF key ring. When defining the URIMAP:

  • Host is the client authentication API endpoint obtained as part of the prerequisites, without the leading https://.
  • Path is /topics/<topic-name>/records.
  • Certificate is the label given to the client certificate when it was loaded into the key ring.

For full details of the API, see the API reference.