Event Streams can be configured to generate a sequential record of activities within the Event Streams Kafka cluster. By reviewing this audit trail, administrators can track user activities and investigate security breaches.
Audit trail for Kafka
Event Streams uses log records from every request that is sent to Kafka brokers, and extracts records for events that are useful for tracking activities within the broker. The information that is captured for each event can vary in structure and content, but every event includes the following key information:
- The user (Kafka principal) that initiated the request.
- The action (Kafka operation) that was requested.
- The entity (Kafka resource) on which the operation was requested.
- The date and time when the request was received.
- The result of the request, including relevant information about reasons for any failures.
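As an illustration of how these items map onto a single audit line, the following minimal Python sketch pulls them out of one record. It assumes that each line starts with a `yyyy-MM-dd HH:mm:ss` timestamp followed by text that contains one JSON object with `requestHeader`, `request`, `response`, and `principal` fields. The function name `summarize_audit_line` and the sample line are hypothetical and are not part of Event Streams.

```python
import json

def summarize_audit_line(line):
    """Pull the key audit items out of one audit log line."""
    # Assumption: the first 19 characters are a "yyyy-MM-dd HH:mm:ss" timestamp.
    timestamp = line[:19]
    # Parse the first JSON object in the line and ignore any text around it.
    record, _ = json.JSONDecoder().raw_decode(line, line.index("{"))
    header = record.get("requestHeader", {})
    return {
        "time": timestamp,
        "principal": record.get("principal"),
        "operation": header.get("requestApiKeyName"),
        "request": record.get("request"),    # names the targeted resources
        "response": record.get("response"),  # carries error codes and messages
    }

# Hypothetical sample line, for illustration only.
line = (
    '2024-05-01 10:15:00 {"requestHeader": {"requestApiKeyName": "CREATE_TOPICS"}, '
    '"request": {"topics": [{"name": "orders"}]}, '
    '"response": {"topics": [{"name": "orders", "errorCode": 0}]}, '
    '"principal": "User:user2"}'
)
summary = summarize_audit_line(line)
print(summary["time"], summary["principal"], summary["operation"])
```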
As Kafka is a distributed system, the audit events from all brokers must be aggregated in a central location to create a complete audit trail for the Event Streams Kafka cluster. Aggregating audit records in a separate log aggregator enables the retention and visualization of audit trails without impacting the storage requirements of the Event Streams instance.
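A log aggregator normally performs this aggregation for you. To illustrate the idea only, the following Python sketch merges per-broker audit lines into one chronological trail. It assumes that each broker's lines are already in time order and start with a sortable `yyyy-MM-dd HH:mm:ss` timestamp. The function name and the sample lines are hypothetical.

```python
import heapq

def merge_audit_trails(*broker_lines):
    """Merge per-broker audit lines, each already in time order, into one trail."""
    # Assumption: the first 19 characters of every line are a sortable timestamp.
    return list(heapq.merge(*broker_lines, key=lambda line: line[:19]))

# Hypothetical lines from two brokers, for illustration only.
broker_0 = [
    "2024-05-01 10:00:01 broker-0 event A",
    "2024-05-01 10:00:05 broker-0 event C",
]
broker_1 = ["2024-05-01 10:00:03 broker-1 event B"]

trail = merge_audit_trails(broker_0, broker_1)
for entry in trail:
    print(entry)
```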
Before you begin
- Enabling audit trail for Kafka introduces additional processing that can impact the performance of the Kafka system. Ensure that the impact is assessed in your environment before enabling the audit feature.
- The storage that is used for the audit trail must be configured with appropriate size and retention policies to handle the volume of audit records.
- Ensure access to the audit trail is secured by restricting access only to authorized users.
Configuring audit trail for Kafka
To configure Event Streams to provide an audit trail for the Kafka cluster, complete the following steps:
- Log in to your Kubernetes cluster as a cluster administrator by setting your kubectl context.
- Create a file named es-audit-config.yaml and copy the following YAML content into the file to create the ConfigMap that has the log4j configuration properties:
  apiVersion: v1
  kind: ConfigMap
  metadata:
    name: <event-streams-audit-configuration>
    namespace: <event-streams-namespace>
  data:
    log4j2.properties: |-
      <log4j-audit-configuration>
  Where:
  - <event-streams-namespace> is the namespace where your Event Streams instance is installed.
  - <log4j-audit-configuration> is the log4j configuration to:
    - Configure the Kafka root logger with default settings to have standard logs for your Kafka pods as follows:
      # Kafka root logger
      rootLogger.level = INFO
      rootLogger.appenderRef.stdout.ref = stdout
      # Console Appender for standard logs
      appender.stdout.type = Console
      appender.stdout.name = stdout
      appender.stdout.layout.type = PatternLayout
      appender.stdout.layout.pattern = [%d] %p %m (%c)%n
    - Configure the Kafka request logger at TRACE level as the source for audit information, and set additivity=false to keep audit information separate from the standard logs for your Kafka pods as follows:
      # Kafka request logger
      logger.kafka_request.name = kafka.request.logger
      logger.kafka_request.level = TRACE
      logger.kafka_request.additivity = false
      logger.kafka_request.appenderRef.audit.ref = audit
    - Define an output destination by setting up an appender to direct filtered audit records to a central system for aggregation, retention, and visualization. Ensure access to the audit trail is secured by restricting access to authorized users only.
    - Define a set of filters based on Kafka API Keys (protocols) to include or exclude specific requests in the audit trail.
    For sample log4j configurations, see the Example log4j configurations section.
- Apply the ConfigMap by running the following command:
  kubectl apply -f es-audit-config.yaml
- After the ConfigMap is created, enable Kafka auditing by setting the spec.strimziOverrides.kafka.logging property in the EventStreams custom resource to point to the <event-streams-audit-configuration> ConfigMap:
  apiVersion: eventstreams.ibm.com/v1beta2
  kind: EventStreams
  # ...
  spec:
    strimziOverrides:
      kafka:
        # ...
        logging:
          type: external
          valueFrom:
            configMapKeyRef:
              key: log4j2.properties
              name: <event-streams-audit-configuration>
The Event Streams operator applies the previous changes to the Kafka pods one by one. After all Kafka pods have rolled successfully, the Kafka audit trail will be available in the central system configured for aggregation, retention, and visualization.
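Before applying the ConfigMap, it can help to check that the log4j configuration is internally consistent. The following Python sketch is one possible pre-flight check, not an Event Streams tool. It assumes simple `key = value` properties lines, and the function names `parse_properties` and `check_audit_config` are hypothetical. It verifies the request logger settings that are described in the steps, and that every appender reference points at an appender that is defined.

```python
def parse_properties(text):
    """Parse simple 'key = value' lines, skipping blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if line and not line.startswith("#") and "=" in line:
            key, _, value = line.partition("=")
            props[key.strip()] = value.strip()
    return props

def check_audit_config(props):
    """Return a list of problems found in the audit logger settings."""
    problems = []
    if props.get("logger.kafka_request.name") != "kafka.request.logger":
        problems.append("request logger is not configured")
    if props.get("logger.kafka_request.level") != "TRACE":
        problems.append("request logger is not at TRACE level")
    if props.get("logger.kafka_request.additivity") != "false":
        problems.append("additivity is not set to false")
    # Every appenderRef must point at an appender that is actually defined.
    defined = {
        value for key, value in props.items()
        if key.startswith("appender.") and key.endswith(".name")
    }
    for key, value in props.items():
        if ".appenderRef." in key and key.endswith(".ref") and value not in defined:
            problems.append(f"{key} points at undefined appender '{value}'")
    return problems

# The request logger snippet from the steps, without an 'audit' appender yet.
config = """
# Kafka request logger
logger.kafka_request.name = kafka.request.logger
logger.kafka_request.level = TRACE
logger.kafka_request.additivity = false
logger.kafka_request.appenderRef.audit.ref = audit
"""
problems = check_audit_config(parse_properties(config))
print(problems)
```

In this sketch the check reports the missing appender, because the request logger refers to an appender named audit that the snippet does not define. After an appender with that name is added, the list of problems is empty.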
Example log4j configurations
See the following log4j configuration example for auditing purposes. The example writes the filtered audit records to a file on each broker.
Auditing all Kafka users and all topics
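Use the following log4j2 configuration to audit all Kafka users and all topics. The filter in this example keeps only requests that create or delete topics, create or delete ACLs, or incrementally alter configurations.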
# Kafka root logger
rootLogger.level = INFO
rootLogger.appenderRef.stdout.ref = stdout
# Console Appender for standard logs
appender.stdout.type = Console
appender.stdout.name = stdout
appender.stdout.layout.type = PatternLayout
appender.stdout.layout.pattern = [%d] %p %m (%c)%n
# File Appender for audit logs (using existing Kafka data directory)
appender.audit.type = File
appender.audit.name = audit
appender.audit.fileName = /var/lib/kafka/data/audit.log
appender.audit.layout.type = PatternLayout
appender.audit.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %m%n
# Custom filter for audit events
appender.audit.filter.auditFilter.type = RegexFilter
appender.audit.filter.auditFilter.regex = .*(CREATE_TOPICS|DELETE_TOPICS|CREATE_ACLS|DELETE_ACLS|INCREMENTAL_ALTER_CONFIGS).*
appender.audit.filter.auditFilter.onMatch = ACCEPT
appender.audit.filter.auditFilter.onMismatch = DENY
# Kafka request logger for audit
logger.kafka_request.name = kafka.request.logger
logger.kafka_request.level = TRACE
logger.kafka_request.additivity = false
logger.kafka_request.appenderRef.audit.ref = audit
The audit file is stored at /var/lib/kafka/data/audit.log in the Kafka pod. To aggregate logs, use a tool such as rsyslog instead of a file appender.
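To preview which requests the filter pattern keeps, the following Python sketch applies the same regular expression to a few messages. It assumes that the log4j RegexFilter accepts a message only when the whole message matches the pattern, which is why the pattern is wrapped in `.*`. The sample messages are hypothetical and shortened.

```python
import re

# Same pattern as appender.audit.filter.auditFilter.regex in the example configuration.
AUDIT_REGEX = re.compile(
    r".*(CREATE_TOPICS|DELETE_TOPICS|CREATE_ACLS|DELETE_ACLS|INCREMENTAL_ALTER_CONFIGS).*"
)

def is_audited(message):
    """Return True if the whole message matches the audit pattern."""
    return AUDIT_REGEX.fullmatch(message) is not None

# Hypothetical, shortened request log messages, for illustration only.
samples = {
    "CREATE_TOPICS": '{"requestHeader": {"requestApiKeyName": "CREATE_TOPICS"}}',
    "DELETE_ACLS": '{"requestHeader": {"requestApiKeyName": "DELETE_ACLS"}}',
    "FETCH": '{"requestHeader": {"requestApiKeyName": "FETCH"}}',
}
decisions = {name: is_audited(message) for name, message in samples.items()}
print(decisions)
```

Because the pattern matches anywhere in the message, a request of another type that happens to contain one of these names, for example as a topic name, would also be kept. A stricter pattern that is anchored on the requestApiKeyName field might be preferable in that case.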
Example audit records
See the following examples for audit records of different actions and outcomes from within an audit trail.
Successful user creation
The following audit record indicates a successful attempt to create a Kafka user. The record captures the CREATE_ACLS request that sets up the access control list (ACL) entries for the user:
{
"requestHeader": {
"clientId": "adminclient-1",
"requestApiKeyName": "CREATE_ACLS",
...
},
"request": {
"creations": [
{
"resourceType": 2,
"resourceName": "your.topic.name",
"resourcePatternType": 3,
"principal": "User:consumer",
"host": "*",
"operation": 3,
"permissionType": 3
}
...
]
},
"response": {
"results": [
{
"errorCode": 0,
"errorMessage": ""
}
...
]
...
},
"securityProtocol": "SSL",
"principal": "User:CN=dev-scram-entity-user-operator,O=io.strimzi",
"listener": "REPLICATION-9091",
...
}
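The numeric values in the creations entry are enum codes. The following Python sketch turns one entry into a readable sentence. The code tables are partial and reflect the Kafka ACL enum values as an assumption, so verify them against your Kafka version. The function name `describe_acl` is hypothetical.

```python
# Partial code tables, assumed from the Kafka ACL enums; verify for your Kafka version.
RESOURCE_TYPES = {2: "TOPIC", 3: "GROUP", 4: "CLUSTER", 5: "TRANSACTIONAL_ID"}
PATTERN_TYPES = {3: "LITERAL", 4: "PREFIXED"}
OPERATIONS = {2: "ALL", 3: "READ", 4: "WRITE", 5: "CREATE", 6: "DELETE",
              7: "ALTER", 8: "DESCRIBE"}
PERMISSION_TYPES = {2: "DENY", 3: "ALLOW"}

def describe_acl(creation):
    """Render one entry of request.creations as a readable sentence."""
    def name(table, code):
        # Fall back to the raw code when it is not in the partial table.
        return table.get(code, f"code {code}")
    return "{} {} {} on {} {} ({})".format(
        name(PERMISSION_TYPES, creation["permissionType"]),
        creation["principal"],
        name(OPERATIONS, creation["operation"]),
        name(RESOURCE_TYPES, creation["resourceType"]),
        creation["resourceName"],
        name(PATTERN_TYPES, creation["resourcePatternType"]),
    )

# The creations entry from the audit record in this section.
creation = {
    "resourceType": 2,
    "resourceName": "your.topic.name",
    "resourcePatternType": 3,
    "principal": "User:consumer",
    "host": "*",
    "operation": 3,
    "permissionType": 3,
}
description = describe_acl(creation)
print(description)
```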
Failed topic creation
The following audit record indicates an attempt to create a Kafka topic that failed because the user was not authorized:
{
"requestHeader": {
"clientId": "adminclient-2955",
"requestApiKeyName": "CREATE_TOPICS",
...
},
"request": {
"topics": [
{
"name": "aaaa",
"numPartitions": 1,
"replicationFactor": 1,
"assignments": [],
"configs": [
{
"name": "min.insync.replicas",
"value": "1"
},
...
]
}
]
...
},
"response": {
"throttleTimeMs": 0,
"topics": [
{
"name": "aaaa",
"topicId": "AAAAAAAAAAAAAAAAAAAAAA",
"errorCode": 29,
"errorMessage": "Authorization failed.",
...
}
]
},
"securityProtocol": "SASL_SSL",
"principal": "User:user2",
"listener": "EXTERNAL-9094",
...
}
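To find failures such as this one in an audit trail, the response section can be scanned for non-zero error codes. The following Python sketch does this for topic results. The error code table is partial and reflects the Kafka protocol error codes as an assumption, and the function name `failed_topics` is hypothetical. The record is a shortened copy of the example in this section.

```python
# Partial table, assumed from the Kafka protocol error codes; verify for your version.
ERROR_NAMES = {0: "NONE", 29: "TOPIC_AUTHORIZATION_FAILED", 36: "TOPIC_ALREADY_EXISTS"}

def failed_topics(record):
    """Return (topic, error name, error message) for each failed topic result."""
    failures = []
    for topic in record.get("response", {}).get("topics", []):
        code = topic.get("errorCode", 0)
        if code != 0:
            failures.append((
                topic.get("name"),
                ERROR_NAMES.get(code, f"code {code}"),
                topic.get("errorMessage"),
            ))
    return failures

# Shortened copy of the failed topic creation record.
record = {
    "requestHeader": {"clientId": "adminclient-2955", "requestApiKeyName": "CREATE_TOPICS"},
    "response": {
        "throttleTimeMs": 0,
        "topics": [
            {"name": "aaaa", "errorCode": 29, "errorMessage": "Authorization failed."}
        ],
    },
    "securityProtocol": "SASL_SSL",
    "principal": "User:user2",
    "listener": "EXTERNAL-9094",
}
failures = failed_topics(record)
for topic, error, message in failures:
    print(record["principal"], topic, error, message)
```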