  • Authentication using SASL/Kerberos

    1. Prerequisites
      1. Kerberos
        If your organization is already using a Kerberos server (for example, by using Active Directory), there is no need to install a new server just for Kafka. Otherwise you will need to install one; your Linux vendor likely has packages for Kerberos and a short guide on how to install and configure it (Ubuntu, Redhat). Note that if you are using Oracle Java, you will need to download JCE policy files for your Java version and copy them to $JAVA_HOME/jre/lib/security.
      2. Create Kerberos Principals
        If you are using the organization's Kerberos or Active Directory server, ask your Kerberos administrator for a principal for each Kafka broker in your cluster and for every operating system user that will access Kafka with Kerberos authentication (via clients and tools).
        If you have installed your own Kerberos, you will need to create these principals yourself using the following commands:
        > sudo /usr/sbin/kadmin.local -q 'addprinc -randkey kafka/{hostname}@{REALM}'
        > sudo /usr/sbin/kadmin.local -q "ktadd -k /etc/security/keytabs/{keytabname}.keytab kafka/{hostname}@{REALM}"
      3. Make sure all hosts are reachable using hostnames - it is a Kerberos requirement that all your hosts can be resolved with their FQDNs.
    2. Configuring Kafka Brokers
      1. Add a suitably modified JAAS file similar to the one below to each Kafka broker's config directory, let's call it kafka_server_jaas.conf for this example (note that each broker should have its own keytab):
        KafkaServer {
            com.sun.security.auth.module.Krb5LoginModule required
            useKeyTab=true
            storeKey=true
            keyTab="/etc/security/keytabs/kafka_server.keytab"
            principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
        };
        
        // Zookeeper client authentication
        Client {
            com.sun.security.auth.module.Krb5LoginModule required
            useKeyTab=true
            storeKey=true
            keyTab="/etc/security/keytabs/kafka_server.keytab"
            principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
        };
        The KafkaServer section in the JAAS file tells the broker which principal to use and the location of the keytab where this principal is stored; it allows the broker to log in using that keytab. See notes for more details on Zookeeper SASL configuration.
      2. Pass the JAAS and optionally the krb5 file locations as JVM parameters to each Kafka broker (see here for more details):
        -Djava.security.krb5.conf=/etc/kafka/krb5.conf
        -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
      3. Make sure the keytabs configured in the JAAS file are readable by the operating system user who is starting the Kafka broker.
      4. Configure SASL port and SASL mechanisms in server.properties as described here. For example:
        listeners=SASL_PLAINTEXT://host.name:port
        security.inter.broker.protocol=SASL_PLAINTEXT
        sasl.mechanism.inter.broker.protocol=GSSAPI
        sasl.enabled.mechanisms=GSSAPI
        We must also configure the service name in server.properties, which should match the principal name of the Kafka brokers. In the above example, the principal is "kafka/kafka1.hostname.com@EXAMPLE.COM", so:
        sasl.kerberos.service.name=kafka
    3. Configuring Kafka Clients
      To configure SASL authentication on the clients:
      1. Clients (producers, consumers, connect workers, etc) will authenticate to the cluster with their own principal (usually with the same name as the user running the client), so obtain or create these principals as needed. Then configure the JAAS configuration property for each client. Different clients within a JVM may run as different users by specifying different principals. The property sasl.jaas.config in producer.properties or consumer.properties describes how clients like producer and consumer can connect to the Kafka Broker. The following is an example configuration for a client using a keytab (recommended for long-running processes):
        sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
            useKeyTab=true \
            storeKey=true  \
            keyTab="/etc/security/keytabs/kafka_client.keytab" \
            principal="kafka-client-1@EXAMPLE.COM";
        For command-line utilities like kafka-console-consumer or kafka-console-producer, kinit can be used along with "useTicketCache=true" as in:
        sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
            useTicketCache=true;
        JAAS configuration for clients may alternatively be specified as a JVM parameter similar to brokers as described here. Clients use the login section named KafkaClient. This option allows only one user for all client connections from a JVM.
      2. Make sure the keytabs configured in the JAAS configuration are readable by the operating system user who is starting the Kafka client.
      3. Optionally pass the krb5 file locations as JVM parameters to each client JVM (see here for more details):
        -Djava.security.krb5.conf=/etc/kafka/krb5.conf
      4. Configure the following properties in producer.properties or consumer.properties:
        security.protocol=SASL_PLAINTEXT (or SASL_SSL)
        sasl.mechanism=GSSAPI
        sasl.kerberos.service.name=kafka
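      For example, putting the client-side pieces together with the ticket-cache configuration above (the broker address and topic name below are placeholder values):
        > kinit kafka-client-1@EXAMPLE.COM
        > bin/kafka-console-consumer.sh --bootstrap-server kafka1.hostname.com:9092 --topic test --consumer.config consumer.properties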
  • Authentication using SASL/PLAIN

    SASL/PLAIN is a simple username/password authentication mechanism that is typically used with TLS for encryption to implement secure authentication. Kafka supports a default implementation for SASL/PLAIN which can be extended for production use as described here.

    Under the default implementation of principal.builder.class, the username is used as the authenticated Principal for configuration of ACLs etc.
    1. Configuring Kafka Brokers
      1. Add a suitably modified JAAS file similar to the one below to each Kafka broker's config directory, let's call it kafka_server_jaas.conf for this example:
        KafkaServer {
            org.apache.kafka.common.security.plain.PlainLoginModule required
            username="admin"
            password="admin-secret"
            user_admin="admin-secret"
            user_alice="alice-secret";
        };
        This configuration defines two users (admin and alice). The properties username and password in the KafkaServer section are used by the broker to initiate connections to other brokers. In this example, admin is the user for inter-broker communication. The set of properties user_userName defines the passwords for all users that connect to the broker and the broker validates all client connections including those from other brokers using these properties.
      2. Pass the JAAS config file location as JVM parameter to each Kafka broker:
        -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
      3. Configure SASL port and SASL mechanisms in server.properties as described here. For example:
        listeners=SASL_SSL://host.name:port
        security.inter.broker.protocol=SASL_SSL
        sasl.mechanism.inter.broker.protocol=PLAIN
        sasl.enabled.mechanisms=PLAIN
    2. Configuring Kafka Clients
      To configure SASL authentication on the clients:
      1. Configure the JAAS configuration property for each client in producer.properties or consumer.properties. The login module describes how the clients like producer and consumer can connect to the Kafka Broker. The following is an example configuration for a client for the PLAIN mechanism:
        sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
            username="alice" \
            password="alice-secret";

        The options username and password are used by clients to configure the user for client connections. In this example, clients connect to the broker as user alice. Different clients within a JVM may connect as different users by specifying different user names and passwords in sasl.jaas.config.

        JAAS configuration for clients may alternatively be specified as a JVM parameter similar to brokers as described here. Clients use the login section named KafkaClient. This option allows only one user for all client connections from a JVM.

      2. Configure the following properties in producer.properties or consumer.properties:
        security.protocol=SASL_SSL
        sasl.mechanism=PLAIN
    3. Use of SASL/PLAIN in production
      • SASL/PLAIN should be used only with SSL as transport layer to ensure that clear passwords are not transmitted on the wire without encryption.
      • The default implementation of SASL/PLAIN in Kafka specifies usernames and passwords in the JAAS configuration file as shown here. From Kafka version 2.0 onwards, you can avoid storing clear passwords on disk by configuring your own callback handlers that obtain username and password from an external source using the configuration options sasl.server.callback.handler.class and sasl.client.callback.handler.class.
      • In production systems, external authentication servers may implement password authentication. From Kafka version 2.0 onwards, you can plug in your own callback handlers that use external authentication servers for password verification by configuring sasl.server.callback.handler.class, as in the sketch after this list.
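
      As a rough illustration (not Kafka's default implementation; the class name and the verify helper below are hypothetical), a server callback handler for PLAIN that delegates password checks to an external service could look like:

        import java.io.IOException;
        import java.util.List;
        import java.util.Map;
        import javax.security.auth.callback.Callback;
        import javax.security.auth.callback.NameCallback;
        import javax.security.auth.callback.UnsupportedCallbackException;
        import javax.security.auth.login.AppConfigurationEntry;
        import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
        import org.apache.kafka.common.security.plain.PlainAuthenticateCallback;

        public class ExternalPlainCallbackHandler implements AuthenticateCallbackHandler {
            @Override
            public void configure(Map<String, ?> configs, String saslMechanism,
                                  List<AppConfigurationEntry> jaasConfigEntries) {
                // Read the address/credentials of the external authentication server from configs here.
            }

            @Override
            public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
                String username = null;
                for (Callback callback : callbacks) {
                    if (callback instanceof NameCallback)
                        username = ((NameCallback) callback).getDefaultName();
                    else if (callback instanceof PlainAuthenticateCallback) {
                        PlainAuthenticateCallback plain = (PlainAuthenticateCallback) callback;
                        // verify(...) is a placeholder for a call to the external authentication server.
                        plain.authenticated(verify(username, plain.password()));
                    } else
                        throw new UnsupportedCallbackException(callback);
                }
            }

            private boolean verify(String username, char[] password) {
                return false; // replace with a real check against the external server
            }

            @Override
            public void close() {}
        }

      The handler would then be declared on the broker, for example for a SASL_SSL listener:
        listener.name.sasl_ssl.plain.sasl.server.callback.handler.class=com.example.ExternalPlainCallbackHandler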
  • Authentication using SASL/SCRAM

    Salted Challenge Response Authentication Mechanism (SCRAM) is a family of SASL mechanisms that addresses the security concerns with traditional mechanisms that perform username/password authentication like PLAIN and DIGEST-MD5. The mechanism is defined in RFC 5802. Kafka supports SCRAM-SHA-256 and SCRAM-SHA-512 which can be used with TLS to perform secure authentication. Under the default implementation of principal.builder.class, the username is used as the authenticated Principal for configuration of ACLs etc. The default SCRAM implementation in Kafka stores SCRAM credentials in Zookeeper and is suitable for use in Kafka installations where Zookeeper is on a private network. Refer to Security Considerations for more details.

    1. Creating SCRAM Credentials

      The SCRAM implementation in Kafka uses Zookeeper as a credential store. Credentials can be created in Zookeeper using kafka-configs.sh. For each SCRAM mechanism enabled, credentials must be created by adding a config with the mechanism name. Credentials for inter-broker communication must be created before Kafka brokers are started. Client credentials may be created and updated dynamically, and updated credentials will be used to authenticate new connections.

      Create SCRAM credentials for user alice with password alice-secret:

      > bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]' --entity-type users --entity-name alice

      The default iteration count of 4096 is used if iterations are not specified. A random salt is created, and the SCRAM identity consisting of salt, iterations, StoredKey and ServerKey is stored in Zookeeper. See RFC 5802 for details on SCRAM identity and the individual fields.

      The following examples also require a user admin for inter-broker communication which can be created using:

      > bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --alter --add-config 'SCRAM-SHA-256=[password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' --entity-type users --entity-name admin

      Existing credentials may be listed using the --describe option:

      > bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --describe --entity-type users --entity-name alice

      Credentials may be deleted for one or more SCRAM mechanisms using the --alter --delete-config option:

      > bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name alice
    2. Configuring Kafka Brokers
      1. Add a suitably modified JAAS file similar to the one below to each Kafka broker's config directory, let's call it kafka_server_jaas.conf for this example:
        KafkaServer {
            org.apache.kafka.common.security.scram.ScramLoginModule required
            username="admin"
            password="admin-secret";
        };
        The properties username and password in the KafkaServer section are used by the broker to initiate connections to other brokers. In this example, admin is the user for inter-broker communication.
      2. Pass the JAAS config file location as JVM parameter to each Kafka broker:
        -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
      3. Configure SASL port and SASL mechanisms in server.properties as described here. For example:
        listeners=SASL_SSL://host.name:port
        security.inter.broker.protocol=SASL_SSL
        sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256 (or SCRAM-SHA-512)
        sasl.enabled.mechanisms=SCRAM-SHA-256 (or SCRAM-SHA-512)
    3. Configuring Kafka Clients
      To configure SASL authentication on the clients:
      1. Configure the JAAS configuration property for each client in producer.properties or consumer.properties. The login module describes how the clients like producer and consumer can connect to the Kafka Broker. The following is an example configuration for a client for the SCRAM mechanisms:
        sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
            username="alice" \
            password="alice-secret";

        The options username and password are used by clients to configure the user for client connections. In this example, clients connect to the broker as user alice. Different clients within a JVM may connect as different users by specifying different user names and passwords in sasl.jaas.config.

        JAAS configuration for clients may alternatively be specified as a JVM parameter similar to brokers as described here. Clients use the login section named KafkaClient. This option allows only one user for all client connections from a JVM.

      2. Configure the following properties in producer.properties or consumer.properties:
        security.protocol=SASL_SSL
        sasl.mechanism=SCRAM-SHA-256 (or SCRAM-SHA-512)
    4. Security Considerations for SASL/SCRAM
      • The default implementation of SASL/SCRAM in Kafka stores SCRAM credentials in Zookeeper. This is suitable for production use in installations where Zookeeper is secure and on a private network.
      • Kafka supports only the strong hash functions SHA-256 and SHA-512 with a minimum iteration count of 4096. Strong hash functions combined with strong passwords and high iteration counts protect against brute force attacks if Zookeeper security is compromised.
      • SCRAM should be used only with TLS-encryption to prevent interception of SCRAM exchanges. This protects against dictionary or brute force attacks and against impersonation if Zookeeper is compromised.
      • From Kafka version 2.0 onwards, the default SASL/SCRAM credential store may be overridden using custom callback handlers by configuring sasl.server.callback.handler.class in installations where Zookeeper is not secure.
      • For more details on security considerations, refer to RFC 5802.
  • Authentication using SASL/OAUTHBEARER

    The OAuth 2 Authorization Framework "enables a third-party application to obtain limited access to an HTTP service, either on behalf of a resource owner by orchestrating an approval interaction between the resource owner and the HTTP service, or by allowing the third-party application to obtain access on its own behalf." The SASL OAUTHBEARER mechanism enables the use of the framework in a SASL (i.e. a non-HTTP) context; it is defined in RFC 7628. The default OAUTHBEARER implementation in Kafka creates and validates Unsecured JSON Web Tokens and is only suitable for use in non-production Kafka installations. Refer to Security Considerations for more details.

    Under the default implementation of principal.builder.class, the principalName of OAuthBearerToken is used as the authenticated Principal for configuration of ACLs etc.
    1. Configuring Kafka Brokers
      1. Add a suitably modified JAAS file similar to the one below to each Kafka broker's config directory, let's call it kafka_server_jaas.conf for this example:
        KafkaServer {
            org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
            unsecuredLoginStringClaim_sub="admin";
        };
        The property unsecuredLoginStringClaim_sub in the KafkaServer section is used by the broker when it initiates connections to other brokers. In this example, admin will appear in the subject (sub) claim and will be the user for inter-broker communication.
      2. Pass the JAAS config file location as JVM parameter to each Kafka broker:
        -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
      3. Configure SASL port and SASL mechanisms in server.properties as described here. For example:
        listeners=SASL_SSL://host.name:port (or SASL_PLAINTEXT if non-production)
        security.inter.broker.protocol=SASL_SSL (or SASL_PLAINTEXT if non-production)
        sasl.mechanism.inter.broker.protocol=OAUTHBEARER
        sasl.enabled.mechanisms=OAUTHBEARER
    2. Configuring Kafka Clients
      To configure SASL authentication on the clients:
      1. Configure the JAAS configuration property for each client in producer.properties or consumer.properties. The login module describes how the clients like producer and consumer can connect to the Kafka Broker. The following is an example configuration for a client for the OAUTHBEARER mechanisms:
        sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
            unsecuredLoginStringClaim_sub="alice";

        The option unsecuredLoginStringClaim_sub is used by clients to configure the subject (sub) claim, which determines the user for client connections. In this example, clients connect to the broker as user alice. Different clients within a JVM may connect as different users by specifying different subject (sub) claims in sasl.jaas.config.

        JAAS configuration for clients may alternatively be specified as a JVM parameter similar to brokers as described here. Clients use the login section named KafkaClient. This option allows only one user for all client connections from a JVM.

      2. Configure the following properties in producer.properties or consumer.properties:
        security.protocol=SASL_SSL (or SASL_PLAINTEXT if non-production)
        sasl.mechanism=OAUTHBEARER
      3. The default implementation of SASL/OAUTHBEARER depends on the jackson-databind library. Since it's an optional dependency, users have to configure it as a dependency via their build tool.
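        For example, with Gradle this might be declared as follows (the version shown is a placeholder; use one compatible with your Kafka release):
        implementation 'com.fasterxml.jackson.core:jackson-databind:2.13.5'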
    3. Unsecured Token Creation Options for SASL/OAUTHBEARER
      • The default implementation of SASL/OAUTHBEARER in Kafka creates and validates Unsecured JSON Web Tokens. While suitable only for non-production use, it does provide the flexibility to create arbitrary tokens in a DEV or TEST environment.
      • Here are the various supported JAAS module options on the client side (and on the broker side if OAUTHBEARER is the inter-broker protocol):
        JAAS Module Options for Unsecured Token Creation:
        unsecuredLoginStringClaim_<claimname>="value"
            Creates a String claim with the given name and value. Any valid claim name can be specified except 'iat' and 'exp' (these are automatically generated).
        unsecuredLoginNumberClaim_<claimname>="value"
            Creates a Number claim with the given name and value. Any valid claim name can be specified except 'iat' and 'exp' (these are automatically generated).
        unsecuredLoginListClaim_<claimname>="value"
            Creates a String List claim with the given name and values parsed from the given value, where the first character is taken as the delimiter. For example: unsecuredLoginListClaim_fubar="|value1|value2". Any valid claim name can be specified except 'iat' and 'exp' (these are automatically generated).
        unsecuredLoginExtension_<extensionname>="value"
            Creates a String extension with the given name and value. For example: unsecuredLoginExtension_traceId="123". A valid extension name is any sequence of lowercase or uppercase alphabet characters. In addition, the "auth" extension name is reserved. A valid extension value is any combination of characters with ASCII codes 1-127.
        unsecuredLoginPrincipalClaimName
            Set to a custom claim name if you wish the name of the String claim holding the principal name to be something other than 'sub'.
        unsecuredLoginLifetimeSeconds
            Set to an integer value if the token expiration is to be set to something other than the default value of 3600 seconds (which is 1 hour). The 'exp' claim will be set to reflect the expiration time.
        unsecuredLoginScopeClaimName
            Set to a custom claim name if you wish the name of the String or String List claim holding any token scope to be something other than 'scope'.
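
        Several of these options can be combined in a single client configuration, for example (the claim values here are arbitrary examples):
        sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
            unsecuredLoginStringClaim_sub="alice" \
            unsecuredLoginListClaim_scope="|read|write" \
            unsecuredLoginLifetimeSeconds="3600";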
    4. Unsecured Token Validation Options for SASL/OAUTHBEARER
      • Here are the various supported JAAS module options on the broker side for Unsecured JSON Web Token validation:
        JAAS Module Options for Unsecured Token Validation:
        unsecuredValidatorPrincipalClaimName="value"
            Set to a non-empty value if you wish a particular String claim holding a principal name to be checked for existence; the default is to check for the existence of the 'sub' claim.
        unsecuredValidatorScopeClaimName="value"
            Set to a custom claim name if you wish the name of the String or String List claim holding any token scope to be something other than 'scope'.
        unsecuredValidatorRequiredScope="value"
            Set to a space-delimited list of scope values if you wish the String/String List claim holding the token scope to be checked to make sure it contains certain values.
        unsecuredValidatorAllowableClockSkewMs="value"
            Set to a positive integer value if you wish to allow up to some number of positive milliseconds of clock skew (the default is 0).
      • The default unsecured SASL/OAUTHBEARER implementation may be overridden (and must be overridden in production environments) using custom login and SASL Server callback handlers.
      • For more details on security considerations, refer to RFC 6749, Section 10.
    5. Token Refresh for SASL/OAUTHBEARER
      Kafka periodically refreshes any token before it expires so that the client can continue to make connections to brokers. The parameters that impact how the refresh algorithm operates are specified as part of the producer/consumer/broker configuration and are listed below; see the documentation of these properties for details. The default values are usually reasonable, in which case these configuration parameters would not need to be explicitly set. An example with the default values follows the list.
      Producer/Consumer/Broker Configuration Property
      sasl.login.refresh.window.factor
      sasl.login.refresh.window.jitter
      sasl.login.refresh.min.period.seconds
      sasl.login.refresh.buffer.seconds
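      For reference, a minimal sketch of these settings, assuming the defaults documented for these properties:
      sasl.login.refresh.window.factor=0.8
      sasl.login.refresh.window.jitter=0.05
      sasl.login.refresh.min.period.seconds=60
      sasl.login.refresh.buffer.seconds=300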
    6. Secure/Production Use of SASL/OAUTHBEARER
      Production use cases will require writing an implementation of org.apache.kafka.common.security.auth.AuthenticateCallbackHandler that can handle an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback and declaring it via either the sasl.login.callback.handler.class configuration option for a non-broker client or via the listener.name.sasl_ssl.oauthbearer.sasl.login.callback.handler.class configuration option for brokers (when SASL/OAUTHBEARER is the inter-broker protocol).

      Production use cases will also require writing an implementation of org.apache.kafka.common.security.auth.AuthenticateCallbackHandler that can handle an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback and declaring it via the listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class broker configuration option.
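
      For example, the handler classes might be declared as follows (the class names are hypothetical placeholders for your own implementations):

      # non-broker client
      sasl.login.callback.handler.class=com.example.MyOauthBearerLoginCallbackHandler
      # broker, when SASL/OAUTHBEARER is the inter-broker protocol
      listener.name.sasl_ssl.oauthbearer.sasl.login.callback.handler.class=com.example.MyOauthBearerLoginCallbackHandler
      # broker, token validation
      listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class=com.example.MyOauthBearerValidatorCallbackHandler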

    7. Security Considerations for SASL/OAUTHBEARER
      • The default implementation of SASL/OAUTHBEARER in Kafka creates and validates Unsecured JSON Web Tokens. This is suitable only for non-production use.
      • OAUTHBEARER should be used in production environments only with TLS-encryption to prevent interception of tokens.
      • The default unsecured SASL/OAUTHBEARER implementation may be overridden (and must be overridden in production environments) using custom login and SASL Server callback handlers as described above.
      • For more details on OAuth 2 security considerations in general, refer to RFC 6749, Section 10.
  • Enabling multiple SASL mechanisms in a broker

    1. Specify configuration for the login modules of all enabled mechanisms in the KafkaServer section of the JAAS config file. For example:
      KafkaServer {
          com.sun.security.auth.module.Krb5LoginModule required
          useKeyTab=true
          storeKey=true
          keyTab="/etc/security/keytabs/kafka_server.keytab"
          principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
      
          org.apache.kafka.common.security.plain.PlainLoginModule required
          username="admin"
          password="admin-secret"
          user_admin="admin-secret"
          user_alice="alice-secret";
      };
    2. Enable the SASL mechanisms in server.properties:
      sasl.enabled.mechanisms=GSSAPI,PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER
    3. Specify the SASL security protocol and mechanism for inter-broker communication in server.properties if required:
      security.inter.broker.protocol=SASL_PLAINTEXT (or SASL_SSL)
      sasl.mechanism.inter.broker.protocol=GSSAPI (or one of the other enabled mechanisms)
    4. Follow the mechanism-specific steps in GSSAPI (Kerberos), PLAIN, SCRAM and OAUTHBEARER to configure SASL for the enabled mechanisms.
  • Modifying SASL mechanism in a Running Cluster

    SASL mechanism can be modified in a running cluster using the following sequence:

    1. Enable new SASL mechanism by adding the mechanism to sasl.enabled.mechanisms in server.properties for each broker. Update JAAS config file to include both mechanisms as described here. Incrementally bounce the cluster nodes.
    2. Restart clients using the new mechanism.
    3. To change the mechanism of inter-broker communication (if this is required), set sasl.mechanism.inter.broker.protocol in server.properties to the new mechanism and incrementally bounce the cluster again.
    4. To remove old mechanism (if this is required), remove the old mechanism from sasl.enabled.mechanisms in server.properties and remove the entries for the old mechanism from JAAS config file. Incrementally bounce the cluster again.
  • Authentication using Delegation Tokens

    Delegation token based authentication is a lightweight authentication mechanism to complement existing SASL/SSL methods. Delegation tokens are shared secrets between Kafka brokers and clients. Delegation tokens help processing frameworks to distribute the workload to available workers in a secure environment without the added cost of distributing Kerberos TGT/keytabs or keystores when 2-way SSL is used. See KIP-48 for more details.

    Under the default implementation of principal.builder.class, the owner of delegation token is used as the authenticated Principal for configuration of ACLs etc.

    Typical steps for delegation token usage are:

    1. User authenticates with the Kafka cluster via SASL or SSL, and obtains a delegation token. This can be done using Admin APIs or using kafka-delegation-tokens.sh script.
    2. User securely passes the delegation token to Kafka clients for authenticating with the Kafka cluster.
    3. Token owner/renewer can renew/expire the delegation tokens.
    1. Token Management

      A secret is used to generate and verify delegation tokens. This is supplied using the config option delegation.token.secret.key. The same secret key must be configured across all the brokers. If the secret is not set or is set to an empty string, brokers will disable delegation token authentication.

      In the current implementation, token details are stored in Zookeeper, which is suitable for use in Kafka installations where Zookeeper is on a private network. Also, this secret is currently stored as plain text in the server.properties config file. We intend to make these configurable in a future Kafka release.

      A token has a current life, and a maximum renewable life. By default, tokens must be renewed once every 24 hours for up to 7 days. These can be configured using delegation.token.expiry.time.ms and delegation.token.max.lifetime.ms config options.

      Tokens can also be cancelled explicitly. If a token is not renewed by the token’s expiration time or if token is beyond the max life time, it will be deleted from all broker caches as well as from zookeeper.
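
      A minimal sketch of the relevant broker settings, using the default lifetimes mentioned above expressed in milliseconds (the secret value is a placeholder):

      # must be identical on every broker; unset or empty disables delegation tokens
      delegation.token.secret.key=<a-strong-shared-secret>
      # tokens must be renewed within 24 hours (86400000 ms)
      delegation.token.expiry.time.ms=86400000
      # tokens cannot be renewed beyond 7 days (604800000 ms)
      delegation.token.max.lifetime.ms=604800000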

    2. Creating Delegation Tokens

      Tokens can be created by using the Admin APIs or the kafka-delegation-tokens.sh script. Delegation token requests (create/renew/expire/describe) should be issued only on SASL or SSL authenticated channels. Tokens cannot be requested if the initial authentication is done through a delegation token. A user can create a token for that user or for others as well by specifying the --owner-principal parameter. Owners/renewers can renew or expire tokens, and can always describe their own tokens. To describe other tokens, a DESCRIBE_TOKEN permission needs to be added on the User resource representing the owner of the token. kafka-delegation-tokens.sh script examples are given below.

      Create a delegation token:

      > bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --create   --max-life-time-period -1 --command-config client.properties --renewer-principal User:user1

      Create a delegation token for a different owner:

      > bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --create   --max-life-time-period -1 --command-config client.properties --renewer-principal User:user1 --owner-principal User:owner1

      Renew a delegation token:

      > bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --renew    --renew-time-period -1 --command-config client.properties --hmac ABCDEFGHIJK

      Expire a delegation token:

      > bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --expire   --expiry-time-period -1   --command-config client.properties  --hmac ABCDEFGHIJK

      Existing tokens can be described using the --describe option:

      > bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --describe --command-config client.properties  --owner-principal User:user1
    3. Token Authentication

      Delegation token authentication piggybacks on the current SASL/SCRAM authentication mechanism. We must enable a SASL/SCRAM mechanism on the Kafka cluster as described here.

      Configuring Kafka Clients:

      1. Configure the JAAS configuration property for each client in producer.properties or consumer.properties. The login module describes how the clients like producer and consumer can connect to the Kafka Broker. The following is an example configuration for a client for the token authentication:
        sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
            username="tokenID123" \
            password="lAYYSFmLs4bTjf+lTZ1LCHR/ZZFNA==" \
            tokenauth="true";

        The options username and password are used by clients to configure the token id and token HMAC, and the option tokenauth is used to indicate token authentication to the server. In this example, clients connect to the broker using token id tokenID123. Different clients within a JVM may connect using different tokens by specifying different token details in sasl.jaas.config.

        JAAS configuration for clients may alternatively be specified as a JVM parameter similar to brokers as described here. Clients use the login section named KafkaClient. This option allows only one user for all client connections from a JVM.
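
        Since token authentication piggybacks on SASL/SCRAM, the client must also use the security protocol and SCRAM mechanism enabled on the brokers, for example (assuming SCRAM-SHA-256 is enabled):
        security.protocol=SASL_SSL
        sasl.mechanism=SCRAM-SHA-256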

    4. Procedure to manually rotate the secret:

      We require a re-deployment when the secret needs to be rotated. During this process, already connected clients will continue to work, but any new connection requests and renew/expire requests with old tokens can fail. Steps are given below.

      1. Expire all existing tokens.
      2. Rotate the secret by rolling upgrade, and
      3. Generate new tokens

      We intend to automate this in a future Kafka release.

  • 7.5 Authorization and ACLs

    Kafka ships with a pluggable authorization framework, which is configured with the authorizer.class.name property in the server configuration. Configured implementations must extend org.apache.kafka.server.authorizer.Authorizer. Kafka provides default implementations which store ACLs in the cluster metadata (either Zookeeper or the KRaft metadata log). For Zookeeper-based clusters, the provided implementation is configured as follows:
    authorizer.class.name=kafka.security.authorizer.AclAuthorizer
    For KRaft clusters, use the following configuration on all nodes (brokers, controllers, or combined broker/controller nodes):
    authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
    Kafka ACLs are defined in the general format of "Principal {P} is [Allowed|Denied] Operation {O} From Host {H} on any Resource {R} matching ResourcePattern {RP}". You can read more about the ACL structure in KIP-11 and resource patterns in KIP-290. In order to add, remove, or list ACLs, you can use the Kafka ACL CLI kafka-acls.sh. By default, if no ResourcePatterns match a specific Resource R, then R has no associated ACLs, and therefore no one other than super users is allowed to access R. If you want to change that behavior, you can include the following in server.properties.
    allow.everyone.if.no.acl.found=true
    One can also add super users in server.properties like the following (note that the delimiter is a semicolon since SSL user names may contain commas). The default PrincipalType string "User" is case sensitive.
    super.users=User:Bob;User:Alice
    KRaft Principal Forwarding
    In KRaft clusters, admin requests such as CreateTopics and DeleteTopics are sent to the broker listeners by the client. The broker then forwards the request to the active controller through the first listener configured in controller.listener.names. Authorization of these requests is done on the controller node. This is achieved by way of an Envelope request which packages both the underlying request from the client as well as the client principal. When the controller receives the forwarded Envelope request from the broker, it first authorizes the Envelope request using the authenticated broker principal. Then it authorizes the underlying request using the forwarded principal.
    All of this implies that Kafka must understand how to serialize and deserialize the client principal. The authentication framework allows for customized principals by overriding the principal.builder.class configuration. In order for customized principals to work with KRaft, the configured class must implement org.apache.kafka.common.security.auth.KafkaPrincipalSerde so that Kafka knows how to serialize and deserialize the principals. The default implementation org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder uses the Kafka RPC format defined in the source code: clients/src/main/resources/common/message/DefaultPrincipalData.json. For more detail about request forwarding in KRaft, see KIP-590.
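    As a rough sketch (not the default implementation, which uses the Kafka RPC format mentioned above), a custom principal builder that also implements KafkaPrincipalSerde might look like the following; the build method is deliberately simplified and the "type:name" encoding is only illustrative:

    import java.nio.charset.StandardCharsets;
    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.security.auth.AuthenticationContext;
    import org.apache.kafka.common.security.auth.KafkaPrincipal;
    import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
    import org.apache.kafka.common.security.auth.KafkaPrincipalSerde;

    public class CustomPrincipalBuilder implements KafkaPrincipalBuilder, KafkaPrincipalSerde {
        @Override
        public KafkaPrincipal build(AuthenticationContext context) {
            // A real implementation would inspect the SSL/SASL details in the context.
            return KafkaPrincipal.ANONYMOUS;
        }

        @Override
        public byte[] serialize(KafkaPrincipal principal) throws SerializationException {
            // Encode the principal as "type:name".
            return (principal.getPrincipalType() + ":" + principal.getName())
                    .getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public KafkaPrincipal deserialize(byte[] bytes) throws SerializationException {
            String[] parts = new String(bytes, StandardCharsets.UTF_8).split(":", 2);
            if (parts.length != 2)
                throw new SerializationException("Malformed principal");
            return new KafkaPrincipal(parts[0], parts[1]);
        }
    }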
    Customizing SSL User Name
    By default, the SSL user name will be of the form "CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown". One can change that by setting ssl.principal.mapping.rules to a customized rule in server.properties. This config allows a list of rules for mapping X.500 distinguished name to short name. The rules are evaluated in order and the first rule that matches a distinguished name is used to map it to a short name. Any later rules in the list are ignored.
    The format of ssl.principal.mapping.rules is a list where each rule starts with "RULE:" and contains an expression in one of the following formats. The default rule will return the string representation of the X.500 certificate distinguished name. If the distinguished name matches the pattern, then the replacement command will be run over the name. This also supports lowercase/uppercase options, to force the translated result to be all lowercase or all uppercase. This is done by adding a "/L" or "/U" to the end of the rule.
    RULE:pattern/replacement/
    RULE:pattern/replacement/[LU]
    Example ssl.principal.mapping.rules values are:
    RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/,
    RULE:^CN=(.*?),OU=(.*?),O=(.*?),L=(.*?),ST=(.*?),C=(.*?)$/$1@$2/L,
    RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/L,
    DEFAULT
    The above rules translate the distinguished name "CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown" to "serviceuser" and "CN=adminUser,OU=Admin,O=Unknown,L=Unknown,ST=Unknown,C=Unknown" to "adminuser@admin".
    For advanced use cases, one can customize the name by setting a customized PrincipalBuilder in server.properties like the following.
    principal.builder.class=CustomizedPrincipalBuilderClass
    Customizing SASL User Name
    By default, the SASL user name will be the primary part of the Kerberos principal. One can change that by setting sasl.kerberos.principal.to.local.rules to a customized rule in server.properties. The format of sasl.kerberos.principal.to.local.rules is a list where each rule works in the same way as auth_to_local in the Kerberos configuration file (krb5.conf). This also supports an additional lowercase/uppercase option, to force the translated result to be all lowercase or all uppercase. This is done by adding a "/L" or "/U" to the end of the rule. Each rule starts with RULE: and contains an expression in one of the following formats. See the Kerberos documentation for more details.
    RULE:[n:string](regexp)s/pattern/replacement/
    RULE:[n:string](regexp)s/pattern/replacement/g
    RULE:[n:string](regexp)s/pattern/replacement//L
    RULE:[n:string](regexp)s/pattern/replacement/g/L
    RULE:[n:string](regexp)s/pattern/replacement//U
    RULE:[n:string](regexp)s/pattern/replacement/g/U
    An example of adding a rule to properly translate user@MYDOMAIN.COM to user while also keeping the default rule in place is:
    sasl.kerberos.principal.to.local.rules=RULE:[1:$1@$0](.*@MYDOMAIN.COM)s/@.*//,DEFAULT

    Command Line Interface

    The Kafka authorization management CLI can be found under the bin directory with all the other CLIs. The CLI script is called kafka-acls.sh. The following lists all the options that the script supports:

    --add
        Indicates to the script that the user is trying to add an acl. (Option type: Action)
    --remove
        Indicates to the script that the user is trying to remove an acl. (Option type: Action)
    --list
        Indicates to the script that the user is trying to list acls. (Option type: Action)
    --bootstrap-server
        A list of host/port pairs to use for establishing the connection to the Kafka cluster. Exactly one of --bootstrap-server or --authorizer must be specified. (Option type: Configuration)
    --command-config
        A property file containing configs to be passed to the Admin Client. This option can only be used with the --bootstrap-server option. (Option type: Configuration)
    --cluster
        Indicates to the script that the user is trying to interact with acls on the singular cluster resource. (Option type: ResourcePattern)
    --topic [topic-name]
        Indicates to the script that the user is trying to interact with acls on topic resource pattern(s). (Option type: ResourcePattern)
    --group [group-name]
        Indicates to the script that the user is trying to interact with acls on consumer-group resource pattern(s). (Option type: ResourcePattern)
    --transactional-id [transactional-id]
        The transactionalId to which ACLs should be added or removed. A value of * indicates the ACLs should apply to all transactionalIds. (Option type: ResourcePattern)
    --delegation-token [delegation-token]
        Delegation token to which ACLs should be added or removed. A value of * indicates the ACL should apply to all tokens. (Option type: ResourcePattern)
    --user-principal [user-principal]
        A user resource to which ACLs should be added or removed. This is currently supported in relation with delegation tokens. A value of * indicates the ACL should apply to all users. (Option type: ResourcePattern)
    --resource-pattern-type [pattern-type]
        Indicates to the script the type of resource pattern (for --add) or resource pattern filter (for --list and --remove) the user wishes to use. When adding acls, this should be a specific pattern type, e.g. 'literal' or 'prefixed'. When listing or removing acls, a specific pattern type filter can be used to list or remove acls from a specific type of resource pattern, or the filter values of 'any' or 'match' can be used, where 'any' will match any pattern type but will match the resource name exactly, and 'match' will perform pattern matching to list or remove all acls that affect the supplied resource(s). WARNING: 'match', when used in combination with the '--remove' switch, should be used with care. (Default: literal; Option type: Configuration)
    --allow-principal
        Principal in PrincipalType:name format that will be added to the ACL with Allow permission. The default PrincipalType string "User" is case sensitive. You can specify multiple --allow-principal options in a single command. (Option type: Principal)
    --deny-principal
        Principal in PrincipalType:name format that will be added to the ACL with Deny permission. The default PrincipalType string "User" is case sensitive. You can specify multiple --deny-principal options in a single command. (Option type: Principal)
    --principal
        Principal in PrincipalType:name format that will be used along with the --list option. The default PrincipalType string "User" is case sensitive. This will list the ACLs for the specified principal. You can specify multiple --principal options in a single command. (Option type: Principal)
    --allow-host
        IP address from which principals listed in --allow-principal will have access. (Default: * ("all hosts") if --allow-principal is specified; Option type: Host)
    --deny-host
        IP address from which principals listed in --deny-principal will be denied access. (Default: * ("all hosts") if --deny-principal is specified; Option type: Host)
    --operation
        Operation that will be allowed or denied. Valid values are: Read, Write, Create, Delete, Alter, Describe, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite, CreateTokens, DescribeTokens, All. (Default: All; Option type: Operation)
    --producer
        Convenience option to add/remove acls for the producer role. This will generate acls that allow WRITE, DESCRIBE and CREATE on topic. (Option type: Convenience)
    --consumer
        Convenience option to add/remove acls for the consumer role. This will generate acls that allow READ and DESCRIBE on topic and READ on consumer-group. (Option type: Convenience)
    --idempotent
        Enable idempotence for the producer. This should be used in combination with the --producer option. Note that idempotence is enabled automatically if the producer is authorized to a particular transactional-id. (Option type: Convenience)
    --force
        Convenience option to assume yes to all queries and not prompt. (Option type: Convenience)
    --authorizer
        (DEPRECATED: not supported in KRaft) Fully qualified class name of the authorizer. (Default: kafka.security.authorizer.AclAuthorizer; Option type: Configuration)
    --authorizer-properties
        (DEPRECATED: not supported in KRaft) key=val pairs that will be passed to the authorizer for initialization. For the default authorizer in ZK clusters, example values are: zookeeper.connect=localhost:2181. (Option type: Configuration)
    --zk-tls-config-file
        (DEPRECATED: not supported in KRaft) Identifies the file where ZooKeeper client TLS connectivity properties for the authorizer are defined. Any properties other than the following (with or without an "authorizer." prefix) are ignored: zookeeper.clientCnxnSocket, zookeeper.ssl.cipher.suites, zookeeper.ssl.client.enable, zookeeper.ssl.crl.enable, zookeeper.ssl.enabled.protocols, zookeeper.ssl.endpoint.identification.algorithm, zookeeper.ssl.keystore.location, zookeeper.ssl.keystore.password, zookeeper.ssl.keystore.type, zookeeper.ssl.ocsp.enable, zookeeper.ssl.protocol, zookeeper.ssl.truststore.location, zookeeper.ssl.truststore.password, zookeeper.ssl.truststore.type. (Option type: Configuration)

    Examples
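
    For example, to add ACLs allowing principals User:Bob and User:Alice to perform the Read and Write operations on topic Test-topic from hosts 198.51.100.0 and 198.51.100.1 (all values here are illustrative), one could run:

    > bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic

    Listing and removing those ACLs follows the same pattern:

    > bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --list --topic Test-topic
    > bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic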

    Authorization Primitives

    Protocol calls usually perform operations on certain resources in Kafka. Knowing these operations and resources is required to set up effective protection. In this section we'll list the operations and resources, then list the combination of these with the protocols to see the valid scenarios.

    Operations in Kafka

    There are a few operation primitives that can be used to build up privileges. These can be matched up with certain resources to allow specific protocol calls for a given user. They are the same operation values accepted by the --operation option above: Read, Write, Create, Delete, Alter, Describe, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite, CreateTokens, DescribeTokens and All.

    Resources in Kafka

    The operations above can be applied on certain resources, such as the topic, consumer-group, cluster, transactional-id, delegation-token and user resources targeted by the kafka-acls.sh options described earlier.

    Operations and Resources on Protocols

    In the below table we'll list the valid operations on resources that are executed by the Kafka API protocols.

    - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 35/security.html [689:2090]: - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  • Authentication using SASL/Kerberos

    1. Prerequisites
      1. Kerberos
        If your organization is already using a Kerberos server (for example, by using Active Directory), there is no need to install a new server just for Kafka. Otherwise you will need to install one, your Linux vendor likely has packages for Kerberos and a short guide on how to install and configure it (Ubuntu, Redhat). Note that if you are using Oracle Java, you will need to download JCE policy files for your Java version and copy them to $JAVA_HOME/jre/lib/security.
      2. Create Kerberos Principals
        If you are using the organization's Kerberos or Active Directory server, ask your Kerberos administrator for a principal for each Kafka broker in your cluster and for every operating system user that will access Kafka with Kerberos authentication (via clients and tools).
        If you have installed your own Kerberos, you will need to create these principals yourself using the following commands:
        > sudo /usr/sbin/kadmin.local -q 'addprinc -randkey kafka/{hostname}@{REALM}'
        > sudo /usr/sbin/kadmin.local -q "ktadd -k /etc/security/keytabs/{keytabname}.keytab kafka/{hostname}@{REALM}"
      3. Make sure all hosts can be reachable using hostnames - it is a Kerberos requirement that all your hosts can be resolved with their FQDNs.
    2. Configuring Kafka Brokers
      1. Add a suitably modified JAAS file similar to the one below to each Kafka broker's config directory, let's call it kafka_server_jaas.conf for this example (note that each broker should have its own keytab):
        KafkaServer {
            com.sun.security.auth.module.Krb5LoginModule required
            useKeyTab=true
            storeKey=true
            keyTab="/etc/security/keytabs/kafka_server.keytab"
            principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
        };
        
        // Zookeeper client authentication
        Client {
            com.sun.security.auth.module.Krb5LoginModule required
            useKeyTab=true
            storeKey=true
            keyTab="/etc/security/keytabs/kafka_server.keytab"
            principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
        };
        KafkaServer section in the JAAS file tells the broker which principal to use and the location of the keytab where this principal is stored. It allows the broker to login using the keytab specified in this section. See notes for more details on Zookeeper SASL configuration.
      2. Pass the JAAS and optionally the krb5 file locations as JVM parameters to each Kafka broker (see here for more details):
        -Djava.security.krb5.conf=/etc/kafka/krb5.conf
        -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
      3. Make sure the keytabs configured in the JAAS file are readable by the operating system user who is starting kafka broker.
      4. Configure SASL port and SASL mechanisms in server.properties as described here. For example:
        listeners=SASL_PLAINTEXT://host.name:port
        security.inter.broker.protocol=SASL_PLAINTEXT
        sasl.mechanism.inter.broker.protocol=GSSAPI
        sasl.enabled.mechanisms=GSSAPI
        We must also configure the service name in server.properties, which should match the principal name of the kafka brokers. In the above example, principal is "kafka/kafka1.hostname.com@EXAMPLE.com", so:
        sasl.kerberos.service.name=kafka
    3. Configuring Kafka Clients
      To configure SASL authentication on the clients:
      1. Clients (producers, consumers, connect workers, etc) will authenticate to the cluster with their own principal (usually with the same name as the user running the client), so obtain or create these principals as needed. Then configure the JAAS configuration property for each client. Different clients within a JVM may run as different users by specifying different principals. The property sasl.jaas.config in producer.properties or consumer.properties describes how clients like producer and consumer can connect to the Kafka Broker. The following is an example configuration for a client using a keytab (recommended for long-running processes):
        sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
            useKeyTab=true \
            storeKey=true  \
            keyTab="/etc/security/keytabs/kafka_client.keytab" \
            principal="kafka-client-1@EXAMPLE.COM";
        For command-line utilities like kafka-console-consumer or kafka-console-producer, kinit can be used along with "useTicketCache=true" as in:
        sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
            useTicketCache=true;
        JAAS configuration for clients may alternatively be specified as a JVM parameter similar to brokers as described here. Clients use the login section named KafkaClient. This option allows only one user for all client connections from a JVM.
      2. Make sure the keytabs configured in the JAAS configuration are readable by the operating system user who is starting kafka client.
      3. Optionally pass the krb5 file locations as JVM parameters to each client JVM (see here for more details):
        -Djava.security.krb5.conf=/etc/kafka/krb5.conf
      4. Configure the following properties in producer.properties or consumer.properties:
        security.protocol=SASL_PLAINTEXT (or SASL_SSL)
        sasl.mechanism=GSSAPI
        sasl.kerberos.service.name=kafka
  • Authentication using SASL/PLAIN

    SASL/PLAIN is a simple username/password authentication mechanism that is typically used with TLS for encryption to implement secure authentication. Kafka supports a default implementation for SASL/PLAIN which can be extended for production use as described here.

    Under the default implementation of principal.builder.class, the username is used as the authenticated Principal for configuration of ACLs etc.
    1. Configuring Kafka Brokers
      1. Add a suitably modified JAAS file similar to the one below to each Kafka broker's config directory, let's call it kafka_server_jaas.conf for this example:
        KafkaServer {
            org.apache.kafka.common.security.plain.PlainLoginModule required
            username="admin"
            password="admin-secret"
            user_admin="admin-secret"
            user_alice="alice-secret";
        };
        This configuration defines two users (admin and alice). The properties username and password in the KafkaServer section are used by the broker to initiate connections to other brokers. In this example, admin is the user for inter-broker communication. The set of properties user_userName defines the passwords for all users that connect to the broker and the broker validates all client connections including those from other brokers using these properties.
      2. Pass the JAAS config file location as JVM parameter to each Kafka broker:
        -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
      3. Configure SASL port and SASL mechanisms in server.properties as described here. For example:
        listeners=SASL_SSL://host.name:port
        security.inter.broker.protocol=SASL_SSL
        sasl.mechanism.inter.broker.protocol=PLAIN
        sasl.enabled.mechanisms=PLAIN
    2. Configuring Kafka Clients
      To configure SASL authentication on the clients:
      1. Configure the JAAS configuration property for each client in producer.properties or consumer.properties. The login module describes how the clients like producer and consumer can connect to the Kafka Broker. The following is an example configuration for a client for the PLAIN mechanism:
        sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
            username="alice" \
            password="alice-secret";

        The options username and password are used by clients to configure the user for client connections. In this example, clients connect to the broker as user alice. Different clients within a JVM may connect as different users by specifying different user names and passwords in sasl.jaas.config.

        JAAS configuration for clients may alternatively be specified as a JVM parameter similar to brokers as described here. Clients use the login section named KafkaClient. This option allows only one user for all client connections from a JVM.

      2. Configure the following properties in producer.properties or consumer.properties:
        security.protocol=SASL_SSL
        sasl.mechanism=PLAIN
    3. Use of SASL/PLAIN in production
      • SASL/PLAIN should be used only with SSL as transport layer to ensure that clear passwords are not transmitted on the wire without encryption.
      • The default implementation of SASL/PLAIN in Kafka specifies usernames and passwords in the JAAS configuration file as shown here. From Kafka version 2.0 onwards, you can avoid storing clear passwords on disk by configuring your own callback handlers that obtain username and password from an external source using the configuration options sasl.server.callback.handler.class and sasl.client.callback.handler.class.
      • In production systems, external authentication servers may implement password authentication. From Kafka version 2.0 onwards, you can plug in your own callback handlers that use external authentication servers for password verification by configuring sasl.server.callback.handler.class.
  • Authentication using SASL/SCRAM

    Salted Challenge Response Authentication Mechanism (SCRAM) is a family of SASL mechanisms that addresses the security concerns with traditional mechanisms that perform username/password authentication like PLAIN and DIGEST-MD5. The mechanism is defined in RFC 5802. Kafka supports SCRAM-SHA-256 and SCRAM-SHA-512 which can be used with TLS to perform secure authentication. Under the default implementation of principal.builder.class, the username is used as the authenticated Principal for configuration of ACLs etc. The default SCRAM implementation in Kafka stores SCRAM credentials in Zookeeper and is suitable for use in Kafka installations where Zookeeper is on a private network. Refer to Security Considerations for more details.

    1. Creating SCRAM Credentials

      The SCRAM implementation in Kafka uses Zookeeper as credential store. Credentials can be created in Zookeeper using kafka-configs.sh. For each SCRAM mechanism enabled, credentials must be created by adding a config with the mechanism name. Credentials for inter-broker communication must be created before Kafka brokers are started. Client credentials may be created and updated dynamically and updated credentials will be used to authenticate new connections.

      Create SCRAM credentials for user alice with password alice-secret:

      > bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]' --entity-type users --entity-name alice

      The default iteration count of 4096 is used if iterations are not specified. A random salt is created, and the SCRAM identity consisting of salt, iterations, StoredKey and ServerKey is stored in Zookeeper. See RFC 5802 for details on SCRAM identity and the individual fields.

      The following examples also require a user admin for inter-broker communication which can be created using:

      > bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --alter --add-config 'SCRAM-SHA-256=[password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' --entity-type users --entity-name admin

      Existing credentials may be listed using the --describe option:

      > bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --describe --entity-type users --entity-name alice

      Credentials may be deleted for one or more SCRAM mechanisms using the --alter --delete-config option:

      > bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name alice
    2. Configuring Kafka Brokers
      1. Add a suitably modified JAAS file similar to the one below to each Kafka broker's config directory, let's call it kafka_server_jaas.conf for this example:
        KafkaServer {
            org.apache.kafka.common.security.scram.ScramLoginModule required
            username="admin"
            password="admin-secret";
        };
        The properties username and password in the KafkaServer section are used by the broker to initiate connections to other brokers. In this example, admin is the user for inter-broker communication.
      2. Pass the JAAS config file location as JVM parameter to each Kafka broker:
        -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
      3. Configure SASL port and SASL mechanisms in server.properties as described here. For example:
        listeners=SASL_SSL://host.name:port
        security.inter.broker.protocol=SASL_SSL
        sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256 (or SCRAM-SHA-512)
        sasl.enabled.mechanisms=SCRAM-SHA-256 (or SCRAM-SHA-512)
    3. Configuring Kafka Clients
      To configure SASL authentication on the clients:
      1. Configure the JAAS configuration property for each client in producer.properties or consumer.properties. The login module describes how clients such as the producer and consumer connect to the Kafka broker. The following is an example configuration for a client for the SCRAM mechanisms:
        sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
            username="alice" \
            password="alice-secret";

        The options username and password are used by clients to configure the user for client connections. In this example, clients connect to the broker as user alice. Different clients within a JVM may connect as different users by specifying different user names and passwords in sasl.jaas.config.

        JAAS configuration for clients may alternatively be specified as a JVM parameter similar to brokers as described here. Clients use the login section named KafkaClient. This option allows only one user for all client connections from a JVM.

      2. Configure the following properties in producer.properties or consumer.properties:
        security.protocol=SASL_SSL
        sasl.mechanism=SCRAM-SHA-256 (or SCRAM-SHA-512)
    4. Security Considerations for SASL/SCRAM
      • The default implementation of SASL/SCRAM in Kafka stores SCRAM credentials in Zookeeper. This is suitable for production use in installations where Zookeeper is secure and on a private network.
      • Kafka supports only the strong hash functions SHA-256 and SHA-512 with a minimum iteration count of 4096. Strong hash functions combined with strong passwords and high iteration counts protect against brute force attacks if Zookeeper security is compromised.
      • SCRAM should be used only with TLS-encryption to prevent interception of SCRAM exchanges. This protects against dictionary or brute force attacks and against impersonation if Zookeeper is compromised.
      • From Kafka version 2.0 onwards, the default SASL/SCRAM credential store may be overridden using custom callback handlers by configuring sasl.server.callback.handler.class in installations where Zookeeper is not secure.
      • For more details on security considerations, refer to RFC 5802.
  • Authentication using SASL/OAUTHBEARER

    The OAuth 2 Authorization Framework "enables a third-party application to obtain limited access to an HTTP service, either on behalf of a resource owner by orchestrating an approval interaction between the resource owner and the HTTP service, or by allowing the third-party application to obtain access on its own behalf." The SASL OAUTHBEARER mechanism enables the use of the framework in a SASL (i.e. a non-HTTP) context; it is defined in RFC 7628. The default OAUTHBEARER implementation in Kafka creates and validates Unsecured JSON Web Tokens and is only suitable for use in non-production Kafka installations. Refer to Security Considerations for more details.

    Under the default implementation of principal.builder.class, the principalName of OAuthBearerToken is used as the authenticated Principal for configuration of ACLs etc.
    1. Configuring Kafka Brokers
      1. Add a suitably modified JAAS file similar to the one below to each Kafka broker's config directory, let's call it kafka_server_jaas.conf for this example:
        KafkaServer {
            org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
            unsecuredLoginStringClaim_sub="admin";
        };
        The property unsecuredLoginStringClaim_sub in the KafkaServer section is used by the broker when it initiates connections to other brokers. In this example, admin will appear in the subject (sub) claim and will be the user for inter-broker communication.
      2. Pass the JAAS config file location as JVM parameter to each Kafka broker:
        -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
      3. Configure SASL port and SASL mechanisms in server.properties as described here. For example:
        listeners=SASL_SSL://host.name:port (or SASL_PLAINTEXT if non-production)
        security.inter.broker.protocol=SASL_SSL (or SASL_PLAINTEXT if non-production)
        sasl.mechanism.inter.broker.protocol=OAUTHBEARER
        sasl.enabled.mechanisms=OAUTHBEARER
    2. Configuring Kafka Clients
      To configure SASL authentication on the clients:
      1. Configure the JAAS configuration property for each client in producer.properties or consumer.properties. The login module describes how clients such as the producer and consumer connect to the Kafka broker. The following is an example configuration for a client for the OAUTHBEARER mechanism:
        sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
            unsecuredLoginStringClaim_sub="alice";

        The option unsecuredLoginStringClaim_sub is used by clients to configure the subject (sub) claim, which determines the user for client connections. In this example, clients connect to the broker as user alice. Different clients within a JVM may connect as different users by specifying different subject (sub) claims in sasl.jaas.config.

        JAAS configuration for clients may alternatively be specified as a JVM parameter similar to brokers as described here. Clients use the login section named KafkaClient. This option allows only one user for all client connections from a JVM.

      2. Configure the following properties in producer.properties or consumer.properties:
        security.protocol=SASL_SSL (or SASL_PLAINTEXT if non-production)
        sasl.mechanism=OAUTHBEARER
      3. The default implementation of SASL/OAUTHBEARER depends on the jackson-databind library. Since it is an optional dependency, users have to declare it as a dependency via their build tool; an illustrative snippet follows this list.
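      For example, with Maven the dependency could be declared along the following lines; the version is a placeholder to be aligned with the version expected by your Kafka clients:
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version><!-- align with your Kafka clients version --></version>
        </dependency>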
    3. Unsecured Token Creation Options for SASL/OAUTHBEARER
      • The default implementation of SASL/OAUTHBEARER in Kafka creates and validates Unsecured JSON Web Tokens. While suitable only for non-production use, it does provide the flexibility to create arbitrary tokens in a DEV or TEST environment.
      • Here are the various supported JAAS module options on the client side (and on the broker side if OAUTHBEARER is the inter-broker protocol):
    JAAS Module Option for Unsecured Token Creation Documentation
    unsecuredLoginStringClaim_<claimname>="value" Creates a String claim with the given name and value. Any valid claim name can be specified except 'iat' and 'exp' (these are automatically generated).
    unsecuredLoginNumberClaim_<claimname>="value" Creates a Number claim with the given name and value. Any valid claim name can be specified except 'iat' and 'exp' (these are automatically generated).
    unsecuredLoginListClaim_<claimname>="value" Creates a String List claim with the given name and values parsed from the given value where the first character is taken as the delimiter. For example: unsecuredLoginListClaim_fubar="|value1|value2". Any valid claim name can be specified except 'iat' and 'exp' (these are automatically generated).
    unsecuredLoginExtension_<extensionname>="value" Creates a String extension with the given name and value. For example: unsecuredLoginExtension_traceId="123". A valid extension name is any sequence of lowercase or uppercase alphabet characters. In addition, the "auth" extension name is reserved. A valid extension value is any combination of characters with ASCII codes 1-127.
    unsecuredLoginPrincipalClaimName Set to a custom claim name if you wish the name of the String claim holding the principal name to be something other than 'sub'.
    unsecuredLoginLifetimeSeconds Set to an integer value if the token expiration is to be set to something other than the default value of 3600 seconds (which is 1 hour). The 'exp' claim will be set to reflect the expiration time.
    unsecuredLoginScopeClaimName Set to a custom claim name if you wish the name of the String or String List claim holding any token scope to be something other than 'scope'.
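    As an illustration, several of these options can be combined in a single client configuration; the claim and option values below are arbitrary examples built from the table above:
      sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
          unsecuredLoginStringClaim_sub="alice" \
          unsecuredLoginListClaim_scope="|read|write" \
          unsecuredLoginLifetimeSeconds="600";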
  • Unsecured Token Validation Options for SASL/OAUTHBEARER
  • Token Refresh for SASL/OAUTHBEARER
    Kafka periodically refreshes any token before it expires so that the client can continue to make connections to brokers. The parameters that impact how the refresh algorithm operates are specified as part of the producer/consumer/broker configuration and are listed below; see the documentation of these properties for details. The default values are usually reasonable, in which case these configuration parameters would not need to be explicitly set.
    Producer/Consumer/Broker Configuration Property
    sasl.login.refresh.window.factor
    sasl.login.refresh.window.jitter
    sasl.login.refresh.min.period.seconds
    sasl.login.refresh.buffer.seconds
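    As an illustration, the following restates what are believed to be the default values for these properties; you would only set them explicitly to change the refresh behaviour:
      sasl.login.refresh.window.factor=0.8
      sasl.login.refresh.window.jitter=0.05
      sasl.login.refresh.min.period.seconds=60
      sasl.login.refresh.buffer.seconds=300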
  • Secure/Production Use of SASL/OAUTHBEARER
    Production use cases will require writing an implementation of org.apache.kafka.common.security.auth.AuthenticateCallbackHandler that can handle an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback and declaring it via either the sasl.login.callback.handler.class configuration option for a non-broker client or via the listener.name.sasl_ssl.oauthbearer.sasl.login.callback.handler.class configuration option for brokers (when SASL/OAUTHBEARER is the inter-broker protocol).

    Production use cases will also require writing an implementation of org.apache.kafka.common.security.auth.AuthenticateCallbackHandler that can handle an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback and declaring it via the listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class broker configuration option.
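    A minimal sketch of the login-side handler is shown below. The callback API (OAuthBearerTokenCallback) is the public Kafka API named above, while fetchTokenFromAuthorizationServer is a hypothetical method standing in for the interaction with a real OAuth 2 authorization server:
      import java.util.List;
      import java.util.Map;
      import javax.security.auth.callback.Callback;
      import javax.security.auth.callback.UnsupportedCallbackException;
      import javax.security.auth.login.AppConfigurationEntry;
      import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
      import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
      import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;

      public class OAuthLoginCallbackHandler implements AuthenticateCallbackHandler {

          @Override
          public void configure(Map<String, ?> configs, String saslMechanism,
                                List<AppConfigurationEntry> jaasConfigEntries) {}

          @Override
          public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
              for (Callback callback : callbacks) {
                  if (callback instanceof OAuthBearerTokenCallback) {
                      // Obtain a real (secured) token and hand it to Kafka.
                      OAuthBearerToken token = fetchTokenFromAuthorizationServer();
                      ((OAuthBearerTokenCallback) callback).token(token);
                  } else {
                      throw new UnsupportedCallbackException(callback);
                  }
              }
          }

          private OAuthBearerToken fetchTokenFromAuthorizationServer() {
              // Hypothetical: issue a request to the token endpoint and wrap
              // the response in an OAuthBearerToken implementation.
              throw new UnsupportedOperationException("left to the implementer");
          }

          @Override
          public void close() {}
      }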

  • Security Considerations for SASL/OAUTHBEARER
  • Enabling multiple SASL mechanisms in a broker

    1. Specify configuration for the login modules of all enabled mechanisms in the KafkaServer section of the JAAS config file. For example:
      KafkaServer {
          com.sun.security.auth.module.Krb5LoginModule required
          useKeyTab=true
          storeKey=true
          keyTab="/etc/security/keytabs/kafka_server.keytab"
          principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
      
          org.apache.kafka.common.security.plain.PlainLoginModule required
          username="admin"
          password="admin-secret"
          user_admin="admin-secret"
          user_alice="alice-secret";
      };
    2. Enable the SASL mechanisms in server.properties:
      sasl.enabled.mechanisms=GSSAPI,PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER
    3. Specify the SASL security protocol and mechanism for inter-broker communication in server.properties if required:
      security.inter.broker.protocol=SASL_PLAINTEXT (or SASL_SSL)
      sasl.mechanism.inter.broker.protocol=GSSAPI (or one of the other enabled mechanisms)
    4. Follow the mechanism-specific steps in GSSAPI (Kerberos), PLAIN, SCRAM and OAUTHBEARER to configure SASL for the enabled mechanisms.
  • Modifying SASL mechanism in a Running Cluster

    The SASL mechanism can be modified in a running cluster using the following sequence (a server.properties sketch follows the list):

    1. Enable the new SASL mechanism by adding the mechanism to sasl.enabled.mechanisms in server.properties for each broker. Update the JAAS config file to include both mechanisms as described here. Incrementally bounce the cluster nodes.
    2. Restart clients using the new mechanism.
    3. To change the mechanism of inter-broker communication (if this is required), set sasl.mechanism.inter.broker.protocol in server.properties to the new mechanism and incrementally bounce the cluster again.
    4. To remove the old mechanism (if this is required), remove the old mechanism from sasl.enabled.mechanisms in server.properties and remove the entries for the old mechanism from the JAAS config file. Incrementally bounce the cluster again.
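    As a sketch, migrating inter-broker communication from GSSAPI to SCRAM-SHA-256 would move server.properties through states like the following (listener and JAAS setup omitted):
      # Step 1: enable both mechanisms, leave the inter-broker protocol unchanged
      sasl.enabled.mechanisms=GSSAPI,SCRAM-SHA-256
      sasl.mechanism.inter.broker.protocol=GSSAPI
      # Step 3: switch inter-broker communication to the new mechanism
      sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
      # Step 4: remove the old mechanism
      sasl.enabled.mechanisms=SCRAM-SHA-256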
  • Authentication using Delegation Tokens

    Delegation token based authentication is a lightweight authentication mechanism to complement existing SASL/SSL methods. Delegation tokens are shared secrets between Kafka brokers and clients. Delegation tokens help processing frameworks distribute the workload to available workers in a secure environment without the added cost of distributing Kerberos TGTs/keytabs or keystores when 2-way SSL is used. See KIP-48 for more details.

    Under the default implementation of principal.builder.class, the owner of delegation token is used as the authenticated Principal for configuration of ACLs etc.

    Typical steps for delegation token usage are:

    1. User authenticates with the Kafka cluster via SASL or SSL, and obtains a delegation token. This can be done using the Admin APIs or the kafka-delegation-tokens.sh script.
    2. User securely passes the delegation token to Kafka clients for authenticating with the Kafka cluster.
    3. Token owner/renewer can renew/expire the delegation tokens.
    1. Token Management

      A secret is used to generate and verify delegation tokens. This is supplied using the config option delegation.token.secret.key. The same secret key must be configured across all the brokers. If the secret is not set or is set to an empty string, brokers will disable delegation token authentication.

      In the current implementation, token details are stored in Zookeeper, which is suitable for Kafka installations where Zookeeper is on a private network. Also, the secret is currently stored as plain text in the server.properties config file. We intend to make these configurable in a future Kafka release.

      A token has a current life, and a maximum renewable life. By default, tokens must be renewed once every 24 hours for up to 7 days. These can be configured using delegation.token.expiry.time.ms and delegation.token.max.lifetime.ms config options.

      Tokens can also be cancelled explicitly. If a token is not renewed by its expiration time, or if a token is beyond the maximum lifetime, it will be deleted from all broker caches as well as from Zookeeper.
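      Putting these together, a broker's server.properties might carry a token configuration along the following lines; the secret value is a placeholder, and the two expiry settings restate the defaults of 24 hours and 7 days described above:
        delegation.token.secret.key=<a-strong-shared-secret>
        delegation.token.expiry.time.ms=86400000
        delegation.token.max.lifetime.ms=604800000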

    2. Creating Delegation Tokens

      Tokens can be created by using the Admin APIs or the kafka-delegation-tokens.sh script. Delegation token requests (create/renew/expire/describe) should be issued only on SASL or SSL authenticated channels. Tokens cannot be requested if the initial authentication is done through a delegation token. A user can create a token for themselves, or for another user by specifying the --owner-principal parameter. Owners/renewers can renew or expire tokens, and they can always describe their own tokens. To describe other tokens, a DescribeTokens permission needs to be added on the User resource representing the owner of the token. kafka-delegation-tokens.sh script examples are given below, followed by a programmatic Admin API sketch.

      Create a delegation token:

      > bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --create   --max-life-time-period -1 --command-config client.properties --renewer-principal User:user1

      Create a delegation token for a different owner:

      > bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --create   --max-life-time-period -1 --command-config client.properties --renewer-principal User:user1 --owner-principal User:owner1

      Renew a delegation token:

      > bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --renew    --renew-time-period -1 --command-config client.properties --hmac ABCDEFGHIJK

      Expire a delegation token:

      > bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --expire   --expiry-time-period -1   --command-config client.properties  --hmac ABCDEFGHIJK

      Existing tokens can be described using the --describe option:

      > bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --describe --command-config client.properties  --owner-principal User:user1
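
      Tokens can likewise be created programmatically. The following is a minimal sketch using the Java Admin client; the bootstrap address and renewer principal are placeholders, the SASL/SSL client properties are elided, and error handling is omitted:
        import java.util.Collections;
        import java.util.Properties;
        import org.apache.kafka.clients.admin.Admin;
        import org.apache.kafka.clients.admin.AdminClientConfig;
        import org.apache.kafka.clients.admin.CreateDelegationTokenOptions;
        import org.apache.kafka.common.security.auth.KafkaPrincipal;
        import org.apache.kafka.common.security.token.delegation.DelegationToken;

        public class CreateToken {
            public static void main(String[] args) throws Exception {
                Properties props = new Properties();
                props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
                // SASL or SSL client properties would also be set here, since the
                // request must be issued on an authenticated channel.
                try (Admin admin = Admin.create(props)) {
                    CreateDelegationTokenOptions options = new CreateDelegationTokenOptions()
                            .renewers(Collections.singletonList(new KafkaPrincipal("User", "user1")));
                    DelegationToken token = admin.createDelegationToken(options).delegationToken().get();
                    System.out.println("tokenId: " + token.tokenInfo().tokenId());
                    System.out.println("hmac:    " + token.hmacAsBase64String());
                }
            }
        }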
    3. Token Authentication

      Delegation token authentication piggybacks on the current SASL/SCRAM authentication mechanism. SASL/SCRAM must be enabled on the Kafka cluster as described here.

      Configuring Kafka Clients:

      1. Configure the JAAS configuration property for each client in producer.properties or consumer.properties. The login module describes how clients such as the producer and consumer connect to the Kafka broker. The following is an example configuration for a client for token authentication:
        sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
            username="tokenID123" \
            password="lAYYSFmLs4bTjf+lTZ1LCHR/ZZFNA==" \
            tokenauth="true";

        The options username and password are used by clients to configure the token id and token HMAC, and the option tokenauth indicates token authentication to the server. In this example, clients connect to the broker using the token id tokenID123. Different clients within a JVM may connect using different tokens by specifying different token details in sasl.jaas.config.

        JAAS configuration for clients may alternatively be specified as a JVM parameter similar to brokers as described here. Clients use the login section named KafkaClient. This option allows only one user for all client connections from a JVM.
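      Putting the pieces together, a complete client.properties for token authentication might look like the following; the token id and HMAC are the placeholder values from the example above:
        security.protocol=SASL_SSL
        sasl.mechanism=SCRAM-SHA-256
        sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
            username="tokenID123" \
            password="lAYYSFmLs4bTjf+lTZ1LCHR/ZZFNA==" \
            tokenauth="true";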

    4. Procedure to manually rotate the secret:

      Rotating the secret requires a redeployment. During this process, already-connected clients will continue to work, but any new connection requests, as well as renew/expire requests that use old tokens, can fail. The steps are given below.

      1. Expire all existing tokens.
      2. Rotate the secret with a rolling upgrade (updating delegation.token.secret.key on each broker), and
      3. Generate new tokens.

      We intend to automate this in a future Kafka release.

  • 7.5 Authorization and ACLs

    Kafka ships with a pluggable authorization framework, which is configured with the authorizer.class.name property in the server configuration. Configured implementations must extend org.apache.kafka.server.authorizer.Authorizer. Kafka provides default implementations which store ACLs in the cluster metadata (either Zookeeper or the KRaft metadata log). For Zookeeper-based clusters, the provided implementation is configured as follows:
    authorizer.class.name=kafka.security.authorizer.AclAuthorizer
    For KRaft clusters, use the following configuration on all nodes (brokers, controllers, or combined broker/controller nodes):
    authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
    Kafka ACLs are defined in the general format of "Principal {P} is [Allowed|Denied] Operation {O} From Host {H} on any Resource {R} matching ResourcePattern {RP}". You can read more about the ACL structure in KIP-11 and resource patterns in KIP-290. In order to add, remove, or list ACLs, you can use the Kafka ACL CLI kafka-acls.sh. By default, if no ResourcePatterns match a specific Resource R, then R has no associated ACLs, and therefore no one other than super users is allowed to access R. If you want to change that behavior, you can include the following in server.properties.
    allow.everyone.if.no.acl.found=true
    One can also add super users in server.properties like the following (note that the delimiter is a semicolon since SSL user names may contain commas). The default PrincipalType string "User" is case sensitive.
    super.users=User:Bob;User:Alice
    KRaft Principal Forwarding
    In KRaft clusters, admin requests such as CreateTopics and DeleteTopics are sent to the broker listeners by the client. The broker then forwards the request to the active controller through the first listener configured in controller.listener.names. Authorization of these requests is done on the controller node. This is achieved by way of an Envelope request which packages both the underlying request from the client as well as the client principal. When the controller receives the forwarded Envelope request from the broker, it first authorizes the Envelope request using the authenticated broker principal. Then it authorizes the underlying request using the forwarded principal.
    All of this implies that Kafka must understand how to serialize and deserialize the client principal. The authentication framework allows for customized principals by overriding the principal.builder.class configuration. In order for customized principals to work with KRaft, the configured class must implement org.apache.kafka.common.security.auth.KafkaPrincipalSerde so that Kafka knows how to serialize and deserialize the principals. The default implementation org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder uses the Kafka RPC format defined in the source code: clients/src/main/resources/common/message/DefaultPrincipalData.json. For more detail about request forwarding in KRaft, see KIP-590
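    A minimal sketch of a customized builder that satisfies this requirement follows. The string-based wire format is purely illustrative (the default implementation uses the Kafka RPC format mentioned above), and only the SASL case is handled:
      import java.nio.charset.StandardCharsets;
      import org.apache.kafka.common.security.auth.AuthenticationContext;
      import org.apache.kafka.common.security.auth.KafkaPrincipal;
      import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
      import org.apache.kafka.common.security.auth.KafkaPrincipalSerde;
      import org.apache.kafka.common.security.auth.SaslAuthenticationContext;

      public class CustomPrincipalBuilder implements KafkaPrincipalBuilder, KafkaPrincipalSerde {

          @Override
          public KafkaPrincipal build(AuthenticationContext context) {
              if (context instanceof SaslAuthenticationContext) {
                  // Derive the principal from the SASL authorization id.
                  String user = ((SaslAuthenticationContext) context).server().getAuthorizationID();
                  return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user);
              }
              throw new IllegalArgumentException("Unhandled authentication context: " + context);
          }

          @Override
          public byte[] serialize(KafkaPrincipal principal) {
              // Illustrative wire format: "type:name".
              return (principal.getPrincipalType() + ":" + principal.getName())
                      .getBytes(StandardCharsets.UTF_8);
          }

          @Override
          public KafkaPrincipal deserialize(byte[] bytes) {
              String s = new String(bytes, StandardCharsets.UTF_8);
              int sep = s.indexOf(':');
              return new KafkaPrincipal(s.substring(0, sep), s.substring(sep + 1));
          }
      }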
    Customizing SSL User Name
    By default, the SSL user name will be of the form "CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown". One can change that by setting ssl.principal.mapping.rules to a customized rule in server.properties. This config allows a list of rules for mapping X.500 distinguished name to short name. The rules are evaluated in order and the first rule that matches a distinguished name is used to map it to a short name. Any later rules in the list are ignored.
    The format of ssl.principal.mapping.rules is a list where each rule starts with "RULE:" and contains an expression in one of the following formats. The default rule returns the string representation of the X.500 certificate distinguished name. If the distinguished name matches the pattern, then the replacement command is run over the name. This also supports lowercase/uppercase options, to force the translated result to be all lowercase or all uppercase; this is done by adding a "/L" or "/U" to the end of the rule.
    RULE:pattern/replacement/
    RULE:pattern/replacement/[LU]
    Example ssl.principal.mapping.rules values are:
    RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/,
    RULE:^CN=(.*?),OU=(.*?),O=(.*?),L=(.*?),ST=(.*?),C=(.*?)$/$1@$2/L,
    RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/L,
    DEFAULT
    The above rules translate the distinguished name "CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown" to "serviceuser" and "CN=adminUser,OU=Admin,O=Unknown,L=Unknown,ST=Unknown,C=Unknown" to "adminuser@admin".
    For advanced use cases, one can customize the name by setting a customized PrincipalBuilder in server.properties like the following.
    principal.builder.class=CustomizedPrincipalBuilderClass
    Customizing SASL User Name
    By default, the SASL user name will be the primary part of the Kerberos principal. One can change that by setting sasl.kerberos.principal.to.local.rules to a customized rule in server.properties. The format of sasl.kerberos.principal.to.local.rules is a list where each rule works in the same way as auth_to_local in the Kerberos configuration file (krb5.conf). It also supports an additional lowercase/uppercase rule, to force the translated result to be all lowercase or all uppercase; this is done by adding a "/L" or "/U" to the end of the rule. Each rule starts with RULE: and contains an expression in one of the following formats. See the Kerberos documentation for more details.
    RULE:[n:string](regexp)s/pattern/replacement/
    RULE:[n:string](regexp)s/pattern/replacement/g
    RULE:[n:string](regexp)s/pattern/replacement//L
    RULE:[n:string](regexp)s/pattern/replacement/g/L
    RULE:[n:string](regexp)s/pattern/replacement//U
    RULE:[n:string](regexp)s/pattern/replacement/g/U
    An example of adding a rule to properly translate user@MYDOMAIN.COM to user while also keeping the default rule in place is:
    sasl.kerberos.principal.to.local.rules=RULE:[1:$1@$0](.*@MYDOMAIN.COM)s/@.*//,DEFAULT

    Command Line Interface

    The Kafka authorization management CLI can be found under the bin directory with all the other CLIs. The CLI script is called kafka-acls.sh. The following lists all the options that the script supports:

    Option Description Default Option type
    --add Indicates to the script that the user is trying to add an acl. Action
    --remove Indicates to the script that the user is trying to remove an acl. Action
    --list Indicates to the script that the user is trying to list acls. Action
    --bootstrap-server A list of host/port pairs to use for establishing the connection to the Kafka cluster. Only one of the --bootstrap-server or --authorizer options must be specified. Configuration
    --command-config A property file containing configs to be passed to Admin Client. This option can only be used with --bootstrap-server option. Configuration
    --cluster Indicates to the script that the user is trying to interact with acls on the singular cluster resource. ResourcePattern
    --topic [topic-name] Indicates to the script that the user is trying to interact with acls on topic resource pattern(s). ResourcePattern
    --group [group-name] Indicates to the script that the user is trying to interact with acls on consumer-group resource pattern(s) ResourcePattern
    --transactional-id [transactional-id] The transactionalId to which ACLs should be added or removed. A value of * indicates the ACLs should apply to all transactionalIds. ResourcePattern
    --delegation-token [delegation-token] Delegation token to which ACLs should be added or removed. A value of * indicates ACL should apply to all tokens. ResourcePattern
    --user-principal [user-principal] A user resource to which ACLs should be added or removed. This is currently supported in relation with delegation tokens. A value of * indicates ACL should apply to all users. ResourcePattern
    --resource-pattern-type [pattern-type] Indicates to the script the type of resource pattern (for --add), or resource pattern filter (for --list and --remove), the user wishes to use.
    When adding acls, this should be a specific pattern type, e.g. 'literal' or 'prefixed'.
    When listing or removing acls, a specific pattern type filter can be used to list or remove acls from a specific type of resource pattern, or the filter values of 'any' or 'match' can be used: 'any' will match any pattern type but will match the resource name exactly, while 'match' will perform pattern matching to list or remove all acls that affect the supplied resource(s).
    WARNING: 'match', when used in combination with the '--remove' switch, should be used with care.
    (default: literal) Configuration
    --allow-principal Principal in PrincipalType:name format that will be added to an ACL with Allow permission. The default PrincipalType string "User" is case sensitive. You can specify multiple --allow-principal options in a single command. Principal
    --deny-principal Principal in PrincipalType:name format that will be added to an ACL with Deny permission. The default PrincipalType string "User" is case sensitive. You can specify multiple --deny-principal options in a single command. Principal
    --principal Principal in PrincipalType:name format that will be used along with the --list option. The default PrincipalType string "User" is case sensitive. This will list the ACLs for the specified principal. You can specify multiple --principal options in a single command. Principal
    --allow-host IP address from which the principals listed in --allow-principal will have access. If --allow-principal is specified, this defaults to *, which translates to "all hosts". Host
    --deny-host IP address from which the principals listed in --deny-principal will be denied access. If --deny-principal is specified, this defaults to *, which translates to "all hosts". Host
    --operation Operation that will be allowed or denied.
    Valid values are:
    • Read
    • Write
    • Create
    • Delete
    • Alter
    • Describe
    • ClusterAction
    • DescribeConfigs
    • AlterConfigs
    • IdempotentWrite
    • CreateTokens
    • DescribeTokens
    • All
    (default: All) Operation
    --producer Convenience option to add/remove acls for producer role. This will generate acls that allows WRITE, DESCRIBE and CREATE on topic. Convenience
    --consumer Convenience option to add/remove acls for consumer role. This will generate acls that allows READ, DESCRIBE on topic and READ on consumer-group. Convenience
    --idempotent Enable idempotence for the producer. This should be used in combination with the --producer option. Note that idempotence is enabled automatically if the producer is authorized to a particular transactional-id. Convenience
    --force Convenience option to assume yes to all queries and do not prompt. Convenience
    --authorizer (DEPRECATED: not supported in KRaft) Fully qualified class name of the authorizer. kafka.security.authorizer.AclAuthorizer Configuration
    --authorizer-properties (DEPRECATED: not supported in KRaft) key=val pairs that will be passed to the authorizer for initialization. For the default authorizer in ZK clusters, the example values are: zookeeper.connect=localhost:2181 Configuration
    --zk-tls-config-file (DEPRECATED: not supported in KRaft) Identifies the file where ZooKeeper client TLS connectivity properties for the authorizer are defined. Any properties other than the following (with or without an "authorizer." prefix) are ignored: zookeeper.clientCnxnSocket, zookeeper.ssl.cipher.suites, zookeeper.ssl.client.enable, zookeeper.ssl.crl.enable, zookeeper.ssl.enabled.protocols, zookeeper.ssl.endpoint.identification.algorithm, zookeeper.ssl.keystore.location, zookeeper.ssl.keystore.password, zookeeper.ssl.keystore.type, zookeeper.ssl.ocsp.enable, zookeeper.ssl.protocol, zookeeper.ssl.truststore.location, zookeeper.ssl.truststore.password, zookeeper.ssl.truststore.type Configuration

    Examples
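    The following invocations are representative examples assembled from the options above; the hostnames, principals and client.properties file are placeholders. Adding, listing and removing ACLs for a topic:
    > bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config client.properties --add --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
    > bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config client.properties --list --topic Test-topic
    > bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config client.properties --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic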

    Authorization Primitives

    Protocol calls usually perform operations on certain resources in Kafka. Knowing these operations and resources is required to set up effective protection. This section lists them, then shows how they combine with the protocols to form the valid scenarios.

    Operations in Kafka

    There are a few operation primitives that can be used to build up privileges. These can be matched up with certain resources to allow specific protocol calls for a given user. They are the same operations accepted by the --operation option above: Read, Write, Create, Delete, Alter, Describe, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite, CreateTokens, DescribeTokens and All.

    Resources in Kafka

    The operations above can be applied to certain resources, which appear in the Resource column of the table below: Topic, Group, Cluster, TransactionalId, DelegationToken and User.

    Operations and Resources on Protocols

    The table below lists the valid operations on resources that are executed by the Kafka API protocols.

    Protocol (API key) Operation Resource Note
    PRODUCE (0) Write TransactionalId A transactional producer which has its transactional.id set requires this privilege.
    PRODUCE (0) IdempotentWrite Cluster An idempotent produce action requires this privilege.
    PRODUCE (0) Write Topic This applies to a normal produce action.
    FETCH (1) ClusterAction Cluster A follower must have ClusterAction on the Cluster resource in order to fetch partition data.
    FETCH (1) Read Topic Regular Kafka consumers need READ permission on each partition they are fetching.
    LIST_OFFSETS (2) Describe Topic
    METADATA (3) Describe Topic
    METADATA (3) Create Cluster If topic auto-creation is enabled, then the broker-side API will check for the existence of a Cluster level privilege. If it's found then it'll allow creating the topic, otherwise it'll iterate through the Topic level privileges (see the next one).
    METADATA (3) Create Topic This authorizes auto topic creation if enabled but the given user doesn't have a cluster level permission (above).
    LEADER_AND_ISR (4) ClusterAction Cluster
    STOP_REPLICA (5) ClusterAction Cluster
    UPDATE_METADATA (6) ClusterAction Cluster
    CONTROLLED_SHUTDOWN (7) ClusterAction Cluster
    OFFSET_COMMIT (8) Read Group An offset can only be committed if it's authorized to the given group and the topic too (see below). Group access is checked first, then Topic access.
    OFFSET_COMMIT (8) Read Topic Since offset commit is part of the consuming process, it needs privileges for the read action.
    OFFSET_FETCH (9) Describe Group Similarly to OFFSET_COMMIT, the application must have privileges on group and topic level too to be able to fetch. However in this case it requires describe access instead of read. Group access is checked first, then Topic access.
    OFFSET_FETCH (9) Describe Topic
    FIND_COORDINATOR (10) Describe Group The FIND_COORDINATOR request can be of "Group" type, in which case it is looking for consumer group coordinators. This privilege represents the Group mode.
    FIND_COORDINATOR (10) Describe TransactionalId This applies only to transactional producers and is checked when a producer tries to find the transaction coordinator.
    JOIN_GROUP (11) Read Group
    HEARTBEAT (12) Read Group
    LEAVE_GROUP (13) Read Group
    SYNC_GROUP (14) Read Group
    DESCRIBE_GROUPS (15) Describe Group
    LIST_GROUPS (16) Describe Cluster When the broker checks to authorize a list_groups request it first checks for this cluster level authorization. If none found then it proceeds to check the groups individually. This operation doesn't return CLUSTER_AUTHORIZATION_FAILED.
    LIST_GROUPS (16) Describe Group If none of the groups are authorized, then just an empty response will be sent back instead of an error. This operation doesn't return CLUSTER_AUTHORIZATION_FAILED. This is applicable from the 2.1 release.
    SASL_HANDSHAKE (17) The SASL handshake is part of the authentication process and therefore it's not possible to apply any kind of authorization here.
    API_VERSIONS (18) The API_VERSIONS request is part of the Kafka protocol handshake and happens on connection and before any authentication. Therefore it's not possible to control this with authorization.
    CREATE_TOPICS (19) Create Cluster If there is no cluster level authorization, then it won't return CLUSTER_AUTHORIZATION_FAILED but will fall back to topic level authorization (just below), which will throw an error if there is a problem.
    CREATE_TOPICS (19) Create Topic This is applicable from the 2.0 release.
    DELETE_TOPICS (20) Delete Topic
    DELETE_RECORDS (21) Delete Topic
    INIT_PRODUCER_ID (22) Write TransactionalId
    INIT_PRODUCER_ID (22) IdempotentWrite Cluster
    OFFSET_FOR_LEADER_EPOCH (23) ClusterAction Cluster If there is no cluster level privilege for this operation, then it'll check for topic level one.
    OFFSET_FOR_LEADER_EPOCH (23) Describe Topic This is applicable from the 2.1 release.
    ADD_PARTITIONS_TO_TXN (24) Write TransactionalId This API is only applicable to transactional requests. It first checks for the Write action on the TransactionalId resource, then it checks the Topic in subject (below).
    ADD_PARTITIONS_TO_TXN (24) Write Topic
    ADD_OFFSETS_TO_TXN (25) Write TransactionalId Similarly to ADD_PARTITIONS_TO_TXN, this is only applicable to transactional requests. It first checks for the Write action on the TransactionalId resource, then it checks whether it can Read on the given group (below).
    ADD_OFFSETS_TO_TXN (25) Read Group
    END_TXN (26) Write TransactionalId
    WRITE_TXN_MARKERS (27) ClusterAction Cluster
    TXN_OFFSET_COMMIT (28) Write TransactionalId
    TXN_OFFSET_COMMIT (28) Read Group
    TXN_OFFSET_COMMIT (28) Read Topic
    DESCRIBE_ACLS (29) Describe Cluster
    CREATE_ACLS (30) Alter Cluster
    DELETE_ACLS (31) Alter Cluster
    DESCRIBE_CONFIGS (32) DescribeConfigs Cluster If broker configs are requested, then the broker will check cluster level privileges.
    DESCRIBE_CONFIGS (32) DescribeConfigs Topic If topic configs are requested, then the broker will check topic level privileges.
    ALTER_CONFIGS (33) AlterConfigs Cluster If broker configs are altered, then the broker will check cluster level privileges.
    ALTER_CONFIGS (33) AlterConfigs Topic If topic configs are altered, then the broker will check topic level privileges.
    ALTER_REPLICA_LOG_DIRS (34) Alter Cluster
    DESCRIBE_LOG_DIRS (35) Describe Cluster An empty response will be returned on authorization failure.
    SASL_AUTHENTICATE (36) SASL_AUTHENTICATE is part of the authentication process and therefore it's not possible to apply any kind of authorization here.
    CREATE_PARTITIONS (37) Alter Topic
    CREATE_DELEGATION_TOKEN (38) Creating delegation tokens has special rules, for this please see the Authentication using Delegation Tokens section.
    CREATE_DELEGATION_TOKEN (38) CreateTokens User Allows creating delegation tokens for the User resource.
    RENEW_DELEGATION_TOKEN (39) Renewing delegation tokens has special rules, for this please see the Authentication using Delegation Tokens section.
    EXPIRE_DELEGATION_TOKEN (40) Expiring delegation tokens has special rules, for this please see the Authentication using Delegation Tokens section.
    DESCRIBE_DELEGATION_TOKEN (41) Describe DelegationToken Describing delegation tokens has special rules, for this please see the Authentication using Delegation Tokens section.
    DESCRIBE_DELEGATION_TOKEN (41) DescribeTokens User Allows describing delegation tokens of the User resource.
    DELETE_GROUPS (42) Delete Group
    ELECT_PREFERRED_LEADERS (43) ClusterAction Cluster
    INCREMENTAL_ALTER_CONFIGS (44) AlterConfigs Cluster If broker configs are altered, then the broker will check cluster level privileges.
    INCREMENTAL_ALTER_CONFIGS (44) AlterConfigs Topic If topic configs are altered, then the broker will check topic level privileges.
    ALTER_PARTITION_REASSIGNMENTS (45) Alter Cluster
    LIST_PARTITION_REASSIGNMENTS (46) Describe Cluster
    OFFSET_DELETE (47) Delete Group
    OFFSET_DELETE (47) Read Topic