Connect client applications to your Event Streams Kafka cluster by making HTTP requests to the Kafka Bridge.
Overview
With Kafka Bridge, you can connect client applications to your Event Streams Kafka cluster over HTTP, providing a standard web API connection to Event Streams rather than the custom Kafka protocol.
Apache Kafka uses a custom protocol on top of TCP/IP for communication between applications and the Kafka cluster. With Kafka Bridge, clients can communicate with your Event Streams Kafka cluster over the HTTP/1.1 protocol. You can manage consumers and send and receive records over HTTP.
Prerequisites
Ensure that you enable Kafka Bridge for Event Streams as described in the configuring topic.
Security
Authentication and encryption between client applications and Kafka Bridge are not supported. Requests sent from clients to Kafka Bridge are not encrypted, must use HTTP (not HTTPS), and are sent without authentication.
You can configure TLS (tls) or SASL-based (scram-sha-512) user authentication between Kafka Bridge and your Event Streams Kafka cluster. To configure this authentication, use the KafkaBridge custom resource.
- The following example includes an Event Streams cluster that requires TLS authentication for user access, where the user <username> was created for Kafka Bridge earlier. It also includes TLS encryption for the connection between the Event Streams instance (called development) and the Kafka Bridge.
apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaBridge
metadata:
  name: my-bridge
  namespace: <namespace>
  labels:
    eventstreams.ibm.com/cluster: <cluster-name>
    backup.eventstreams.ibm.com/component: kafkabridge
spec:
  replicas: 1
  bootstrapServers: '<cluster-name>-kafka-bootstrap:9093'
  http:
    port: 8080
  authentication:
    type: tls
    certificateAndKey:
      certificate: user.crt
      key: user.key
      secretName: <username>
  tls:
    trustedCertificates:
      - certificate: ca.crt
        secretName: <cluster-name>-cluster-ca-cert
  template:
    bridgeContainer:
      securityContext:
        allowPrivilegeEscalation: false
        capabilities:
          drop:
            - ALL
        privileged: false
        readOnlyRootFilesystem: true
        runAsNonRoot: true
    pod:
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
              - matchExpressions:
                  - key: kubernetes.io/arch
                    operator: In
                    values:
                      - amd64
                      - s390x
                      - ppc64le
- The following example includes an Event Streams cluster that requires SCRAM-SHA-512 authentication for user access, where the user <username> was created for Kafka Bridge earlier. It also includes TLS encryption for the connection between the Event Streams instance (called development) and the Kafka Bridge.
apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaBridge
metadata:
  name: my-bridge
  namespace: <namespace>
  labels:
    eventstreams.ibm.com/cluster: <cluster-name>
    backup.eventstreams.ibm.com/component: kafkabridge
spec:
  replicas: 1
  bootstrapServers: '<cluster-name>-kafka-bootstrap:9093'
  http:
    port: 8080
  authentication:
    type: scram-sha-512
    username: <username>
    passwordSecret:
      secretName: <username>
      password: password
  tls:
    trustedCertificates:
      - certificate: ca.crt
        secretName: <cluster-name>-cluster-ca-cert
  template:
    bridgeContainer:
      securityContext:
        allowPrivilegeEscalation: false
        capabilities:
          drop:
            - ALL
        privileged: false
        readOnlyRootFilesystem: true
        runAsNonRoot: true
    pod:
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
              - matchExpressions:
                  - key: kubernetes.io/arch
                    operator: In
                    values:
                      - amd64
                      - s390x
                      - ppc64le
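In either case, apply the completed KafkaBridge custom resource to the namespace where your Event Streams instance runs. The following is a minimal sketch, assuming you saved the resource in a file called kafka-bridge.yaml (a hypothetical file name):

# Apply the KafkaBridge custom resource; the operator then creates the bridge deployment
kubectl apply -f kafka-bridge.yaml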
Client access
Internal access
Client applications running within the same Kubernetes cluster as Kafka Bridge can access Kafka Bridge on the port configured in the KafkaBridge custom resource (the default is 8080).
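For example, an application in the same cluster can call the bridge through its internal service. The following is a minimal sketch that lists topics, assuming the service name and port shown in the base_uri example later in this topic:

# List topics through the internal bridge service (service name and port are assumptions)
curl -X GET http://my-bridge-bridge-service:80/topics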
External access
Client applications running outside of the Kubernetes cluster can access the Kafka Bridge through an ingress or an OpenShift route, as described in enabling Kafka Bridge.
Using Kafka Bridge
After enabling and configuring the Kafka Bridge, use the bridge to connect applications to your Kafka cluster over HTTP.
Available HTTP requests
With Kafka Bridge, you can make the following HTTP requests to your Kafka cluster:
- Send messages to a topic.
- Retrieve messages from topics.
- Retrieve a list of partitions for a topic.
- Create and delete consumers.
- Subscribe consumers to topics, so that they start receiving messages from those topics.
- Retrieve a list of topics that a consumer is subscribed to.
- Unsubscribe consumers from topics.
- Assign partitions to consumers.
- Commit a list of consumer offsets.
- Seek on a partition, so that a consumer starts receiving messages from the first or last offset position, or a given offset position.
The related request operations are the following:
GET /
POST /consumers/{groupid}
DELETE /consumers/{groupid}/instances/{name}
POST /consumers/{groupid}/instances/{name}/assignments
POST /consumers/{groupid}/instances/{name}/offsets
POST /consumers/{groupid}/instances/{name}/positions
POST /consumers/{groupid}/instances/{name}/positions/beginning
POST /consumers/{groupid}/instances/{name}/positions/end
GET /consumers/{groupid}/instances/{name}/records
POST /consumers/{groupid}/instances/{name}/subscription
GET /consumers/{groupid}/instances/{name}/subscription
DELETE /consumers/{groupid}/instances/{name}/subscription
GET /healthy
GET /openapi
GET /ready
GET /topics
POST /topics/{topicname}
GET /topics/{topicname}
GET /topics/{topicname}/partitions
POST /topics/{topicname}/partitions/{partitionid}
GET /topics/{topicname}/partitions/{partitionid}
GET /topics/{topicname}/partitions/{partitionid}/offsets
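For example, you can retrieve the OpenAPI specification that documents all of these endpoints by making a GET request to the /openapi path. The following sketch assumes the example route name used later in this topic:

# Retrieve the bridge's OpenAPI specification (route name is an assumption)
curl -X GET http://my-bridge-route-es-kafka-bridge.apps.example.com/openapi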
For more detailed information about the request paths, see the Strimzi documentation.
Content types
The following values are supported for the Content-Type header.
- For consumer operations, POST requests must provide the following Content-Type header:
  Content-Type: application/vnd.kafka.v2+json
- For producer operations, POST requests must provide a Content-Type header specifying the embedded data format of the messages produced (JSON or binary):
  - JSON: Content-Type: application/vnd.kafka.json.v2+json
  - Binary: Content-Type: application/vnd.kafka.binary.v2+json
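When you use the binary embedded data format, the key and value of each record must be base64-encoded strings. The following sketch produces one record whose key and value are the base64 encodings of key-1 and value-1, assuming the example route name used later in this topic:

# Produce one record with base64-encoded key and value ("key-1" and "value-1")
curl -X POST \
  http://my-bridge-route-es-kafka-bridge.apps.example.com/topics/my-topic \
  -H 'content-type: application/vnd.kafka.binary.v2+json' \
  -d '{
    "records": [
      {
        "key": "a2V5LTE=",
        "value": "dmFsdWUtMQ=="
      }
    ]
  }'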
Producing
After setting up Kafka Bridge, you can produce messages to specified topics over HTTP. To produce to a topic, create the topic, then run the following curl command.
Note: If automatic topic creation is enabled (set by auto.create.topics.enable in the broker configuration; the default is true), you can specify a new topic in the following command and it is created before messages are written to it.
curl -X POST \
  http://<route-name>.apps.<cluster-name>.<namespace>.com/topics/<topic-name> \
  -H 'content-type: application/vnd.kafka.json.v2+json' \
  -d '{
    "records": [
      {
        "key": "key-1",
        "value": "value-1"
      },
      {
        "key": "key-2",
        "value": "value-2"
      }
    ]
  }'
For example, to produce two messages about temperature readings to a topic called my-topic, you can use the following command:
curl -X POST \
  http://my-bridge-route-es-kafka-bridge.apps.example.com/topics/my-topic \
  -H 'content-type: application/vnd.kafka.json.v2+json' \
  -d '{
    "records": [
      {
        "key": "temperature-1",
        "value": "20"
      },
      {
        "key": "temperature-2",
        "value": "25"
      }
    ]
  }'
If the request to produce to the topic is successful, the Kafka Bridge returns an HTTP status code 200 OK and a JSON payload that describes, for each message, the partition that the message was sent to and the offset at which it was written.
# ...
{
  "offsets":[
    {
      "partition":0,
      "offset":0
    },
    {
      "partition":2,
      "offset":0
    }
  ]
}
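You can also produce directly to a specific partition by using the partitions endpoint listed earlier (POST /topics/{topicname}/partitions/{partitionid}). The following is a minimal sketch that sends one record to partition 0 of my-topic, again assuming the example route name:

# Produce one record to a specific partition of the topic
curl -X POST \
  http://my-bridge-route-es-kafka-bridge.apps.example.com/topics/my-topic/partitions/0 \
  -H 'content-type: application/vnd.kafka.json.v2+json' \
  -d '{
    "records": [
      {
        "key": "temperature-3",
        "value": "22"
      }
    ]
  }'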
Consuming
To consume messages over HTTP with Kafka Bridge, create a Kafka Bridge consumer, subscribe it to one or more topics, and then retrieve messages from the records endpoint, as described in the following sections.
Creating a Kafka Bridge consumer
To interact with your Kafka cluster, the Kafka Bridge requires a consumer. To create the Kafka Bridge consumer endpoint, create a consumer within a consumer group. For example, the following command creates a Kafka Bridge consumer called my-consumer in a new consumer group called my-group:
curl -X POST http://my-bridge-route-es-kafka-bridge.apps.example.com/consumers/my-group \
  -H 'content-type: application/vnd.kafka.v2+json' \
  -d '{
    "name": "my-consumer",
    "format": "json",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": false
  }'
If the request is successful, the Kafka Bridge returns an HTTP status code 200 OK and a JSON payload containing the consumer ID (instance_id) and the base URL (base_uri). The base URL is used by the HTTP client to interact with the consumer and receive messages from topics. The following is an example response:
{
  "instance_id":"my-consumer",
  "base_uri":"http://my-bridge-bridge-service:80/consumers/my-group/instances/my-consumer"
}
Subscribing to topics
After creating a Kafka Bridge consumer, you can subscribe it to topics by sending a POST request to the subscription endpoint. For example, the following command subscribes the consumer to the topic called my-topic:
curl -X POST http://my-bridge-route-es-kafka-bridge.apps.example.com/consumers/my-group/instances/my-consumer/subscription \
  -H 'content-type: application/vnd.kafka.v2+json' \
  -d '{
    "topics": [
      "my-topic"
    ]
  }'
If the request is successful, the Kafka Bridge returns an HTTP status code 204 No Content with an empty body.
After subscribing, the consumer receives all messages that are produced to the topic.
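Alternatively, you can assign specific partitions to the consumer by using the assignments endpoint listed earlier, instead of subscribing to whole topics (a consumer cannot use both subscription and manual assignment). The following is a minimal sketch that assigns partition 0 of my-topic:

# Manually assign a single partition to the consumer instead of subscribing
curl -X POST http://my-bridge-route-es-kafka-bridge.apps.example.com/consumers/my-group/instances/my-consumer/assignments \
  -H 'content-type: application/vnd.kafka.v2+json' \
  -d '{
    "partitions": [
      {
        "topic": "my-topic",
        "partition": 0
      }
    ]
  }'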
Retrieving messages from topics
After subscribing a Kafka Bridge consumer to a topic, your client applications can retrieve the messages on the topic from the Kafka Bridge consumer. To retrieve the latest messages, request data from the records endpoint. The following is an example command to retrieve messages from my-topic:
curl -X GET http://my-bridge-route-es-kafka-bridge.apps.example.com/consumers/my-group/instances/my-consumer/records \
  -H 'accept: application/vnd.kafka.json.v2+json'
If the request is successful, the Kafka Bridge returns an HTTP status code 200 OK and a JSON body containing the messages from the topic. Messages are retrieved from the latest offset by default.
Note: The first request after subscribing can return an empty response because it triggers the initial partition assignment for the consumer. If you receive an empty response, produce more records to the topic, and then try retrieving messages again.
In production, HTTP clients can call this endpoint repeatedly (in a loop), for example:
while true; do
  sleep 1
  curl -X GET http://my-bridge-route-es-kafka-bridge.apps.example.com/consumers/my-group/instances/my-consumer/records \
    -H 'accept: application/vnd.kafka.json.v2+json'
  echo
done
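Because the example consumer was created with enable.auto.commit set to false, the client is responsible for committing offsets through the offsets endpoint listed earlier. The following is a minimal sketch; sending an empty request body commits the offsets for all records fetched so far:

# Commit offsets for all records the consumer has received (empty body commits everything fetched)
curl -X POST http://my-bridge-route-es-kafka-bridge.apps.example.com/consumers/my-group/instances/my-consumer/offsets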
Deleting a Kafka Bridge consumer
If you no longer need a Kafka Bridge consumer, delete it to free up resources on the bridge.
Use the delete consumer endpoint to delete a consumer instance. For example, to delete the consumer created earlier, run the following command:
curl -X DELETE http://my-bridge-bridge-service:80/consumers/my-group/instances/my-consumer
For more information about using Kafka Bridge to produce to and consume from your Kafka cluster, including committing offsets, see the related blog post.