Currently, the master key/secret is stored statically in the server.properties config file. We intend to make these configurable in a future Kafka release.

A token has a current life and a maximum renewable life. By default, tokens must be renewed once every 24 hours for up to 7 days. These can be configured using the delegation.token.expiry.time.ms and delegation.token.max.lifetime.ms config options.
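
For example, a sketch of server.properties entries requiring renewal every 12 hours with a 3-day maximum lifetime (values are in milliseconds; the numbers here are illustrative, not recommendations):

    # renew at least every 12 hours (43200000 ms)
    delegation.token.expiry.time.ms=43200000
    # token cannot be renewed beyond 3 days (259200000 ms)
    delegation.token.max.lifetime.ms=259200000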

Tokens can also be cancelled explicitly. If a token is not renewed by its expiration time, or if it is beyond its maximum lifetime, it will be deleted from all broker caches as well as from ZooKeeper.

  • Creating Delegation Tokens

    Tokens can be created by using the Admin APIs or the kafka-delegation-tokens.sh script. Delegation token requests (create/renew/expire/describe) should be issued only on SASL or SSL authenticated channels. Tokens cannot be requested if the initial authentication was done through a delegation token. Examples of using the kafka-delegation-tokens.sh script are given below.

    Create a delegation token:

    > bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --create   --max-life-time-period -1 --command-config client.properties --renewer-principal User:user1

    Renew a delegation token:

    > bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --renew    --renew-time-period -1 --command-config client.properties --hmac ABCDEFGHIJK

    Expire a delegation token:

    > bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --expire   --expiry-time-period -1   --command-config client.properties  --hmac ABCDEFGHIJK

    Existing tokens can be described using the --describe option:

    > bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --describe --command-config client.properties  --owner-principal User:user1
  • Token Authentication

    Delegation token authentication piggybacks on the current SASL/SCRAM authentication mechanism. The SASL/SCRAM mechanism must be enabled on the Kafka cluster as described here.
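
    As a minimal sketch, enabling SCRAM on the brokers involves a SASL-enabled listener and the SCRAM mechanism in server.properties (the host, port and mechanism below are placeholders; see the SASL/SCRAM section for the full procedure, including creating SCRAM credentials):

    listeners=SASL_SSL://host.name:9093
    sasl.enabled.mechanisms=SCRAM-SHA-256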

    Configuring Kafka Clients:

    1. Configure the JAAS configuration property for each client in producer.properties or consumer.properties. The login module describes how clients such as the producer and consumer connect to the Kafka broker. The following is an example client configuration for token authentication:
      sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
          username="tokenID123" \
          password="lAYYSFmLs4bTjf+lTZ1LCHR/ZZFNA==" \
          tokenauth="true";

      The options username and password are used by clients to configure the token id and token HMAC, and the option tokenauth indicates to the server that token authentication is being used. In this example, clients connect to the broker using the token id tokenID123. Different clients within a JVM may connect using different tokens by specifying different token details in sasl.jaas.config.

      JAAS configuration for clients may alternatively be specified as a JVM parameter similar to brokers as described here. Clients use the login section named KafkaClient. This option allows only one user for all client connections from a JVM.
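
      As a sketch of that alternative, the same credentials would go in a KafkaClient section of a JAAS file passed via the java.security.auth.login.config system property (the file path is a placeholder):

      // kafka_client_jaas.conf: login section used by Kafka clients
      KafkaClient {
          org.apache.kafka.common.security.scram.ScramLoginModule required
          username="tokenID123"
          password="lAYYSFmLs4bTjf+lTZ1LCHR/ZZFNA=="
          tokenauth="true";
      };

      passed to the JVM as -Djava.security.auth.login.config=/path/to/kafka_client_jaas.conf.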

  • Procedure to manually rotate the secret:

    Rotating the secret requires a re-deployment. During this process, already connected clients will continue to work, but any new connection requests, as well as renew/expire requests made with old tokens, can fail. The steps are given below.

    1. Expire all existing tokens.
    2. Rotate the secret via a rolling upgrade.
    3. Generate new tokens.

    We intend to automate this in a future Kafka release.
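
    For step 2, a sketch of what the rolling upgrade changes, assuming the secret is supplied via the delegation.token.master.key broker config option (the value below is a placeholder):

    # replace the old master key/secret on every broker during the rolling bounce
    delegation.token.master.key=<new-secret>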

  • Notes on Delegation Tokens
  • 7.4 Authorization and ACLs

    Kafka ships with a pluggable Authorizer and an out-of-the-box authorizer implementation that uses ZooKeeper to store all the ACLs. The Authorizer is configured by setting authorizer.class.name in server.properties. To enable the out-of-the-box implementation, use:
    authorizer.class.name=kafka.security.authorizer.AclAuthorizer
    Kafka ACLs are defined in the general format of "Principal P is [Allowed/Denied] Operation O From Host H on any Resource R matching ResourcePattern RP". You can read more about the ACL structure in KIP-11 and resource patterns in KIP-290. In order to add, remove or list ACLs, you can use the Kafka authorizer CLI. By default, if no ResourcePatterns match a specific Resource R, then R has no associated ACLs, and therefore no one other than super users is allowed to access R. If you want to change that behavior, you can include the following in server.properties.
    allow.everyone.if.no.acl.found=true
    One can also add super users in server.properties like the following (note that the delimiter is a semicolon since SSL user names may contain a comma). The default PrincipalType string "User" is case sensitive.
    super.users=User:Bob;User:Alice
    Customizing SSL User Name
    By default, the SSL user name will be of the form "CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown". One can change that by setting ssl.principal.mapping.rules to a customized rule in server.properties. This config allows a list of rules for mapping X.500 distinguished names to short names. The rules are evaluated in order, and the first rule that matches a distinguished name is used to map it to a short name. Any later rules in the list are ignored.
    The format of ssl.principal.mapping.rules is a list where each rule starts with "RULE:" and contains an expression in one of the following formats. The default rule returns the string representation of the X.500 certificate distinguished name. If the distinguished name matches the pattern, then the replacement command is run over the name. This also supports lowercase/uppercase options to force the translated result to be all lowercase or uppercase, by adding a "/L" or "/U" to the end of the rule.
    RULE:pattern/replacement/
    RULE:pattern/replacement/[LU]
    Example ssl.principal.mapping.rules values are:
    RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/,
    RULE:^CN=(.*?),OU=(.*?),O=(.*?),L=(.*?),ST=(.*?),C=(.*?)$/$1@$2/L,
    RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/L,
    DEFAULT
    The above rules translate the distinguished name "CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown" to "serviceuser" and "CN=adminUser,OU=Admin,O=Unknown,L=Unknown,ST=Unknown,C=Unknown" to "adminuser@admin".
    For advanced use cases, one can customize the name by setting a customized PrincipalBuilder in server.properties like the following.
    principal.builder.class=CustomizedPrincipalBuilderClass
    Customizing SASL User Name
    By default, the SASL user name will be the primary part of the Kerberos principal. One can change that by setting sasl.kerberos.principal.to.local.rules to a customized rule in server.properties. The format of sasl.kerberos.principal.to.local.rules is a list where each rule works in the same way as auth_to_local in the Kerberos configuration file (krb5.conf). This also supports an additional lowercase/uppercase rule to force the translated result to be all lowercase or uppercase, done by adding a "/L" or "/U" to the end of the rule. Each rule starts with RULE: and contains an expression in one of the following formats. See the Kerberos documentation for more details.
    RULE:[n:string](regexp)s/pattern/replacement/
    RULE:[n:string](regexp)s/pattern/replacement/g
    RULE:[n:string](regexp)s/pattern/replacement//L
    RULE:[n:string](regexp)s/pattern/replacement/g/L
    RULE:[n:string](regexp)s/pattern/replacement//U
    RULE:[n:string](regexp)s/pattern/replacement/g/U
    An example of adding a rule to properly translate user@MYDOMAIN.COM to user while also keeping the default rule in place is:
    sasl.kerberos.principal.to.local.rules=RULE:[1:$1@$0](.*@MYDOMAIN.COM)s/@.*//,DEFAULT

    Command Line Interface

    The Kafka authorization management CLI can be found under the bin directory with all the other CLIs. The CLI script is called kafka-acls.sh. The following lists all the options that the script supports:

    --add
        Description: Indicates to the script that the user is trying to add an ACL.
        Option type: Action
    --remove
        Description: Indicates to the script that the user is trying to remove an ACL.
        Option type: Action
    --list
        Description: Indicates to the script that the user is trying to list ACLs.
        Option type: Action
    --authorizer
        Description: Fully qualified class name of the authorizer.
        Default: kafka.security.authorizer.AclAuthorizer
        Option type: Configuration
    --authorizer-properties
        Description: key=val pairs that will be passed to the authorizer for initialization. For the default authorizer an example value is: zookeeper.connect=localhost:2181
        Option type: Configuration
    --bootstrap-server
        Description: A list of host/port pairs to use for establishing the connection to the Kafka cluster. Only one of --bootstrap-server or --authorizer may be specified.
        Option type: Configuration
    --command-config
        Description: A property file containing configs to be passed to the Admin Client. This option can only be used with the --bootstrap-server option.
        Option type: Configuration
    --cluster
        Description: Indicates to the script that the user is trying to interact with ACLs on the singular cluster resource.
        Option type: ResourcePattern
    --topic [topic-name]
        Description: Indicates to the script that the user is trying to interact with ACLs on topic resource pattern(s).
        Option type: ResourcePattern
    --group [group-name]
        Description: Indicates to the script that the user is trying to interact with ACLs on consumer-group resource pattern(s).
        Option type: ResourcePattern
    --transactional-id [transactional-id]
        Description: The transactionalId to which ACLs should be added or removed. A value of * indicates that the ACLs should apply to all transactionalIds.
        Option type: ResourcePattern
    --delegation-token [delegation-token]
        Description: Delegation token to which ACLs should be added or removed. A value of * indicates that the ACL should apply to all tokens.
        Option type: ResourcePattern
    --resource-pattern-type [pattern-type]
        Description: Indicates to the script the type of resource pattern (for --add) or resource pattern filter (for --list and --remove) the user wishes to use. When adding ACLs, this should be a specific pattern type, e.g. 'literal' or 'prefixed'. When listing or removing ACLs, a specific pattern type filter can be used to list or remove ACLs from a specific type of resource pattern, or the filter values 'any' or 'match' can be used: 'any' will match any pattern type but will match the resource name exactly, while 'match' will perform pattern matching to list or remove all ACLs that affect the supplied resource(s). WARNING: 'match', when used in combination with the --remove switch, should be used with care.
        Default: literal
        Option type: Configuration
    --allow-principal
        Description: Principal in PrincipalType:name format that will be added to the ACL with Allow permission. The default PrincipalType string "User" is case sensitive. You can specify multiple --allow-principal options in a single command.
        Option type: Principal
    --deny-principal
        Description: Principal in PrincipalType:name format that will be added to the ACL with Deny permission. The default PrincipalType string "User" is case sensitive. You can specify multiple --deny-principal options in a single command.
        Option type: Principal
    --principal
        Description: Principal in PrincipalType:name format that is used with the --list option. The default PrincipalType string "User" is case sensitive. This will list the ACLs for the specified principal. You can specify multiple --principal options in a single command.
        Option type: Principal
    --allow-host
        Description: IP address from which the principals listed in --allow-principal will have access.
        Default: if --allow-principal is specified, defaults to *, which translates to "all hosts"
        Option type: Host
    --deny-host
        Description: IP address from which the principals listed in --deny-principal will be denied access.
        Default: if --deny-principal is specified, defaults to *, which translates to "all hosts"
        Option type: Host
    --operation
        Description: Operation that will be allowed or denied. Valid values are: Read, Write, Create, Delete, Alter, Describe, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite, All.
        Default: All
        Option type: Operation
    --producer
        Description: Convenience option to add/remove ACLs for the producer role. This will generate ACLs that allow WRITE, DESCRIBE and CREATE on topic.
        Option type: Convenience
    --consumer
        Description: Convenience option to add/remove ACLs for the consumer role. This will generate ACLs that allow READ and DESCRIBE on topic and READ on consumer-group.
        Option type: Convenience
    --idempotent
        Description: Enable idempotence for the producer. This should be used in combination with the --producer option. Note that idempotence is enabled automatically if the producer is authorized to a particular transactional-id.
        Option type: Convenience
    --force
        Description: Convenience option to assume yes to all queries and not prompt.
        Option type: Convenience
    --zk-tls-config-file
        Description: Identifies the file where ZooKeeper client TLS connectivity properties for the authorizer are defined. Any properties other than the following (with or without an "authorizer." prefix) are ignored: zookeeper.clientCnxnSocket, zookeeper.ssl.cipher.suites, zookeeper.ssl.client.enable, zookeeper.ssl.crl.enable, zookeeper.ssl.enabled.protocols, zookeeper.ssl.endpoint.identification.algorithm, zookeeper.ssl.keystore.location, zookeeper.ssl.keystore.password, zookeeper.ssl.keystore.type, zookeeper.ssl.ocsp.enable, zookeeper.ssl.protocol, zookeeper.ssl.truststore.location, zookeeper.ssl.truststore.password, zookeeper.ssl.truststore.type
        Option type: Configuration

    Examples
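
    For instance, a representative sketch of adding and then listing ACLs with the options above (principals, hosts, and topic name are placeholders):

    > bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
    > bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic Test-topic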

    Authorization Primitives

    Protocol calls usually perform some operations on certain resources in Kafka. Knowing the operations and resources is required to set up effective protection. In this section we'll list these operations and resources, then list their combinations with the protocols to see the valid scenarios.

    Operations in Kafka

    There are a few operation primitives that can be used to build up privileges. These can be matched up with certain resources to allow specific protocol calls for a given user. These are the same operations accepted by the --operation option above: Read, Write, Create, Delete, Alter, Describe, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite, and All.

    Resources in Kafka

    The operations above can be applied to certain resources, namely Topic, Group, Cluster, TransactionalId, and DelegationToken; the valid combinations are listed in the protocol table below.

    Operations and Resources on Protocols

    The table below lists the valid operations on resources that are executed by the Kafka API protocols.

    Protocol (API key) Operation Resource Note
    PRODUCE (0) Write TransactionalId A transactional producer which has its transactional.id set requires this privilege.
    PRODUCE (0) IdempotentWrite Cluster An idempotent produce action requires this privilege.
    PRODUCE (0) Write Topic This applies to a normal produce action.
    FETCH (1) ClusterAction Cluster A follower must have ClusterAction on the Cluster resource in order to fetch partition data.
    FETCH (1) Read Topic Regular Kafka consumers need READ permission on each partition they are fetching.
    LIST_OFFSETS (2) Describe Topic
    METADATA (3) Describe Topic
    METADATA (3) Create Cluster If topic auto-creation is enabled, then the broker-side API will check for the existence of a Cluster level privilege. If it's found then it'll allow creating the topic, otherwise it'll iterate through the Topic level privileges (see the next one).
    METADATA (3) Create Topic This authorizes auto topic creation if enabled but the given user doesn't have a cluster level permission (above).
    LEADER_AND_ISR (4) ClusterAction Cluster
    STOP_REPLICA (5) ClusterAction Cluster
    UPDATE_METADATA (6) ClusterAction Cluster
    CONTROLLED_SHUTDOWN (7) ClusterAction Cluster
    OFFSET_COMMIT (8) Read Group An offset can only be committed if the user is authorized to the given group and to the topic too (see below). Group access is checked first, then Topic access.
    OFFSET_COMMIT (8) Read Topic Since offset commit is part of the consuming process, it needs privileges for the read action.
    OFFSET_FETCH (9) Describe Group Similarly to OFFSET_COMMIT, the application must have privileges on group and topic level too to be able to fetch. However in this case it requires describe access instead of read. Group access is checked first, then Topic access.
    OFFSET_FETCH (9) Describe Topic
    FIND_COORDINATOR (10) Describe Group The FIND_COORDINATOR request can be of "Group" type in which case it is looking for consumer group coordinators. This privilege would represent the Group mode.
    FIND_COORDINATOR (10) Describe TransactionalId This applies only to transactional producers and is checked when a producer tries to find the transaction coordinator.
    JOIN_GROUP (11) Read Group
    HEARTBEAT (12) Read Group
    LEAVE_GROUP (13) Read Group
    SYNC_GROUP (14) Read Group
    DESCRIBE_GROUPS (15) Describe Group
    LIST_GROUPS (16) Describe Cluster When the broker checks to authorize a list_groups request it first checks for this cluster level authorization. If none found then it proceeds to check the groups individually. This operation doesn't return CLUSTER_AUTHORIZATION_FAILED.
    LIST_GROUPS (16) Describe Group If none of the groups are authorized, then just an empty response will be sent back instead of an error. This operation doesn't return CLUSTER_AUTHORIZATION_FAILED. This is applicable from the 2.1 release.
    SASL_HANDSHAKE (17) The SASL handshake is part of the authentication process and therefore it's not possible to apply any kind of authorization here.
    API_VERSIONS (18) The API_VERSIONS request is part of the Kafka protocol handshake and happens on connection and before any authentication. Therefore it's not possible to control this with authorization.
    CREATE_TOPICS (19) Create Cluster If there is no cluster level authorization, then it won't return CLUSTER_AUTHORIZATION_FAILED but will fall back to the topic level authorization just below. That will throw an error if there is a problem.
    CREATE_TOPICS (19) Create Topic This is applicable from the 2.0 release.
    DELETE_TOPICS (20) Delete Topic
    DELETE_RECORDS (21) Delete Topic
    INIT_PRODUCER_ID (22) Write TransactionalId
    INIT_PRODUCER_ID (22) IdempotentWrite Cluster
    OFFSET_FOR_LEADER_EPOCH (23) ClusterAction Cluster If there is no cluster level privilege for this operation, then it will check for a topic level one.
    OFFSET_FOR_LEADER_EPOCH (23) Describe Topic This is applicable from the 2.1 release.
    ADD_PARTITIONS_TO_TXN (24) Write TransactionalId This API is only applicable to transactional requests. It first checks for the Write action on the TransactionalId resource, then it checks the Topic in question (below).
    ADD_PARTITIONS_TO_TXN (24) Write Topic
    ADD_OFFSETS_TO_TXN (25) Write TransactionalId Similarly to ADD_PARTITIONS_TO_TXN, this is only applicable to transactional requests. It first checks for the Write action on the TransactionalId resource, then it checks whether it can Read on the given group (below).
    ADD_OFFSETS_TO_TXN (25) Read Group
    END_TXN (26) Write TransactionalId
    WRITE_TXN_MARKERS (27) ClusterAction Cluster
    TXN_OFFSET_COMMIT (28) Write TransactionalId
    TXN_OFFSET_COMMIT (28) Read Group
    TXN_OFFSET_COMMIT (28) Read Topic
    DESCRIBE_ACLS (29) Describe Cluster
    CREATE_ACLS (30) Alter Cluster
    DELETE_ACLS (31) Alter Cluster
    DESCRIBE_CONFIGS (32) DescribeConfigs Cluster If broker configs are requested, then the broker will check cluster level privileges.
    DESCRIBE_CONFIGS (32) DescribeConfigs Topic If topic configs are requested, then the broker will check topic level privileges.
    ALTER_CONFIGS (33) AlterConfigs Cluster If broker configs are altered, then the broker will check cluster level privileges.
    ALTER_CONFIGS (33) AlterConfigs Topic If topic configs are altered, then the broker will check topic level privileges.
    ALTER_REPLICA_LOG_DIRS (34) Alter Cluster
    DESCRIBE_LOG_DIRS (35) Describe Cluster An empty response will be returned on authorization failure.
    SASL_AUTHENTICATE (36) SASL_AUTHENTICATE is part of the authentication process and therefore it's not possible to apply any kind of authorization here.
    CREATE_PARTITIONS (37) Alter Topic
    CREATE_DELEGATION_TOKEN (38) Creating delegation tokens has special rules; please see the Authentication using Delegation Tokens section.
    RENEW_DELEGATION_TOKEN (39) Renewing delegation tokens has special rules; please see the Authentication using Delegation Tokens section.
    EXPIRE_DELEGATION_TOKEN (40) Expiring delegation tokens has special rules; please see the Authentication using Delegation Tokens section.
    DESCRIBE_DELEGATION_TOKEN (41) Describe DelegationToken Describing delegation tokens has special rules; please see the Authentication using Delegation Tokens section.
    DELETE_GROUPS (42) Delete Group
    ELECT_PREFERRED_LEADERS (43) ClusterAction Cluster
    INCREMENTAL_ALTER_CONFIGS (44) AlterConfigs Cluster If broker configs are altered, then the broker will check cluster level privileges.
    INCREMENTAL_ALTER_CONFIGS (44) AlterConfigs Topic If topic configs are altered, then the broker will check topic level privileges.
    ALTER_PARTITION_REASSIGNMENTS (45) Alter Cluster
    LIST_PARTITION_REASSIGNMENTS (46) Describe Cluster
    OFFSET_DELETE (47) Delete Group
    OFFSET_DELETE (47) Read Topic
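
    As an illustration of the table, a consumer that commits offsets needs Read on both the Topic (FETCH, OFFSET_COMMIT) and the Group (OFFSET_COMMIT, JOIN_GROUP, etc.); a sketch using the --consumer convenience option described earlier grants that in one command (principal, topic, and group names are placeholders):

    > bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --consumer --topic Test-topic --group Group-1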

    7.5 Incorporating Security Features in a Running Cluster

    You can secure a running cluster via one or more of the supported protocols discussed previously. This is done in phases:

    1. Incrementally bounce the cluster nodes to open additional secured port(s).
    2. Restart clients, changing their config to point at the secured rather than PLAINTEXT port (assuming you are securing the client-broker connection).
    3. Incrementally bounce the cluster again to enable broker-broker security (if this is required).
    4. A final incremental bounce to close the PLAINTEXT port.

    The specific steps for configuring SSL and SASL are described in sections 7.2 and 7.3. Follow these steps to enable security for your desired protocol(s).

    The security implementation lets you configure different protocols for both broker-client and broker-broker communication. These must be enabled in separate bounces. A PLAINTEXT port must be left open throughout so brokers and/or clients can continue to communicate.

    When performing an incremental bounce, stop the brokers cleanly via a SIGTERM. It's also good practice to wait for restarted replicas to return to the ISR list before moving on to the next node.

    As an example, say we wish to encrypt both broker-client and broker-broker communication with SSL. In the first incremental bounce, an SSL port is opened on each node:
    listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092
    We then restart the clients, changing their config to point at the newly opened, secured port:
    bootstrap.servers = [broker1:9092,...]
    security.protocol = SSL
    ...etc
    In the second incremental server bounce we instruct Kafka to use SSL as the broker-broker protocol (which will use the same SSL port):
    listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092
    security.inter.broker.protocol=SSL
    In the final bounce we secure the cluster by closing the PLAINTEXT port:
    listeners=SSL://broker1:9092
    security.inter.broker.protocol=SSL
    Alternatively we might choose to open multiple ports so that different protocols can be used for broker-broker and broker-client communication. Say we wished to use SSL encryption throughout (i.e. for broker-broker and broker-client communication) but we'd like to add SASL authentication to the broker-client connection also. We would achieve this by opening two additional ports during the first bounce:
    listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093
    We would then restart the clients, changing their config to point at the newly opened, SASL & SSL secured port:
    bootstrap.servers = [broker1:9093,...]
    security.protocol = SASL_SSL
    ...etc
    The second server bounce would switch the cluster to use encrypted broker-broker communication via the SSL port we previously opened on port 9092:
    listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093
    security.inter.broker.protocol=SSL
    The final bounce secures the cluster by closing the PLAINTEXT port.
    listeners=SSL://broker1:9092,SASL_SSL://broker1:9093
    security.inter.broker.protocol=SSL
    ZooKeeper can be secured independently of the Kafka cluster. The steps for doing this are covered in section 7.6.2.

    7.6 ZooKeeper Authentication

    ZooKeeper supports mutual TLS (mTLS) authentication beginning with the 3.5.x versions. Kafka supports authenticating to ZooKeeper with SASL and mTLS -- either individually or both together -- beginning with version 2.5. See KIP-515: Enable ZK client to use the new TLS supported authentication for more details.

    When using mTLS alone, every broker and any CLI tools (such as the ZooKeeper Security Migration Tool) should identify itself with the same Distinguished Name (DN) because it is the DN that is ACL'ed. This can be changed as described below, but it involves writing and deploying a custom ZooKeeper authentication provider. Generally each certificate should have the same DN but a different Subject Alternative Name (SAN) so that hostname verification of the brokers and any CLI tools by ZooKeeper will succeed.

    When using SASL authentication to ZooKeeper together with mTLS, both the SASL identity and either the DN that created the znode (i.e. the creating broker's certificate) or the DN of the Security Migration Tool (if migration was performed after the znode was created) will be ACL'ed, and all brokers and CLI tools will be authorized even if they all use different DNs because they will all use the same ACL'ed SASL identity. It is only when using mTLS authentication alone that all the DNs must match (and SANs become critical -- again, in the absence of writing and deploying a custom ZooKeeper authentication provider as described below).

    Use the broker properties file to set TLS configs for brokers as described below.

    Use the --zk-tls-config-file <file> option to set TLS configs in the ZooKeeper Security Migration Tool. The kafka-acls.sh and kafka-configs.sh CLI tools also support the --zk-tls-config-file <file> option.

    Use the -zk-tls-config-file <file> option (note the single-dash rather than double-dash) to set TLS configs for the zookeeper-shell.sh CLI tool.
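
    For example, a sketch of invoking the shell against a TLS-enabled ZooKeeper (the host, port, and file name are placeholders):

    > bin/zookeeper-shell.sh zk1:2182 -zk-tls-config-file zk-tls.properties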

    7.6.1 New clusters

    7.6.1.1 ZooKeeper SASL Authentication
    To enable ZooKeeper SASL authentication on brokers, there are two necessary steps:
    1. Create a JAAS login file and set the appropriate system property to point to it as described above
    2. Set the configuration property zookeeper.set.acl in each broker to true
    The metadata stored in ZooKeeper for the Kafka cluster is world-readable, but can only be modified by the brokers. The rationale behind this decision is that the data stored in ZooKeeper is not sensitive, but inappropriate manipulation of that data can cause cluster disruption. We also recommend limiting the access to ZooKeeper via network segmentation (only brokers and some admin tools need access to ZooKeeper).
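    As a minimal sketch of step 1 above, the broker JAAS file would contain a Client section for the ZooKeeper connection (assuming ZooKeeper's default DIGEST-MD5 SASL mechanism; the user name, password, and file path are placeholders):

    // kafka_server_jaas.conf: the Client section authenticates the broker to ZooKeeper
    Client {
        org.apache.zookeeper.server.auth.DigestLoginModule required
        username="kafka"
        password="kafka-secret";
    };

    pointed to with -Djava.security.auth.login.config=/path/to/kafka_server_jaas.conf.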
    7.6.1.2 ZooKeeper Mutual TLS Authentication
    ZooKeeper mTLS authentication can be enabled with or without SASL authentication. As mentioned above, when using mTLS alone, every broker and any CLI tools (such as the ZooKeeper Security Migration Tool) must generally identify itself with the same Distinguished Name (DN) because it is the DN that is ACL'ed, which means each certificate should have an appropriate Subject Alternative Name (SAN) so that hostname verification of the brokers and any CLI tool by ZooKeeper will succeed.

    It is possible to use something other than the DN for the identity of mTLS clients by writing a class that extends org.apache.zookeeper.server.auth.X509AuthenticationProvider and overrides the method protected String getClientId(X509Certificate clientCert). Choose a scheme name and set authProvider.[scheme] in ZooKeeper to be the fully-qualified class name of the custom implementation; then set ssl.authProvider=[scheme] to use it.
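
    A sketch of the corresponding ZooKeeper config, assuming a hypothetical custom class and the scheme name myScheme:

    # zoo.cfg: register the custom provider under a chosen scheme name and select it
    authProvider.myScheme=com.example.MyX509AuthenticationProvider
    ssl.authProvider=myScheme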

    Here is a sample (partial) ZooKeeper configuration for enabling TLS authentication. These configurations are described in the ZooKeeper Admin Guide.
    secureClientPort=2182
    serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
    authProvider.x509=org.apache.zookeeper.server.auth.X509AuthenticationProvider
    ssl.keyStore.location=/path/to/zk/keystore.jks
    ssl.keyStore.password=zk-ks-passwd
    ssl.trustStore.location=/path/to/zk/truststore.jks
    ssl.trustStore.password=zk-ts-passwd
    IMPORTANT: ZooKeeper does not support setting the key password in the ZooKeeper server keystore to a value different from the keystore password itself. Be sure to set the key password to be the same as the keystore password.
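
    For example, when generating the ZooKeeper server key with keytool, a sketch that keeps the key password identical to the keystore password (the alias, DN, validity, and passwords are placeholders):

    > keytool -genkeypair -alias zk -keyalg RSA -validity 365 -keystore /path/to/zk/keystore.jks -storepass zk-ks-passwd -keypass zk-ks-passwd -dname "CN=zk1.example.com"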

    Here is a sample (partial) Kafka Broker configuration for connecting to ZooKeeper with mTLS authentication. These configurations are described above in Broker Configs.

    # connect to the ZooKeeper port configured for TLS
    zookeeper.connect=zk1:2182,zk2:2182,zk3:2182
    # required to use TLS to ZooKeeper (default is false)
    zookeeper.ssl.client.enable=true
    # required to use TLS to ZooKeeper
    zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
    # define key/trust stores to use TLS to ZooKeeper; ignored unless zookeeper.ssl.client.enable=true
    zookeeper.ssl.keystore.location=/path/to/kafka/keystore.jks
    zookeeper.ssl.keystore.password=kafka-ks-passwd
    zookeeper.ssl.truststore.location=/path/to/kafka/truststore.jks
    zookeeper.ssl.truststore.password=kafka-ts-passwd
    # tell broker to create ACLs on znodes
    zookeeper.set.acl=true
    IMPORTANT: ZooKeeper does not support setting the key password in the ZooKeeper client (i.e. broker) keystore to a value different from the keystore password itself. Be sure to set the key password to be the same as the keystore password.

    7.6.2 Migrating clusters

    If you are running a version of Kafka that does not support security, or simply have security disabled, and you want to make the cluster secure, then you need to execute the following steps to enable ZooKeeper authentication with minimal disruption to your operations:
    1. Enable SASL and/or mTLS authentication on ZooKeeper. If enabling mTLS, you would now have both a non-TLS port and a TLS port, like this:
      clientPort=2181
      secureClientPort=2182
      serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
      authProvider.x509=org.apache.zookeeper.server.auth.X509AuthenticationProvider
      ssl.keyStore.location=/path/to/zk/keystore.jks
      ssl.keyStore.password=zk-ks-passwd
      ssl.trustStore.location=/path/to/zk/truststore.jks
      ssl.trustStore.password=zk-ts-passwd
    2. Perform a rolling restart of brokers setting the JAAS login file and/or defining ZooKeeper mutual TLS configurations (including connecting to the TLS-enabled ZooKeeper port) as required, which enables brokers to authenticate to ZooKeeper. At the end of the rolling restart, brokers are able to manipulate znodes with strict ACLs, but they will not create znodes with those ACLs.
    3. If you enabled mTLS, disable the non-TLS port in ZooKeeper.
    4. Perform a second rolling restart of brokers, this time setting the configuration parameter zookeeper.set.acl to true, which enables the use of secure ACLs when creating znodes.
    5. Execute the ZkSecurityMigrator tool by running the script bin/zookeeper-security-migration.sh with zookeeper.acl set to secure. This tool traverses the corresponding sub-trees changing the ACLs of the znodes. Use the --zk-tls-config-file <file> option if you enabled mTLS.

    It is also possible to turn off authentication in a secure cluster. To do it, follow these steps:

    1. Perform a rolling restart of brokers setting the JAAS login file and/or defining ZooKeeper mutual TLS configurations, which enables brokers to authenticate, but setting zookeeper.set.acl to false. At the end of the rolling restart, brokers stop creating znodes with secure ACLs, but are still able to authenticate and manipulate all znodes.
    2. Execute the ZkSecurityMigrator tool by running the script bin/zookeeper-security-migration.sh with zookeeper.acl set to unsecure. This tool traverses the corresponding sub-trees changing the ACLs of the znodes. Use the --zk-tls-config-file <file> option if you need to set TLS configuration.
    3. If you are disabling mTLS, enable the non-TLS port in ZooKeeper.
    4. Perform a second rolling restart of brokers, this time omitting the system property that sets the JAAS login file and/or removing the ZooKeeper mutual TLS configuration (including connecting to the non-TLS-enabled ZooKeeper port) as required.
    5. If you are disabling mTLS, disable the TLS port in ZooKeeper.
    Here is an example of how to run the migration tool:
    bin/zookeeper-security-migration.sh --zookeeper.acl=secure --zookeeper.connect=localhost:2181

    Run this to see the full list of parameters:

    bin/zookeeper-security-migration.sh --help

    7.6.3 Migrating the ZooKeeper ensemble

    It is also necessary to enable SASL and/or mTLS authentication on the ZooKeeper ensemble. To do this, perform a rolling restart of the servers and set a few properties. See above for mTLS information. Please refer to the ZooKeeper documentation for more detail:
    1. Apache ZooKeeper documentation
    2. Apache ZooKeeper wiki

    7.6.4 ZooKeeper Quorum Mutual TLS Authentication

    It is possible to enable mTLS authentication between the ZooKeeper servers themselves. Please refer to the ZooKeeper documentation for more detail.

    7.7 ZooKeeper Encryption

    ZooKeeper connections that use mutual TLS are encrypted. Beginning with ZooKeeper version 3.5.7 (the version shipped with Kafka version 2.5), ZooKeeper supports a server-side config ssl.clientAuth (case-insensitive; want/need/none are the valid values, with need being the default), and setting this value to none in ZooKeeper allows clients to connect via a TLS-encrypted connection without presenting their own certificate. Here is a sample (partial) Kafka Broker configuration for connecting to ZooKeeper with just TLS encryption. These configurations are described above in Broker Configs.
    # connect to the ZooKeeper port configured for TLS
    zookeeper.connect=zk1:2182,zk2:2182,zk3:2182
    # required to use TLS to ZooKeeper (default is false)
    zookeeper.ssl.client.enable=true
    # required to use TLS to ZooKeeper
    zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
    # define trust stores to use TLS to ZooKeeper; ignored unless zookeeper.ssl.client.enable=true
    # no need to set keystore information assuming ssl.clientAuth=none on ZooKeeper
    zookeeper.ssl.truststore.location=/path/to/kafka/truststore.jks
    zookeeper.ssl.truststore.password=kafka-ts-passwd
    # tell broker to create ACLs on znodes (if using SASL authentication, otherwise do not set this)
    zookeeper.set.acl=true
    - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 28/security.html [1068:2123]: - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - config file. We intend to make these configurable in a future Kafka release.

    A token has a current life, and a maximum renewable life. By default, tokens must be renewed once every 24 hours for up to 7 days. These can be configured using delegation.token.expiry.time.ms and delegation.token.max.lifetime.ms config options.

    Tokens can also be cancelled explicitly. If a token is not renewed by the token’s expiration time or if token is beyond the max life time, it will be deleted from all broker caches as well as from zookeeper.

  • Creating Delegation Tokens

    Tokens can be created by using Admin APIs or using kafka-delegation-tokens.sh script. Delegation token requests (create/renew/expire/describe) should be issued only on SASL or SSL authenticated channels. Tokens can not be requests if the initial authentication is done through delegation token. kafka-delegation-tokens.sh script examples are given below.

    Create a delegation token:

    > bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --create   --max-life-time-period -1 --command-config client.properties --renewer-principal User:user1

    Renew a delegation token:

    > bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --renew    --renew-time-period -1 --command-config client.properties --hmac ABCDEFGHIJK

    Expire a delegation token:

    > bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --expire   --expiry-time-period -1   --command-config client.properties  --hmac ABCDEFGHIJK

    Existing tokens can be described using the --describe option:

    > bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --describe --command-config client.properties  --owner-principal User:user1
  • Token Authentication

    Delegation token authentication piggybacks on the current SASL/SCRAM authentication mechanism. We must enable SASL/SCRAM mechanism on Kafka cluster as described in here.

    Configuring Kafka Clients:

    1. Configure the JAAS configuration property for each client in producer.properties or consumer.properties. The login module describes how the clients like producer and consumer can connect to the Kafka Broker. The following is an example configuration for a client for the token authentication:
      sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
          username="tokenID123" \
          password="lAYYSFmLs4bTjf+lTZ1LCHR/ZZFNA==" \
          tokenauth="true";

      The options username and password are used by clients to configure the token id and token HMAC. And the option tokenauth is used to indicate the server about token authentication. In this example, clients connect to the broker using token id: tokenID123. Different clients within a JVM may connect using different tokens by specifying different token details in sasl.jaas.config.

      JAAS configuration for clients may alternatively be specified as a JVM parameter similar to brokers as described here. Clients use the login section named KafkaClient. This option allows only one user for all client connections from a JVM.

  • Procedure to manually rotate the secret:

    We require a re-deployment when the secret needs to be rotated. During this process, already connected clients will continue to work. But any new connection requests and renew/expire requests with old tokens can fail. Steps are given below.

    1. Expire all existing tokens.
    2. Rotate the secret by rolling upgrade, and
    3. Generate new tokens

    We intend to automate this in a future Kafka release.

  • Notes on Delegation Tokens
  • 7.4 Authorization and ACLs

    Kafka ships with a pluggable Authorizer and an out-of-box authorizer implementation that uses zookeeper to store all the acls. The Authorizer is configured by setting authorizer.class.name in server.properties. To enable the out of the box implementation use:
    authorizer.class.name=kafka.security.authorizer.AclAuthorizer
    Kafka acls are defined in the general format of "Principal P is [Allowed/Denied] Operation O From Host H on any Resource R matching ResourcePattern RP". You can read more about the acl structure in KIP-11 and resource patterns in KIP-290. In order to add, remove or list acls you can use the Kafka authorizer CLI. By default, if no ResourcePatterns match a specific Resource R, then R has no associated acls, and therefore no one other than super users is allowed to access R. If you want to change that behavior, you can include the following in server.properties.
    allow.everyone.if.no.acl.found=true
    One can also add super users in server.properties like the following (note that the delimiter is semicolon since SSL user names may contain comma). Default PrincipalType string "User" is case sensitive.
    super.users=User:Bob;User:Alice
    Customizing SSL User Name
    By default, the SSL user name will be of the form "CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown". One can change that by setting ssl.principal.mapping.rules to a customized rule in server.properties. This config allows a list of rules for mapping X.500 distinguished name to short name. The rules are evaluated in order and the first rule that matches a distinguished name is used to map it to a short name. Any later rules in the list are ignored.
    The format of ssl.principal.mapping.rules is a list where each rule starts with "RULE:" and contains an expression as the following formats. Default rule will return string representation of the X.500 certificate distinguished name. If the distinguished name matches the pattern, then the replacement command will be run over the name. This also supports lowercase/uppercase options, to force the translated result to be all lower/uppercase case. This is done by adding a "/L" or "/U' to the end of the rule.
    RULE:pattern/replacement/
    RULE:pattern/replacement/[LU]
    Example ssl.principal.mapping.rules values are:
    RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/,
    RULE:^CN=(.*?),OU=(.*?),O=(.*?),L=(.*?),ST=(.*?),C=(.*?)$/$1@$2/L,
    RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/L,
    DEFAULT
    Above rules translate distinguished name "CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown" to "serviceuser" and "CN=adminUser,OU=Admin,O=Unknown,L=Unknown,ST=Unknown,C=Unknown" to "adminuser@admin".
    For advanced use cases, one can customize the name by setting a customized PrincipalBuilder in server.properties like the following.
    principal.builder.class=CustomizedPrincipalBuilderClass
    Customizing SASL User Name
    By default, the SASL user name will be the primary part of the Kerberos principal. One can change that by setting sasl.kerberos.principal.to.local.rules to a customized rule in server.properties. The format of sasl.kerberos.principal.to.local.rules is a list where each rule works in the same way as the auth_to_local in Kerberos configuration file (krb5.conf). This also support additional lowercase/uppercase rule, to force the translated result to be all lowercase/uppercase. This is done by adding a "/L" or "/U" to the end of the rule. check below formats for syntax. Each rules starts with RULE: and contains an expression as the following formats. See the kerberos documentation for more details.
    RULE:[n:string](regexp)s/pattern/replacement/
    RULE:[n:string](regexp)s/pattern/replacement/g
    RULE:[n:string](regexp)s/pattern/replacement//L
    RULE:[n:string](regexp)s/pattern/replacement/g/L
    RULE:[n:string](regexp)s/pattern/replacement//U
    RULE:[n:string](regexp)s/pattern/replacement/g/U
    An example of adding a rule to properly translate user@MYDOMAIN.COM to user while also keeping the default rule in place is:
    sasl.kerberos.principal.to.local.rules=RULE:[1:$1@$0](.*@MYDOMAIN.COM)s/@.*//,DEFAULT

    Command Line Interface

    Kafka Authorization management CLI can be found under bin directory with all the other CLIs. The CLI script is called kafka-acls.sh. Following lists all the options that the script supports:

    Option Description Default Option type
    --add Indicates to the script that user is trying to add an acl. Action
    --remove Indicates to the script that user is trying to remove an acl. Action
    --list Indicates to the script that user is trying to list acls. Action
    --authorizer Fully qualified class name of the authorizer. kafka.security.authorizer.AclAuthorizer Configuration
    --authorizer-properties key=val pairs that will be passed to authorizer for initialization. For the default authorizer the example values are: zookeeper.connect=localhost:2181 Configuration
    --bootstrap-server A list of host/port pairs to use for establishing the connection to the Kafka cluster. Only one of --bootstrap-server or --authorizer option must be specified. Configuration
    --command-config A property file containing configs to be passed to Admin Client. This option can only be used with --bootstrap-server option. Configuration
    --cluster Indicates to the script that the user is trying to interact with acls on the singular cluster resource. ResourcePattern
    --topic [topic-name] Indicates to the script that the user is trying to interact with acls on topic resource pattern(s). ResourcePattern
    --group [group-name] Indicates to the script that the user is trying to interact with acls on consumer-group resource pattern(s) ResourcePattern
    --transactional-id [transactional-id] The transactionalId to which ACLs should be added or removed. A value of * indicates the ACLs should apply to all transactionalIds. ResourcePattern
    --delegation-token [delegation-token] Delegation token to which ACLs should be added or removed. A value of * indicates ACL should apply to all tokens. ResourcePattern
    --resource-pattern-type [pattern-type] Indicates to the script the type of resource pattern, (for --add), or resource pattern filter, (for --list and --remove), the user wishes to use.
    When adding acls, this should be a specific pattern type, e.g. 'literal' or 'prefixed'.
    When listing or removing acls, a specific pattern type filter can be used to list or remove acls from a specific type of resource pattern, or the filter values of 'any' or 'match' can be used, where 'any' will match any pattern type, but will match the resource name exactly, and 'match' will perform pattern matching to list or remove all acls that affect the supplied resource(s).
    WARNING: 'match', when used in combination with the '--remove' switch, should be used with care.
    literal Configuration
    --allow-principal Principal is in PrincipalType:name format that will be added to ACL with Allow permission. Default PrincipalType string "User" is case sensitive.
    You can specify multiple --allow-principal in a single command.
    Principal
    --deny-principal Principal is in PrincipalType:name format that will be added to ACL with Deny permission. Default PrincipalType string "User" is case sensitive.
    You can specify multiple --deny-principal in a single command.
    Principal
    --principal Principal is in PrincipalType:name format that will be used along with --list option. Default PrincipalType string "User" is case sensitive. This will list the ACLs for the specified principal.
    You can specify multiple --principal in a single command.
    Principal
    --allow-host IP address from which principals listed in --allow-principal will have access. if --allow-principal is specified defaults to * which translates to "all hosts" Host
    --deny-host IP address from which principals listed in --deny-principal will be denied access. if --deny-principal is specified defaults to * which translates to "all hosts" Host
    --operation Operation that will be allowed or denied.
    Valid values are:
    • Read
    • Write
    • Create
    • Delete
    • Alter
    • Describe
    • ClusterAction
    • DescribeConfigs
    • AlterConfigs
    • IdempotentWrite
    • All
    All Operation
    --producer Convenience option to add/remove acls for producer role. This will generate acls that allows WRITE, DESCRIBE and CREATE on topic. Convenience
    --consumer Convenience option to add/remove acls for consumer role. This will generate acls that allows READ, DESCRIBE on topic and READ on consumer-group. Convenience
    --idempotent Enable idempotence for the producer. This should be used in combination with the --producer option.
    Note that idempotence is enabled automatically if the producer is authorized to a particular transactional-id.
    Convenience
    --force Convenience option to assume yes to all queries and do not prompt. Convenience
    --zk-tls-config-file Identifies the file where ZooKeeper client TLS connectivity properties for the authorizer are defined. Any properties other than the following (with or without an "authorizer." prefix) are ignored: zookeeper.clientCnxnSocket, zookeeper.ssl.cipher.suites, zookeeper.ssl.client.enable, zookeeper.ssl.crl.enable, zookeeper.ssl.enabled.protocols, zookeeper.ssl.endpoint.identification.algorithm, zookeeper.ssl.keystore.location, zookeeper.ssl.keystore.password, zookeeper.ssl.keystore.type, zookeeper.ssl.ocsp.enable, zookeeper.ssl.protocol, zookeeper.ssl.truststore.location, zookeeper.ssl.truststore.password, zookeeper.ssl.truststore.type Configuration

    Examples

    Authorization Primitives

    Protocol calls are usually performing some operations on certain resources in Kafka. It is required to know the operations and resources to set up effective protection. In this section we'll list these operations and resources, then list the combination of these with the protocols to see the valid scenarios.

    Operations in Kafka

    There are a few operation primitives that can be used to build up privileges. These can be matched up with certain resources to allow specific protocol calls for a given user. These are:

    Resources in Kafka

    The operations above can be applied on certain resources which are described below.

    Operations and Resources on Protocols

    In the below table we'll list the valid operations on resources that are executed by the Kafka API protocols.

    Protocol (API key) Operation Resource Note
    PRODUCE (0) Write TransactionalId An transactional producer which has its transactional.id set requires this privilege.
    PRODUCE (0) IdempotentWrite Cluster An idempotent produce action requires this privilege.
    PRODUCE (0) Write Topic This applies to a normal produce action.
    FETCH (1) ClusterAction Cluster A follower must have ClusterAction on the Cluster resource in order to fetch partition data.
    FETCH (1) Read Topic Regular Kafka consumers need READ permission on each partition they are fetching.
    LIST_OFFSETS (2) Describe Topic
    METADATA (3) Describe Topic
    METADATA (3) Create Cluster If topic auto-creation is enabled, then the broker-side API will check for the existence of a Cluster level privilege. If it's found then it'll allow creating the topic, otherwise it'll iterate through the Topic level privileges (see the next one).
    METADATA (3) Create Topic This authorizes auto topic creation if enabled but the given user doesn't have a cluster level permission (above).
    LEADER_AND_ISR (4) ClusterAction Cluster
    STOP_REPLICA (5) ClusterAction Cluster
    UPDATE_METADATA (6) ClusterAction Cluster
    CONTROLLED_SHUTDOWN (7) ClusterAction Cluster
    OFFSET_COMMIT (8) Read Group An offset can only be committed if it's authorized to the given group and the topic too (see below). Group access is checked first, then Topic access.
    OFFSET_COMMIT (8) Read Topic Since offset commit is part of the consuming process, it needs privileges for the read action.
    OFFSET_FETCH (9) Describe Group Similarly to OFFSET_COMMIT, the application must have privileges on group and topic level too to be able to fetch. However in this case it requires describe access instead of read. Group access is checked first, then Topic access.
    OFFSET_FETCH (9) Describe Topic
    FIND_COORDINATOR (10) Describe Group The FIND_COORDINATOR request can be of "Group" type in which case it is looking for consumergroup coordinators. This privilege would represent the Group mode.
    FIND_COORDINATOR (10) Describe TransactionalId This applies only on transactional producers and checked when a producer tries to find the transaction coordinator.
    JOIN_GROUP (11) Read Group
    HEARTBEAT (12) Read Group
    LEAVE_GROUP (13) Read Group
    SYNC_GROUP (14) Read Group
    DESCRIBE_GROUPS (15) Describe Group
    LIST_GROUPS (16) Describe Cluster When the broker checks to authorize a list_groups request it first checks for this cluster level authorization. If none found then it proceeds to check the groups individually. This operation doesn't return CLUSTER_AUTHORIZATION_FAILED.
    LIST_GROUPS (16) Describe Group If none of the groups are authorized, then just an empty response will be sent back instead of an error. This operation doesn't return CLUSTER_AUTHORIZATION_FAILED. This is applicable from the 2.1 release.
    SASL_HANDSHAKE (17) The SASL handshake is part of the authentication process and therefore it's not possible to apply any kind of authorization here.
    API_VERSIONS (18) The API_VERSIONS request is part of the Kafka protocol handshake and happens on connection and before any authentication. Therefore it's not possible to control this with authorization.
    CREATE_TOPICS (19) Create Cluster If there is no cluster level authorization then it won't return CLUSTER_AUTHORIZATION_FAILED but fall back to use topic level, which is just below. That'll throw error if there is a problem.
    CREATE_TOPICS (19) Create Topic This is applicable from the 2.0 release.
    DELETE_TOPICS (20) Delete Topic
    DELETE_RECORDS (21) Delete Topic
    INIT_PRODUCER_ID (22) Write TransactionalId
    INIT_PRODUCER_ID (22) IdempotentWrite Cluster
    OFFSET_FOR_LEADER_EPOCH (23) ClusterAction Cluster If there is no cluster level privilege for this operation, then it'll check for topic level one.
    OFFSET_FOR_LEADER_EPOCH (23) Describe Topic This is applicable from the 2.1 release.
    ADD_PARTITIONS_TO_TXN (24) Write TransactionalId This API is only applicable to transactional requests. It first checks for the Write action on the TransactionalId resource, then it checks the Topic in subject (below).
    ADD_PARTITIONS_TO_TXN (24) Write Topic
    ADD_OFFSETS_TO_TXN (25) Write TransactionalId Similarly to ADD_PARTITIONS_TO_TXN this is only applicable to transactional request. It first checks for Write action on the TransactionalId resource, then it checks whether it can Read on the given group (below).
    ADD_OFFSETS_TO_TXN (25) Read Group
    END_TXN (26) Write TransactionalId
    WRITE_TXN_MARKERS (27) ClusterAction Cluster
    TXN_OFFSET_COMMIT (28) Write TransactionalId
    TXN_OFFSET_COMMIT (28) Read Group
    TXN_OFFSET_COMMIT (28) Read Topic
    DESCRIBE_ACLS (29) Describe Cluster
    CREATE_ACLS (30) Alter Cluster
    DELETE_ACLS (31) Alter Cluster
    DESCRIBE_CONFIGS (32) DescribeConfigs Cluster If broker configs are requested, then the broker will check cluster level privileges.
    DESCRIBE_CONFIGS (32) DescribeConfigs Topic If topic configs are requested, then the broker will check topic level privileges.
    ALTER_CONFIGS (33) AlterConfigs Cluster If broker configs are altered, then the broker will check cluster level privileges.
    ALTER_CONFIGS (33) AlterConfigs Topic If topic configs are altered, then the broker will check topic level privileges.
    ALTER_REPLICA_LOG_DIRS (34) Alter Cluster
    DESCRIBE_LOG_DIRS (35) Describe Cluster An empty response will be returned on authorization failure.
    SASL_AUTHENTICATE (36) SASL_AUTHENTICATE is part of the authentication process and therefore it's not possible to apply any kind of authorization here.
    CREATE_PARTITIONS (37) Alter Topic
    CREATE_DELEGATION_TOKEN (38) Creating delegation tokens has special rules, for this please see the Authentication using Delegation Tokens section.
    RENEW_DELEGATION_TOKEN (39) Renewing delegation tokens has special rules, for this please see the Authentication using Delegation Tokens section.
    EXPIRE_DELEGATION_TOKEN (40) Expiring delegation tokens has special rules, for this please see the Authentication using Delegation Tokens section.
    DESCRIBE_DELEGATION_TOKEN (41) Describe DelegationToken Describing delegation tokens has special rules, for this please see the Authentication using Delegation Tokens section.
    DELETE_GROUPS (42) Delete Group
    ELECT_PREFERRED_LEADERS (43) ClusterAction Cluster
    INCREMENTAL_ALTER_CONFIGS (44) AlterConfigs Cluster If broker configs are altered, then the broker will check cluster level privileges.
    INCREMENTAL_ALTER_CONFIGS (44) AlterConfigs Topic If topic configs are altered, then the broker will check topic level privileges.
    ALTER_PARTITION_REASSIGNMENTS (45) Alter Cluster
    LIST_PARTITION_REASSIGNMENTS (46) Describe Cluster
    OFFSET_DELETE (47) Delete Group
    OFFSET_DELETE (47) Read Topic

    7.5 Incorporating Security Features in a Running Cluster

    You can secure a running cluster via one or more of the supported protocols discussed previously. This is done in phases:

    The specific steps for configuring SSL and SASL are described in sections 7.2 and 7.3. Follow these steps to enable security for your desired protocol(s).

    The security implementation lets you configure different protocols for both broker-client and broker-broker communication. These must be enabled in separate bounces. A PLAINTEXT port must be left open throughout so brokers and/or clients can continue to communicate.

    When performing an incremental bounce stop the brokers cleanly via a SIGTERM. It's also good practice to wait for restarted replicas to return to the ISR list before moving onto the next node.

    As an example, say we wish to encrypt both broker-client and broker-broker communication with SSL. In the first incremental bounce, an SSL port is opened on each node:
    listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092
    We then restart the clients, changing their config to point at the newly opened, secured port:
    bootstrap.servers = [broker1:9092,...]
    security.protocol = SSL
    ...etc
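    If the brokers' certificates are not signed by a CA the clients already trust, the clients will also need truststore settings alongside the above (the path and password here are placeholders):
    ssl.truststore.location=/path/to/client/truststore.jks
    ssl.truststore.password=client-ts-passwd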
    In the second incremental server bounce we instruct Kafka to use SSL as the broker-broker protocol (which will use the same SSL port):
    listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092
    security.inter.broker.protocol=SSL
    In the final bounce we secure the cluster by closing the PLAINTEXT port:
    listeners=SSL://broker1:9092
    security.inter.broker.protocol=SSL
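    A quick way to verify that the new port is actually serving TLS before switching inter-broker traffic to it is to probe it with openssl (host and port taken from this example):
    > openssl s_client -connect broker1:9092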
    Alternatively, we might choose to open multiple ports so that different protocols can be used for broker-broker and broker-client communication. Say we wished to use SSL encryption throughout (i.e. for broker-broker and broker-client communication), but we'd also like to add SASL authentication to the broker-client connection. We would achieve this by opening two additional ports during the first bounce:
    listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093
    We would then restart the clients, changing their config to point at the newly opened, SASL & SSL secured port:
    bootstrap.servers = [broker1:9093,...]
    security.protocol = SASL_SSL
    ...etc
    The second server bounce would switch the cluster to use encrypted broker-broker communication via the SSL port we previously opened on port 9092:
    listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093
    security.inter.broker.protocol=SSL
    The final bounce secures the cluster by closing the PLAINTEXT port:
    listeners=SSL://broker1:9092,SASL_SSL://broker1:9093
    security.inter.broker.protocol=SSL
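    Clients connecting to the SASL_SSL port additionally need SASL credentials in their config. A minimal sketch, assuming SASL/PLAIN with a hypothetical user alice (SCRAM and the other mechanisms follow the same pattern):
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="alice" \
        password="alice-secret";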
    ZooKeeper can be secured independently of the Kafka cluster. The steps for doing this are covered in section 7.6.2.

    7.6 ZooKeeper Authentication

    ZooKeeper supports mutual TLS (mTLS) authentication beginning with the 3.5.x versions. Kafka supports authenticating to ZooKeeper with SASL and mTLS -- either individually or both together -- beginning with version 2.5. See KIP-515: Enable ZK client to use the new TLS supported authentication for more details.

    When using mTLS alone, every broker and any CLI tools (such as the ZooKeeper Security Migration Tool) should identify itself with the same Distinguished Name (DN) because it is the DN that is ACL'ed. This can be changed as described below, but it involves writing and deploying a custom ZooKeeper authentication provider. Generally each certificate should have the same DN but a different Subject Alternative Name (SAN) so that hostname verification of the brokers and any CLI tools by ZooKeeper will succeed.

    When using SASL authentication to ZooKeeper together with mTLS, both the SASL identity and either the DN that created the znode (i.e. the creating broker's certificate) or the DN of the Security Migration Tool (if migration was performed after the znode was created) will be ACL'ed, and all brokers and CLI tools will be authorized even if they all use different DNs because they will all use the same ACL'ed SASL identity. It is only when using mTLS authentication alone that all the DNs must match (and SANs become critical -- again, in the absence of writing and deploying a custom ZooKeeper authentication provider as described below).

    Use the broker properties file to set TLS configs for brokers as described below.

    Use the --zk-tls-config-file <file> option to set TLS configs in the ZooKeeper Security Migration Tool. The kafka-acls.sh and kafka-configs.sh CLI tools also support the --zk-tls-config-file <file> option.

    Use the -zk-tls-config-file <file> option (note the single-dash rather than double-dash) to set TLS configs for the zookeeper-shell.sh CLI tool.
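    The file passed to these options is a properties file containing the zookeeper.ssl.* client configs described later in this section; a minimal sketch, assuming mutual TLS (paths and passwords are placeholders):
    zookeeper.ssl.client.enable=true
    zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
    zookeeper.ssl.keystore.location=/path/to/kafka/keystore.jks
    zookeeper.ssl.keystore.password=kafka-ks-passwd
    zookeeper.ssl.truststore.location=/path/to/kafka/truststore.jks
    zookeeper.ssl.truststore.password=kafka-ts-passwd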

    7.6.1 New clusters

    7.6.1.1 ZooKeeper SASL Authentication
    To enable ZooKeeper SASL authentication on brokers, there are two necessary steps:
    1. Create a JAAS login file and set the appropriate system property to point to it as described above
    2. Set the configuration property zookeeper.set.acl in each broker to true
    The metadata stored in ZooKeeper for the Kafka cluster is world-readable, but can only be modified by the brokers. The rationale behind this decision is that the data stored in ZooKeeper is not sensitive, but inappropriate manipulation of that data can cause cluster disruption. We also recommend limiting the access to ZooKeeper via network segmentation (only brokers and some admin tools need access to ZooKeeper).
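    A minimal sketch of such a JAAS login file, assuming DIGEST-MD5 with a hypothetical user and password (the broker is pointed at the file via the java.security.auth.login.config system property):
    Client {
        org.apache.zookeeper.server.auth.DigestLoginModule required
        username="kafka"
        password="kafka-secret";
    };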
    7.6.1.2 ZooKeeper Mutual TLS Authentication
    ZooKeeper mTLS authentication can be enabled with or without SASL authentication. As mentioned above, when using mTLS alone, every broker and any CLI tools (such as the ZooKeeper Security Migration Tool) must generally identify itself with the same Distinguished Name (DN) because it is the DN that is ACL'ed, which means each certificate should have an appropriate Subject Alternative Name (SAN) so that hostname verification of the brokers and any CLI tool by ZooKeeper will succeed.

    It is possible to use something other than the DN for the identity of mTLS clients by writing a class that extends org.apache.zookeeper.server.auth.X509AuthenticationProvider and overrides the method protected String getClientId(X509Certificate clientCert). Choose a scheme name and set authProvider.[scheme] in ZooKeeper to be the fully-qualified class name of the custom implementation; then set ssl.authProvider=[scheme] to use it.
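    A minimal sketch of such a provider, which falls back to the default DN-based identity when no SAN is present; the class name and the choice of the first DNS SAN as the identity are illustrative assumptions, not a recommendation:
    import java.security.cert.CertificateParsingException;
    import java.security.cert.X509Certificate;
    import java.util.Collection;
    import java.util.List;
    import org.apache.zookeeper.common.X509Exception;
    import org.apache.zookeeper.server.auth.X509AuthenticationProvider;

    public class SanX509AuthenticationProvider extends X509AuthenticationProvider {

        public SanX509AuthenticationProvider() throws X509Exception {
            super(); // load key/trust material from the standard ssl.* server properties
        }

        @Override
        protected String getClientId(X509Certificate clientCert) {
            try {
                // Entries are [type, value]; type 2 is dNSName.
                Collection<List<?>> sans = clientCert.getSubjectAlternativeNames();
                if (sans != null) {
                    for (List<?> san : sans) {
                        if (Integer.valueOf(2).equals(san.get(0))) {
                            return (String) san.get(1);
                        }
                    }
                }
            } catch (CertificateParsingException e) {
                // Unparseable extension: fall through to the DN-based default below.
            }
            return super.getClientId(clientCert);
        }
    }
    With this class on the ZooKeeper server classpath, you would pick a scheme name (say, sanx509), set authProvider.sanx509 to the class's fully-qualified name, and set ssl.authProvider=sanx509, per the mechanism described above.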

    Here is a sample (partial) ZooKeeper configuration for enabling TLS authentication. These configurations are described in the ZooKeeper Admin Guide.
    secureClientPort=2182
    serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
    authProvider.x509=org.apache.zookeeper.server.auth.X509AuthenticationProvider
    ssl.keyStore.location=/path/to/zk/keystore.jks
    ssl.keyStore.password=zk-ks-passwd
    ssl.trustStore.location=/path/to/zk/truststore.jks
    ssl.trustStore.password=zk-ts-passwd
    IMPORTANT: ZooKeeper does not support setting the key password in the ZooKeeper server keystore to a value different from the keystore password itself. Be sure to set the key password to be the same as the keystore password.
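    For example, when generating the ZooKeeper server keystore with keytool, pass the same value to both -storepass and -keypass (the alias, DN, validity, and passwords below are placeholders):
    > keytool -genkeypair -alias zk -keyalg RSA -validity 365 \
        -dname "CN=zk1.example.com" -keystore /path/to/zk/keystore.jks \
        -storepass zk-ks-passwd -keypass zk-ks-passwd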

    Here is a sample (partial) Kafka Broker configuration for connecting to ZooKeeper with mTLS authentication. These configurations are described above in Broker Configs.

    # connect to the ZooKeeper port configured for TLS
    zookeeper.connect=zk1:2182,zk2:2182,zk3:2182
    # required to use TLS to ZooKeeper (default is false)
    zookeeper.ssl.client.enable=true
    # required to use TLS to ZooKeeper
    zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
    # define key/trust stores to use TLS to ZooKeeper; ignored unless zookeeper.ssl.client.enable=true
    zookeeper.ssl.keystore.location=/path/to/kafka/keystore.jks
    zookeeper.ssl.keystore.password=kafka-ks-passwd
    zookeeper.ssl.truststore.location=/path/to/kafka/truststore.jks
    zookeeper.ssl.truststore.password=kafka-ts-passwd
    # tell broker to create ACLs on znodes
    zookeeper.set.acl=true
    IMPORTANT: ZooKeeper does not support setting the key password in the ZooKeeper client (i.e. broker) keystore to a value different from the keystore password itself. Be sure to set the key password to be the same as the keystore password.

    7.6.2 Migrating clusters

    If you are running a version of Kafka that does not support security, or are simply running with security disabled, and you want to make the cluster secure, then you need to execute the following steps to enable ZooKeeper authentication with minimal disruption to your operations:
    1. Enable SASL and/or mTLS authentication on ZooKeeper. If enabling mTLS, you would now have both a non-TLS port and a TLS port, like this:
      clientPort=2181
      secureClientPort=2182
      serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
      authProvider.x509=org.apache.zookeeper.server.auth.X509AuthenticationProvider
      ssl.keyStore.location=/path/to/zk/keystore.jks
      ssl.keyStore.password=zk-ks-passwd
      ssl.trustStore.location=/path/to/zk/truststore.jks
      ssl.trustStore.password=zk-ts-passwd
    2. Perform a rolling restart of brokers setting the JAAS login file and/or defining ZooKeeper mutual TLS configurations (including connecting to the TLS-enabled ZooKeeper port) as required, which enables brokers to authenticate to ZooKeeper. At the end of the rolling restart, brokers are able to manipulate znodes with strict ACLs, but they will not create znodes with those ACLs.
    3. If you enabled mTLS, disable the non-TLS port in ZooKeeper
    4. Perform a second rolling restart of brokers, this time setting the configuration parameter zookeeper.set.acl to true, which enables the use of secure ACLs when creating znodes
    5. Execute the ZkSecurityMigrator tool by running the bin/zookeeper-security-migration.sh script with zookeeper.acl set to secure. This tool traverses the corresponding sub-trees, changing the ACLs of the znodes. Use the --zk-tls-config-file <file> option if you enabled mTLS.

    It is also possible to turn off authentication in a secure cluster. To do so, follow these steps:

    1. Perform a rolling restart of brokers setting the JAAS login file and/or defining ZooKeeper mutual TLS configurations, which enables brokers to authenticate, but setting zookeeper.set.acl to false. At the end of the rolling restart, brokers stop creating znodes with secure ACLs, but are still able to authenticate and manipulate all znodes
    2. Execute the ZkSecurityMigrator tool by running the bin/zookeeper-security-migration.sh script with zookeeper.acl set to unsecure. This tool traverses the corresponding sub-trees, changing the ACLs of the znodes. Use the --zk-tls-config-file <file> option if you need to set TLS configuration.
    3. If you are disabling mTLS, enable the non-TLS port in ZooKeeper
    4. Perform a second rolling restart of brokers, this time omitting the system property that sets the JAAS login file and/or removing ZooKeeper mutual TLS configuration (including connecting to the non-TLS-enabled ZooKeeper port) as required
    5. If you are disabling mTLS, disable the TLS port in ZooKeeper
    Here is an example of how to run the migration tool:
    bin/zookeeper-security-migration.sh --zookeeper.acl=secure --zookeeper.connect=localhost:2181

    Run this to see the full list of parameters:

    bin/zookeeper-security-migration.sh --help

    7.6.3 Migrating the ZooKeeper ensemble

    It is also necessary to enable SASL and/or mTLS authentication on the ZooKeeper ensemble. To do so, perform a rolling restart of the servers and set a few properties. See above for mTLS information, and refer to the ZooKeeper documentation for more detail:
    1. Apache ZooKeeper documentation
    2. Apache ZooKeeper wiki

    7.6.4 ZooKeeper Quorum Mutual TLS Authentication

    It is possible to enable mTLS authentication between the ZooKeeper servers themselves. Please refer to the ZooKeeper documentation for more detail.

    7.7 ZooKeeper Encryption

    ZooKeeper connections that use mutual TLS are encrypted. Beginning with ZooKeeper version 3.5.7 (the version shipped with Kafka version 2.5), ZooKeeper supports a server-side config ssl.clientAuth (case-insensitive; the valid options are want, need, and none, and the default is need). Setting this value to none in ZooKeeper allows clients to connect via a TLS-encrypted connection without presenting their own certificate. Here is a sample (partial) Kafka Broker configuration for connecting to ZooKeeper with just TLS encryption. These configurations are described above in Broker Configs.
    # connect to the ZooKeeper port configured for TLS
    zookeeper.connect=zk1:2182,zk2:2182,zk3:2182
    # required to use TLS to ZooKeeper (default is false)
    zookeeper.ssl.client.enable=true
    # required to use TLS to ZooKeeper
    zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
    # define trust stores to use TLS to ZooKeeper; ignored unless zookeeper.ssl.client.enable=true
    # no need to set keystore information assuming ssl.clientAuth=none on ZooKeeper
    zookeeper.ssl.truststore.location=/path/to/kafka/truststore.jks
    zookeeper.ssl.truststore.password=kafka-ts-passwd
    # tell broker to create ACLs on znodes (if using SASL authentication, otherwise do not set this)
    zookeeper.set.acl=true
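    With a configuration like this in place, one quick end-to-end check is to connect over the TLS port with the zookeeper-shell.sh tool mentioned in section 7.6 (the config file name below is a placeholder):
    > bin/zookeeper-shell.sh zk1:2182 -zk-tls-config-file zk-client.properties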