Auditing Kafka

Event Streams can be configured to generate a sequential record of activities within the Event Streams Kafka cluster. By reviewing this audit trail, administrators can track user activities and investigate security breaches.

Audit trail for Kafka

Event Streams uses the log records from every request that is sent to the Kafka brokers, and extracts records for events that are useful for tracking activities within the broker. The information that is captured for each event can vary in structure and content, but every record includes the following key information:

  • The user (Kafka principal) that initiated the request.
  • The action (Kafka operation) that was requested.
  • The entity (Kafka resource) on which the operation was requested.
  • The date and time when the request was received.
  • The result of the request, including relevant information about reasons for any failures.

As Kafka is a distributed system, the audit events from all brokers must be aggregated in a central location to create a complete audit trail for the Event Streams Kafka cluster. Aggregating audit records in a separate log aggregator enables the retention and visualization of audit trails without impacting the storage requirements of the Event Streams instance.

Before you begin

  • Enabling the audit trail for Kafka introduces additional processing that can impact the performance of the Kafka system. Assess this impact in your environment before you enable the audit feature.
  • Configure the storage that is used for the audit trail with appropriate size and retention policies to handle the volume of audit records.
  • Secure access to the audit trail by restricting it to authorized users only.

Configuring audit trail for Kafka

To configure Event Streams to provide audit trail for the Kafka cluster, complete the following steps:

  1. Log in to your Kubernetes cluster as a cluster administrator by setting your kubectl context.
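
     For example, you can select your cluster's context by running a command similar to the following, where <cluster-context> is a placeholder for the context name in your kubeconfig:

     kubectl config use-context <cluster-context>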

  2. Create a file named es-audit-config.yaml and copy the following YAML content into the file to create a ConfigMap that contains the log4j2 configuration properties:

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: <event-streams-audit-configuration>
      namespace: <event-streams-namespace>
    data:
      log4j2.properties: |-
         <log4j-audit-configuration>
    

    Where:

    • <event-streams-audit-configuration> is the name of the ConfigMap, which you reference later when you enable auditing in the EventStreams custom resource.
    • <event-streams-namespace> is the namespace where your Event Streams instance is installed.
    • <log4j-audit-configuration> is the log4j2 configuration that you provide to:
      • Configure the Kafka root logger with default settings so that your Kafka pods continue to produce standard logs, as follows:

         # Kafka root logger
         rootLogger.level = INFO
         rootLogger.appenderRef.stdout.ref = stdout
        
         # Console Appender for standard logs
         appender.stdout.type = Console
         appender.stdout.name = stdout
         appender.stdout.layout.type = PatternLayout
         appender.stdout.layout.pattern = [%d] %p %m (%c)%n
        
      • Configure the Kafka request logger at TRACE level as the source for audit information, and set additivity=false to keep audit information separate from the standard logs for your Kafka pods as follows:

          # Kafka request logger
          logger.kafka_request.name = kafka.request.logger
          logger.kafka_request.level = TRACE
          logger.kafka_request.additivity = false
          logger.kafka_request.appenderRef.audit.ref = audit
        
      • Define an output destination by setting up an appender that directs the filtered audit records to a central system for aggregation, retention, and visualization (see the Syslog appender sketch after this list). Ensure that access to the audit trail is secured by restricting it to authorized users only.

      • Define a set of filters based on Kafka API Keys (protocols) to include or exclude specific requests in the audit trail.

        For complete sample log4j2 configurations, see the Example log4j configurations section later in this topic.
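
      For example, the following is a minimal Syslog appender sketch for the output destination. The host, port, and application name are illustrative placeholders; adjust them to match your central Syslog system:

          # Syslog appender for audit logs (illustrative values)
          appender.audit.type = Syslog
          appender.audit.name = audit
          appender.audit.host = <syslog-server-host>
          appender.audit.port = 514
          appender.audit.protocol = TCP
          appender.audit.format = RFC5424
          appender.audit.appName = kafka-audit

      The appender name audit matches the appenderRef that is set on the Kafka request logger.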

  3. Apply the ConfigMap by running the following command:

    kubectl apply -f es-audit-config.yaml
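
    Optionally, verify that the ConfigMap is created. The following check uses the placeholder names from the previous steps:

    kubectl get configmap <event-streams-audit-configuration> -n <event-streams-namespace>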
    
  4. After the ConfigMap is created, enable Kafka auditing by setting the spec.strimziOverrides.kafka.logging property in the EventStreams custom resource to reference the <event-streams-audit-configuration> ConfigMap:

     apiVersion: eventstreams.ibm.com/v1beta2
     kind: EventStreams
     # ...
     spec:
       strimziOverrides:
         kafka:
           # ...
           logging:
             type: external
             valueFrom:
               configMapKeyRef:
                 key: log4j2.properties
                 name: <event-streams-audit-configuration>
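
     After you update the custom resource, apply the change, for example by editing the instance directly. The instance name is a placeholder for the name of your EventStreams custom resource:

     kubectl edit eventstreams <instance-name> -n <event-streams-namespace>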
    

The Event Streams operator applies these changes to the Kafka pods one at a time. After all Kafka pods have rolled successfully, the Kafka audit trail is available in the central system that you configured for aggregation, retention, and visualization.
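
You can watch the Kafka pods to follow the progress of the rolling restart, for example:

kubectl get pods -n <event-streams-namespace> -w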

Example log4j configurations

See the following log4j2 configuration example for auditing purposes. For simplicity, the example writes the audit records to a file by using a File appender; in production, direct the records to a central system, such as Syslog, to aggregate the audit records from all brokers.

Auditing all Kafka users and all topics

Use the following log4j2 configuration to audit topic, ACL, and configuration changes for all Kafka users and all topics.

# Kafka root logger
rootLogger.level = INFO
rootLogger.appenderRef.stdout.ref = stdout

# Console Appender for standard logs
appender.stdout.type = Console
appender.stdout.name = stdout
appender.stdout.layout.type = PatternLayout
appender.stdout.layout.pattern = [%d] %p %m (%c)%n

# File Appender for audit logs (using existing Kafka data directory)
appender.audit.type = File
appender.audit.name = audit
appender.audit.fileName = /var/lib/kafka/data/audit.log
appender.audit.layout.type = PatternLayout
appender.audit.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %m%n

# Custom filter for audit events
appender.audit.filter.auditFilter.type = RegexFilter
appender.audit.filter.auditFilter.regex = .*(CREATE_TOPICS|DELETE_TOPICS|CREATE_ACLS|DELETE_ACLS|INCREMENTAL_ALTER_CONFIGS).*
appender.audit.filter.auditFilter.onMatch = ACCEPT
appender.audit.filter.auditFilter.onMismatch = DENY

# Kafka request logger for audit
logger.kafka_request.name = kafka.request.logger
logger.kafka_request.level = TRACE
logger.kafka_request.additivity = false
logger.kafka_request.appenderRef.audit.ref = audit

The audit file is stored in the /var/lib/kafka/data/ directory in each Kafka pod, so the records remain local to each broker. To aggregate the records centrally, replace the File appender with a Syslog appender or use a log collection tool such as rsyslog.
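
To inspect the audit records directly, you can read the file from inside a Kafka pod. The pod name in the following example is a placeholder for one of your Kafka broker pods:

kubectl exec -n <event-streams-namespace> <kafka-pod-name> -- tail /var/lib/kafka/data/audit.log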

Example audit records

The following examples show audit records for different actions and outcomes from within an audit trail.

Successful user creation

The following audit record indicates a successful attempt to create a Kafka user. The creation is captured as a CREATE_ACLS request that is issued by the User Operator, and the errorCode value 0 in the response indicates success:

{
    "requestHeader": {
        "clientId": "adminclient-1",
        "requestApiKeyName": "CREATE_ACLS"
        ...
    },
    "request": {
        "creations": [
            {
                "resourceType": 2,
                "resourceName": "your.topic.name",
                "resourcePatternType": 3,
                "principal": "User:consumer",
                "host": "*",
                "operation": 3,
                "permissionType": 3
            }
            ...
        ]
    },
    "response": {
        "results": [
            {
                "errorCode": 0,
                "errorMessage": ""
            }
            ...
        ]
        ...
    },
    "securityProtocol": "SSL",
    "principal": "User:CN=dev-scram-entity-user-operator,O=io.strimzi",
    "listener": "REPLICATION-9091",
    ...
}

Failed topic creation

The following audit record indicates an attempt to create a Kafka topic that failed because the user is not authorized. The errorCode value 29 in the response corresponds to the Kafka TOPIC_AUTHORIZATION_FAILED error:

{
    "requestHeader": {
        "clientId": "adminclient-2955",
        "requestApiKeyName": "CREATE_TOPICS",
        ...
    },
    "request": {
        "topics": [
            {
                "name": "aaaa",
                "numPartitions": 1,
                "replicationFactor": 1,
                "assignments": [],
                "configs": [
                    {
                        "name": "min.insync.replicas",
                        "value": "1"
                    },
                    ...
                ]
            }
        ]
        ...
    },
    "response": {
        "throttleTimeMs": 0,
        "topics": [
            {
                "name": "aaaa",
                "topicId": "AAAAAAAAAAAAAAAAAAAAAA",
                "errorCode": 29,
                "errorMessage": "Authorization failed.",
                ...
            }
        ]
    },
    "securityProtocol": "SASL_SSL",
    "principal": "User:user2",
    "listener": "EXTERNAL-9094",
    ...
}