config file. We intend to make these configurable in a future Kafka release.
A token has a current life and a maximum renewable life. By default, tokens must be renewed once every 24 hours for up to 7 days. These can be configured using the delegation.token.expiry.time.ms and delegation.token.max.lifetime.ms config options.
Tokens can also be cancelled explicitly. If a token is not renewed by its expiration time, or if it is beyond its maximum lifetime, it will be deleted from all broker caches as well as from ZooKeeper.
Tokens can be created by using Admin APIs or the kafka-delegation-tokens.sh script. Delegation token requests (create/renew/expire/describe) should be issued only on SASL- or SSL-authenticated channels. Tokens cannot be requested if the initial authentication was done through a delegation token. Examples of using the kafka-delegation-tokens.sh script are given below.
Create a delegation token:
> bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --create --max-life-time-period -1 --command-config client.properties --renewer-principal User:user1
Renew a delegation token:
> bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --renew --renew-time-period -1 --command-config client.properties --hmac ABCDEFGHIJK
Expire a delegation token:
> bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --expire --expiry-time-period -1 --command-config client.properties --hmac ABCDEFGHIJK
Existing tokens can be described using the --describe option:
> bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --describe --command-config client.properties --owner-principal User:user1
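Tokens can also be managed programmatically with the Java Admin client. Below is a minimal sketch of creating a token, mirroring the CLI example above (the bootstrap server and renewer principal are the same placeholder values; the connection properties and error handling are simplified assumptions):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.CreateDelegationTokenOptions;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.token.delegation.DelegationToken;

public class CreateTokenExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // the connection itself must be SASL or SSL authenticated,
        // e.g. via the same settings used in client.properties above
        try (Admin admin = Admin.create(props)) {
            CreateDelegationTokenOptions options = new CreateDelegationTokenOptions()
                    .renewers(Collections.singletonList(new KafkaPrincipal("User", "user1")));
            DelegationToken token = admin.createDelegationToken(options).delegationToken().get();
            // the token id becomes the SASL username and the HMAC the password (see below)
            System.out.println("token id: " + token.tokenInfo().tokenId());
            System.out.println("hmac: " + token.hmacAsBase64String());
        }
    }
}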
Delegation token authentication piggybacks on the current SASL/SCRAM authentication mechanism. SASL/SCRAM must be enabled on the Kafka cluster as described here.
Configuring Kafka Clients:
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="tokenID123" \
password="lAYYSFmLs4bTjf+lTZ1LCHR/ZZFNA==" \
tokenauth="true";
The options username and password are used by clients to configure the token id and token HMAC, and the option tokenauth indicates token authentication to the server. In this example, clients connect to the broker using token id tokenID123. Different clients within a JVM may connect using different tokens by specifying different token details in sasl.jaas.config.
JAAS configuration for clients may alternatively be specified as a JVM parameter similar to brokers as described here. Clients use the login section named KafkaClient. This option allows only one user for all client connections from a JVM.
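As an illustration, a Java client can also supply the token credentials programmatically. This is a minimal sketch, assuming the placeholder token id and HMAC from the JAAS example above, a topic named Test-topic, and a SCRAM mechanism matching one enabled on the brokers:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TokenAuthConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "SCRAM-SHA-256"); // must match a mechanism enabled on the brokers
        // username/password carry the token id and HMAC; tokenauth marks this as token authentication
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.scram.ScramLoginModule required "
                + "username=\"tokenID123\" "
                + "password=\"lAYYSFmLs4bTjf+lTZ1LCHR/ZZFNA==\" "
                + "tokenauth=\"true\";");
        props.put("group.id", "token-demo");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("Test-topic"));
            consumer.poll(Duration.ofSeconds(5)).forEach(r -> System.out.println(r.value()));
        }
    }
}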
We require a re-deployment when the secret needs to be rotated. During this process, already-connected clients will continue to work, but any new connection requests and renew/expire requests with old tokens can fail. Steps are given below. We intend to automate this in a future Kafka release.
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
Kafka acls are defined in the general format of "Principal P is [Allowed/Denied] Operation O From Host H on any Resource R matching ResourcePattern RP". You can read more about the acl structure in KIP-11 and resource patterns in KIP-290. In order to add, remove or list acls you can use the Kafka authorizer CLI. By default, if no ResourcePatterns match a specific Resource R, then R has no associated acls, and therefore no one other than super users is allowed to access R. If you want to change that behavior, you can include the following in server.properties.
allow.everyone.if.no.acl.found=true
One can also add super users in server.properties like the following (note that the delimiter is a semicolon since SSL user names may contain commas). The default PrincipalType string "User" is case sensitive.
super.users=User:Bob;User:Alice
The SSL principal name can be customized by setting ssl.principal.mapping.rules to a customized rule in server.properties.
This config allows a list of rules for mapping an X.500 distinguished name to a short name. The rules are evaluated in order, and the first rule that matches a distinguished name is used to map it to a short name; any later rules in the list are ignored.
ssl.principal.mapping.rules is a list where each rule starts with "RULE:" and contains an expression in one of the formats below. The default rule returns the string representation of the X.500 certificate's distinguished name. If the distinguished name matches the pattern, then the replacement command is run over the name.
This also supports lowercase/uppercase options, to force the translated result to be all lowercase or all uppercase. This is done by adding a "/L" or "/U" to the end of the rule.
RULE:pattern/replacement/
RULE:pattern/replacement/[LU]
Example ssl.principal.mapping.rules values are:
RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/,
RULE:^CN=(.*?),OU=(.*?),O=(.*?),L=(.*?),ST=(.*?),C=(.*?)$/$1@$2/L,
RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/L,
DEFAULT
The above rules translate the distinguished name "CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown" to "serviceuser" and "CN=adminUser,OU=Admin,O=Unknown,L=Unknown,ST=Unknown,C=Unknown" to "adminuser@admin".
A customized principal builder can also be configured by setting principal.builder.class in server.properties:
principal.builder.class=CustomizedPrincipalBuilderClass
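A minimal sketch of such a builder for an SSL listener follows. The class name matches the placeholder above, but the body is illustrative; production code would also handle SASL and plaintext authentication contexts:

import javax.net.ssl.SSLPeerUnverifiedException;
import org.apache.kafka.common.security.auth.AuthenticationContext;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
import org.apache.kafka.common.security.auth.SslAuthenticationContext;

public class CustomizedPrincipalBuilderClass implements KafkaPrincipalBuilder {
    @Override
    public KafkaPrincipal build(AuthenticationContext context) {
        if (context instanceof SslAuthenticationContext) {
            try {
                // derive the principal name from the client certificate's subject DN
                String dn = ((SslAuthenticationContext) context).session()
                        .getPeerPrincipal().getName();
                return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, dn);
            } catch (SSLPeerUnverifiedException e) {
                return KafkaPrincipal.ANONYMOUS;
            }
        }
        return KafkaPrincipal.ANONYMOUS;
    }
}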
The Kerberos principal-to-local mapping can be customized by setting sasl.kerberos.principal.to.local.rules to a customized rule in server.properties.
The format of sasl.kerberos.principal.to.local.rules is a list where each rule works in the same way as auth_to_local in the Kerberos configuration file (krb5.conf). It also supports an additional lowercase/uppercase option, to force the translated result to be all lowercase or all uppercase. This is done by adding a "/L" or "/U" to the end of the rule.
Each rule starts with RULE: and contains an expression in one of the following formats. See the Kerberos documentation for more details.
RULE:[n:string](regexp)s/pattern/replacement/
RULE:[n:string](regexp)s/pattern/replacement/g
RULE:[n:string](regexp)s/pattern/replacement//L
RULE:[n:string](regexp)s/pattern/replacement/g/L
RULE:[n:string](regexp)s/pattern/replacement//U
RULE:[n:string](regexp)s/pattern/replacement/g/U
An example of adding a rule to properly translate user@MYDOMAIN.COM to user while also keeping the default rule in place is:
sasl.kerberos.principal.to.local.rules=RULE:[1:$1@$0](.*@MYDOMAIN.COM)s/@.*//,DEFAULT
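A small standalone sketch (again plain Java string operations, not Kafka's actual rule parser) of what this rule does: [1:$1@$0] formats the one-component principal user@MYDOMAIN.COM as "user@MYDOMAIN.COM", the (.*@MYDOMAIN.COM) guard matches it, and s/@.*// strips the realm:

public class KerberosRuleDemo {
    public static void main(String[] args) {
        String component = "user";     // $1: the first principal component
        String realm = "MYDOMAIN.COM"; // $0: the realm
        String formatted = component + "@" + realm;      // [1:$1@$0] -> "user@MYDOMAIN.COM"
        if (formatted.matches(".*@MYDOMAIN.COM")) {      // the (.*@MYDOMAIN.COM) guard
            String shortName = formatted.replaceFirst("@.*", ""); // s/@.*//
            System.out.println(shortName); // prints "user"
        }
    }
}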
Option | Description | Default | Option type |
---|---|---|---|
--add | Indicates to the script that the user is trying to add an acl. | | Action |
--remove | Indicates to the script that the user is trying to remove an acl. | | Action |
--list | Indicates to the script that the user is trying to list acls. | | Action |
--authorizer | Fully qualified class name of the authorizer. | kafka.security.authorizer.AclAuthorizer | Configuration |
--authorizer-properties | key=val pairs that will be passed to the authorizer for initialization. For the default authorizer an example value is: zookeeper.connect=localhost:2181 | | Configuration |
--bootstrap-server | A list of host/port pairs to use for establishing the connection to the Kafka cluster. Only one of --bootstrap-server or --authorizer may be specified. | | Configuration |
--command-config | A property file containing configs to be passed to the Admin Client. This option can only be used with the --bootstrap-server option. | | Configuration |
--cluster | Indicates to the script that the user is trying to interact with acls on the singular cluster resource. | | ResourcePattern |
--topic [topic-name] | Indicates to the script that the user is trying to interact with acls on topic resource pattern(s). | | ResourcePattern |
--group [group-name] | Indicates to the script that the user is trying to interact with acls on consumer-group resource pattern(s). | | ResourcePattern |
--transactional-id [transactional-id] | The transactionalId to which ACLs should be added or removed. A value of * indicates the ACLs should apply to all transactionalIds. | | ResourcePattern |
--delegation-token [delegation-token] | Delegation token to which ACLs should be added or removed. A value of * indicates the ACL should apply to all tokens. | | ResourcePattern |
--resource-pattern-type [pattern-type] | Indicates to the script the type of resource pattern (for --add) or resource pattern filter (for --list and --remove) the user wishes to use. When adding acls, this should be a specific pattern type, e.g. 'literal' or 'prefixed'. When listing or removing acls, a specific pattern type filter can be used to list or remove acls from a specific type of resource pattern, or the filter values 'any' or 'match' can be used, where 'any' will match any pattern type but will match the resource name exactly, and 'match' will perform pattern matching to list or remove all acls that affect the supplied resource(s). WARNING: 'match', when used in combination with the '--remove' switch, should be used with care. | literal | Configuration |
--allow-principal | Principal in PrincipalType:name format that will be added to the ACL with Allow permission. The default PrincipalType string "User" is case sensitive. You can specify multiple --allow-principal options in a single command. | | Principal |
--deny-principal | Principal in PrincipalType:name format that will be added to the ACL with Deny permission. The default PrincipalType string "User" is case sensitive. You can specify multiple --deny-principal options in a single command. | | Principal |
--principal | Principal in PrincipalType:name format that is used along with the --list option. This will list the ACLs for the specified principal. The default PrincipalType string "User" is case sensitive. You can specify multiple --principal options in a single command. | | Principal |
--allow-host | IP address from which the principals listed in --allow-principal will have access. | If --allow-principal is specified, defaults to *, which translates to "all hosts". | Host |
--deny-host | IP address from which the principals listed in --deny-principal will be denied access. | If --deny-principal is specified, defaults to *, which translates to "all hosts". | Host |
--operation | Operation that will be allowed or denied. Valid values are the operation primitives listed in the table below (Read, Write, Create, Delete, Alter, Describe, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite) as well as All. | All | Operation |
--producer | Convenience option to add/remove acls for the producer role. This will generate acls that allow WRITE, DESCRIBE and CREATE on topic. | | Convenience |
--consumer | Convenience option to add/remove acls for the consumer role. This will generate acls that allow READ and DESCRIBE on topic, and READ on consumer-group. | | Convenience |
--idempotent | Enable idempotence for the producer. This should be used in combination with the --producer option. Note that idempotence is enabled automatically if the producer is authorized to a particular transactional-id. | | Convenience |
--force | Convenience option to assume yes to all queries and not prompt. | | Convenience |
--zk-tls-config-file | Identifies the file where ZooKeeper client TLS connectivity properties for the authorizer are defined. Any properties other than the following (with or without an "authorizer." prefix) are ignored: zookeeper.clientCnxnSocket, zookeeper.ssl.cipher.suites, zookeeper.ssl.client.enable, zookeeper.ssl.crl.enable, zookeeper.ssl.enabled.protocols, zookeeper.ssl.endpoint.identification.algorithm, zookeeper.ssl.keystore.location, zookeeper.ssl.keystore.password, zookeeper.ssl.keystore.type, zookeeper.ssl.ocsp.enable, zookeeper.ssl.protocol, zookeeper.ssl.truststore.location, zookeeper.ssl.truststore.password, zookeeper.ssl.truststore.type | | Configuration |
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
By default, all principals that don't have an explicit acl that allows access for an operation to a resource are denied. In rare cases where an allow acl is defined that allows access to all but some principal, we will have to use the --deny-principal and --deny-host options. For example, if we want to allow all users to Read from Test-topic but deny only User:BadBob from IP 198.51.100.3, we can do so using the following command:
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:* --allow-host * --deny-principal User:BadBob --deny-host 198.51.100.3 --operation Read --topic Test-topic
Note that --allow-host and --deny-host only support IP addresses (hostnames are not supported).
The above examples add acls to a topic by specifying --topic [topic-name] as the resource pattern option. Similarly, one can add acls to the cluster by specifying --cluster, and to a consumer group by specifying --group [group-name].
You can add acls on any resource of a certain type. For example, suppose you want to add the acl "Principal User:Peter is allowed to produce to any Topic from IP 198.51.200.1". You can do that by using the wildcard resource '*', e.g. by executing the CLI with the following options:
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Peter --allow-host 198.51.200.1 --producer --topic *
You can add acls on prefixed resource patterns. For example, suppose you want to add the acl "Principal User:Jane is allowed to produce to any Topic whose name starts with 'Test-' from any host". You can do that by executing the CLI with the following options:
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Jane --producer --topic Test- --resource-pattern-type prefixed
Note that --resource-pattern-type defaults to 'literal', which only affects resources with the exact same name or, in the case of the wildcard resource name '*', a resource with any name.
To remove acls, pass the --remove option in place of --add. For example, to remove the acls added by the first example above:
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
If you want to remove the acl added to the prefixed resource pattern above, we can execute the CLI with the following options:
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:Jane --producer --topic Test- --resource-pattern-type prefixed
Acls can be listed by specifying the --list option together with a resource. To list the acls on the literal resource pattern Test-topic:
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic Test-topic
However, this will only return the acls that have been added to this exact resource pattern. Other acls can exist that affect access to the topic, e.g. any acls on the topic wildcard '*', or any acls on prefixed resource patterns. Acls on the wildcard resource pattern can be queried explicitly:
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic *
However, it is not necessarily possible to explicitly query for acls on prefixed resource patterns that match Test-topic, as the names of such patterns may not be known. We can list all acls affecting Test-topic by using '--resource-pattern-type match', e.g.
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic Test-topic --resource-pattern-type match
This will list acls on all matching literal, wildcard and prefixed resource patterns.
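The same 'match' listing is available programmatically via the Admin API. Here is a minimal sketch (bootstrap server and client configs are placeholder assumptions):

import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;

public class ListMatchingAcls {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // MATCH finds literal, wildcard and prefixed patterns affecting Test-topic
            AclBindingFilter filter = new AclBindingFilter(
                new ResourcePatternFilter(ResourceType.TOPIC, "Test-topic", PatternType.MATCH),
                AccessControlEntryFilter.ANY);
            for (AclBinding binding : admin.describeAcls(filter).values().get()) {
                System.out.println(binding);
            }
        }
    }
}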
The most common use case for acl management is adding or removing a principal as a producer or consumer, so the convenience options handle these cases. To add User:Bob as a producer of Test-topic, we can execute the following command:
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --producer --topic Test-topic
Similarly, to add User:Bob as a consumer of Test-topic with consumer group Group-1, we just have to pass the --consumer option:
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --consumer --topic Test-topic --group Group-1
Note that for the consumer option we must also specify the consumer group. To remove a principal from a producer or consumer role, we just need to pass the --remove option.
All of the above examples can also be executed through the Admin API by using the --bootstrap-server option instead of connecting to ZooKeeper directly. For example:
bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf --add --allow-principal User:Bob --producer --topic Test-topic
bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf --add --allow-principal User:Bob --consumer --topic Test-topic --group Group-1
bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf --list --topic Test-topic
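Equivalently, acls can be created directly with the Java Admin client. Here is a minimal sketch, under the assumption of the same placeholder connection details, that grants User:Bob read access to Test-topic from any host:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

public class AddAclExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            AclBinding binding = new AclBinding(
                new ResourcePattern(ResourceType.TOPIC, "Test-topic", PatternType.LITERAL),
                new AccessControlEntry("User:Bob", "*", AclOperation.READ, AclPermissionType.ALLOW));
            // the caller needs Alter permission on the Cluster resource (see CREATE_ACLS below)
            admin.createAcls(Collections.singletonList(binding)).all().get();
        }
    }
}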
Protocol calls usually perform operations on certain resources in Kafka. Knowing the operations and resources is required to set up effective protection. In this section we'll list these operations and resources, then list the combinations of these with the protocols to see the valid scenarios.
There are a few operation primitives that can be used to build up privileges. These can be matched up with certain resources to allow specific protocol calls for a given user. The primitives, as they appear in the table below, are: Read, Write, Create, Delete, Alter, Describe, ClusterAction, DescribeConfigs, AlterConfigs and IdempotentWrite.
The operations above can be applied to certain resources, which also appear in the table below: Topic, Group, Cluster, TransactionalId and DelegationToken.
In the table below we'll list the valid operations on resources that are executed by the Kafka API protocols.
Protocol (API key) | Operation | Resource | Note |
---|---|---|---|
PRODUCE (0) | Write | TransactionalId | A transactional producer, which has its transactional.id set, requires this privilege. |
PRODUCE (0) | IdempotentWrite | Cluster | An idempotent produce action requires this privilege. |
PRODUCE (0) | Write | Topic | This applies to a normal produce action. |
FETCH (1) | ClusterAction | Cluster | A follower must have ClusterAction on the Cluster resource in order to fetch partition data. |
FETCH (1) | Read | Topic | Regular Kafka consumers need READ permission on each partition they are fetching. |
LIST_OFFSETS (2) | Describe | Topic | |
METADATA (3) | Describe | Topic | |
METADATA (3) | Create | Cluster | If topic auto-creation is enabled, then the broker-side API will check for the existence of a Cluster level privilege. If it's found then it'll allow creating the topic, otherwise it'll iterate through the Topic level privileges (see the next one). |
METADATA (3) | Create | Topic | This authorizes auto topic creation if enabled but the given user doesn't have a cluster level permission (above). |
LEADER_AND_ISR (4) | ClusterAction | Cluster | |
STOP_REPLICA (5) | ClusterAction | Cluster | |
UPDATE_METADATA (6) | ClusterAction | Cluster | |
CONTROLLED_SHUTDOWN (7) | ClusterAction | Cluster | |
OFFSET_COMMIT (8) | Read | Group | An offset can only be committed if it's authorized to the given group and the topic too (see below). Group access is checked first, then Topic access. |
OFFSET_COMMIT (8) | Read | Topic | Since offset commit is part of the consuming process, it needs privileges for the read action. |
OFFSET_FETCH (9) | Describe | Group | Similarly to OFFSET_COMMIT, the application must have privileges on group and topic level too to be able to fetch. However in this case it requires describe access instead of read. Group access is checked first, then Topic access. |
OFFSET_FETCH (9) | Describe | Topic | |
FIND_COORDINATOR (10) | Describe | Group | The FIND_COORDINATOR request can be of "Group" type, in which case it is looking for consumer group coordinators. This privilege represents the Group mode. |
FIND_COORDINATOR (10) | Describe | TransactionalId | This applies only on transactional producers and checked when a producer tries to find the transaction coordinator. |
JOIN_GROUP (11) | Read | Group | |
HEARTBEAT (12) | Read | Group | |
LEAVE_GROUP (13) | Read | Group | |
SYNC_GROUP (14) | Read | Group | |
DESCRIBE_GROUPS (15) | Describe | Group | |
LIST_GROUPS (16) | Describe | Cluster | When the broker checks to authorize a list_groups request it first checks for this cluster level authorization. If none found then it proceeds to check the groups individually. This operation doesn't return CLUSTER_AUTHORIZATION_FAILED. |
LIST_GROUPS (16) | Describe | Group | If none of the groups are authorized, then just an empty response will be sent back instead of an error. This operation doesn't return CLUSTER_AUTHORIZATION_FAILED. This is applicable from the 2.1 release. |
SASL_HANDSHAKE (17) | The SASL handshake is part of the authentication process and therefore it's not possible to apply any kind of authorization here. | ||
API_VERSIONS (18) | The API_VERSIONS request is part of the Kafka protocol handshake and happens on connection and before any authentication. Therefore it's not possible to control this with authorization. | ||
CREATE_TOPICS (19) | Create | Cluster | If there is no cluster level authorization then it won't return CLUSTER_AUTHORIZATION_FAILED but fall back to topic level authorization (the next row). That will throw an error if there is a problem. |
CREATE_TOPICS (19) | Create | Topic | This is applicable from the 2.0 release. |
DELETE_TOPICS (20) | Delete | Topic | |
DELETE_RECORDS (21) | Delete | Topic | |
INIT_PRODUCER_ID (22) | Write | TransactionalId | |
INIT_PRODUCER_ID (22) | IdempotentWrite | Cluster | |
OFFSET_FOR_LEADER_EPOCH (23) | ClusterAction | Cluster | If there is no cluster level privilege for this operation, then it'll check for a topic level one. |
OFFSET_FOR_LEADER_EPOCH (23) | Describe | Topic | This is applicable from the 2.1 release. |
ADD_PARTITIONS_TO_TXN (24) | Write | TransactionalId | This API is only applicable to transactional requests. It first checks for the Write action on the TransactionalId resource, then it checks the Topic in question (below). |
ADD_PARTITIONS_TO_TXN (24) | Write | Topic | |
ADD_OFFSETS_TO_TXN (25) | Write | TransactionalId | Similarly to ADD_PARTITIONS_TO_TXN, this is only applicable to transactional requests. It first checks for the Write action on the TransactionalId resource, then it checks whether it can Read on the given group (below). |
ADD_OFFSETS_TO_TXN (25) | Read | Group | |
END_TXN (26) | Write | TransactionalId | |
WRITE_TXN_MARKERS (27) | ClusterAction | Cluster | |
TXN_OFFSET_COMMIT (28) | Write | TransactionalId | |
TXN_OFFSET_COMMIT (28) | Read | Group | |
TXN_OFFSET_COMMIT (28) | Read | Topic | |
DESCRIBE_ACLS (29) | Describe | Cluster | |
CREATE_ACLS (30) | Alter | Cluster | |
DELETE_ACLS (31) | Alter | Cluster | |
DESCRIBE_CONFIGS (32) | DescribeConfigs | Cluster | If broker configs are requested, then the broker will check cluster level privileges. |
DESCRIBE_CONFIGS (32) | DescribeConfigs | Topic | If topic configs are requested, then the broker will check topic level privileges. |
ALTER_CONFIGS (33) | AlterConfigs | Cluster | If broker configs are altered, then the broker will check cluster level privileges. |
ALTER_CONFIGS (33) | AlterConfigs | Topic | If topic configs are altered, then the broker will check topic level privileges. |
ALTER_REPLICA_LOG_DIRS (34) | Alter | Cluster | |
DESCRIBE_LOG_DIRS (35) | Describe | Cluster | An empty response will be returned on authorization failure. |
SASL_AUTHENTICATE (36) | SASL_AUTHENTICATE is part of the authentication process and therefore it's not possible to apply any kind of authorization here. | ||
CREATE_PARTITIONS (37) | Alter | Topic | |
CREATE_DELEGATION_TOKEN (38) | Creating delegation tokens has special rules, for this please see the Authentication using Delegation Tokens section. | ||
RENEW_DELEGATION_TOKEN (39) | Renewing delegation tokens has special rules, for this please see the Authentication using Delegation Tokens section. | ||
EXPIRE_DELEGATION_TOKEN (40) | Expiring delegation tokens has special rules, for this please see the Authentication using Delegation Tokens section. | ||
DESCRIBE_DELEGATION_TOKEN (41) | Describe | DelegationToken | Describing delegation tokens has special rules, for this please see the Authentication using Delegation Tokens section. |
DELETE_GROUPS (42) | Delete | Group | |
ELECT_PREFERRED_LEADERS (43) | ClusterAction | Cluster | |
INCREMENTAL_ALTER_CONFIGS (44) | AlterConfigs | Cluster | If broker configs are altered, then the broker will check cluster level privileges. |
INCREMENTAL_ALTER_CONFIGS (44) | AlterConfigs | Topic | If topic configs are altered, then the broker will check topic level privileges. |
ALTER_PARTITION_REASSIGNMENTS (45) | Alter | Cluster | |
LIST_PARTITION_REASSIGNMENTS (46) | Describe | Cluster | |
OFFSET_DELETE (47) | Delete | Group | |
OFFSET_DELETE (47) | Read | Topic |
As an example of securing a running cluster with incremental bounces, say we wish to encrypt both broker-client and broker-broker communication with SSL. In the first incremental server bounce, an SSL port is opened on each node:
listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092
We then restart the clients, changing their config to point at the newly opened, secured port:
bootstrap.servers = [broker1:9092,...]
security.protocol = SSL
...etc
In the second incremental server bounce we instruct Kafka to use SSL as the broker-broker protocol (which will use the same SSL port):
listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092
security.inter.broker.protocol=SSL
In the final bounce we secure the cluster by closing the PLAINTEXT port:
listeners=SSL://broker1:9092
security.inter.broker.protocol=SSL
Alternatively we might choose to open multiple ports so that different protocols can be used for broker-broker and broker-client communication. Say we wished to use SSL encryption throughout (i.e. for broker-broker and broker-client communication) but we'd like to add SASL authentication to the broker-client connection also. We would achieve this by opening two additional ports during the first bounce:
listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093
We would then restart the clients, changing their config to point at the newly opened, SASL & SSL secured port:
bootstrap.servers = [broker1:9093,...]
security.protocol = SASL_SSL
...etc
The second server bounce would switch the cluster to use encrypted broker-broker communication via the SSL port we previously opened (port 9092):
listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093
security.inter.broker.protocol=SSL
The final bounce secures the cluster by closing the PLAINTEXT port.
listeners=SSL://broker1:9092,SASL_SSL://broker1:9093
security.inter.broker.protocol=SSL
ZooKeeper can be secured independently of the Kafka cluster. The steps for doing this are covered in section 7.6.2.
When using mTLS alone, every broker and any CLI tools (such as the ZooKeeper Security Migration Tool) should identify itself with the same Distinguished Name (DN) because it is the DN that is ACL'ed. This can be changed as described below, but it involves writing and deploying a custom ZooKeeper authentication provider. Generally each certificate should have the same DN but a different Subject Alternative Name (SAN) so that hostname verification of the brokers and any CLI tools by ZooKeeper will succeed.
When using SASL authentication to ZooKeeper together with mTLS, both the SASL identity and either the DN that created the znode (i.e. the creating broker's certificate) or the DN of the Security Migration Tool (if migration was performed after the znode was created) will be ACL'ed, and all brokers and CLI tools will be authorized even if they all use different DNs because they will all use the same ACL'ed SASL identity. It is only when using mTLS authentication alone that all the DNs must match (and SANs become critical -- again, in the absence of writing and deploying a custom ZooKeeper authentication provider as described below).
Use the broker properties file to set TLS configs for brokers as described below.
Use the --zk-tls-config-file <file> option to set TLS configs in the ZooKeeper Security Migration Tool. The kafka-acls.sh and kafka-configs.sh CLI tools also support the --zk-tls-config-file <file> option.
Use the -zk-tls-config-file <file> option (note the single-dash rather than double-dash) to set TLS configs for the zookeeper-shell.sh CLI tool.
It is possible to use something other than the DN for the identity of mTLS clients by writing a class that extends org.apache.zookeeper.server.auth.X509AuthenticationProvider and overrides the method protected String getClientId(X509Certificate clientCert). Choose a scheme name and set authProvider.[scheme] in ZooKeeper to be the fully-qualified class name of the custom implementation; then set ssl.authProvider=[scheme] to use it.
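For example, a provider that identifies clients by their first DNS Subject Alternative Name rather than the DN might look like the following sketch. The class name and SAN-extraction logic are illustrative, and the superclass constructor behavior (reading the standard zookeeper.ssl.* system properties) should be checked against the javadoc of your ZooKeeper version:

import java.security.cert.CertificateParsingException;
import java.security.cert.X509Certificate;
import java.util.Collection;
import java.util.List;
import org.apache.zookeeper.common.X509Exception;
import org.apache.zookeeper.server.auth.X509AuthenticationProvider;

public class SanX509AuthenticationProvider extends X509AuthenticationProvider {

    public SanX509AuthenticationProvider() throws X509Exception {
        super(); // the default constructor builds trust/key managers from system properties
    }

    @Override
    protected String getClientId(X509Certificate clientCert) {
        try {
            // use the first DNS Subject Alternative Name (type 2) instead of the DN
            Collection<List<?>> sans = clientCert.getSubjectAlternativeNames();
            if (sans != null) {
                for (List<?> san : sans) {
                    if (Integer.valueOf(2).equals(san.get(0))) {
                        return (String) san.get(1);
                    }
                }
            }
        } catch (CertificateParsingException e) {
            // fall through to the default DN-based identity
        }
        return super.getClientId(clientCert);
    }
}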
Here is a sample (partial) ZooKeeper configuration for enabling TLS authentication. These configurations are described in the ZooKeeper Admin Guide.
secureClientPort=2182
serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
authProvider.x509=org.apache.zookeeper.server.auth.X509AuthenticationProvider
ssl.keyStore.location=/path/to/zk/keystore.jks
ssl.keyStore.password=zk-ks-passwd
ssl.trustStore.location=/path/to/zk/truststore.jks
ssl.trustStore.password=zk-ts-passwd
IMPORTANT: ZooKeeper does not support setting the key password in the ZooKeeper server keystore
to a value different from the keystore password itself.
Be sure to set the key password to be the same as the keystore password.
Here is a sample (partial) Kafka Broker configuration for connecting to ZooKeeper with mTLS authentication. These configurations are described above in Broker Configs.
# connect to the ZooKeeper port configured for TLS
zookeeper.connect=zk1:2182,zk2:2182,zk3:2182
# required to use TLS to ZooKeeper (default is false)
zookeeper.ssl.client.enable=true
# required to use TLS to ZooKeeper
zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
# define key/trust stores to use TLS to ZooKeeper; ignored unless zookeeper.ssl.client.enable=true
zookeeper.ssl.keystore.location=/path/to/kafka/keystore.jks
zookeeper.ssl.keystore.password=kafka-ks-passwd
zookeeper.ssl.truststore.location=/path/to/kafka/truststore.jks
zookeeper.ssl.truststore.password=kafka-ts-passwd
# tell broker to create ACLs on znodes
zookeeper.set.acl=true
IMPORTANT: ZooKeeper does not support setting the key password in the ZooKeeper client (i.e. broker) keystore
to a value different from the keystore password itself.
Be sure to set the key password to be the same as the keystore password.
Here is a sample (partial) ZooKeeper configuration that accepts both TLS and non-TLS client connections (note that both clientPort and secureClientPort are set):
clientPort=2181
secureClientPort=2182
serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
authProvider.x509=org.apache.zookeeper.server.auth.X509AuthenticationProvider
ssl.keyStore.location=/path/to/zk/keystore.jks
ssl.keyStore.password=zk-ks-passwd
ssl.trustStore.location=/path/to/zk/truststore.jks
ssl.trustStore.password=zk-ts-passwd
Include the --zk-tls-config-file <file> option if you enable mTLS.
It is also possible to turn off authentication in a secure cluster. When doing so, include the --zk-tls-config-file <file> option if you need to set TLS configuration. Here is an example of how to run the migration tool:
bin/zookeeper-security-migration.sh --zookeeper.acl=secure --zookeeper.connect=localhost:2181
Run this to see the full list of parameters:
bin/zookeeper-security-migration.sh --help
Here is a sample (partial) Kafka Broker configuration for connecting to ZooKeeper over TLS without mTLS authentication:
# connect to the ZooKeeper port configured for TLS
zookeeper.connect=zk1:2182,zk2:2182,zk3:2182
# required to use TLS to ZooKeeper (default is false)
zookeeper.ssl.client.enable=true
# required to use TLS to ZooKeeper
zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
# define trust stores to use TLS to ZooKeeper; ignored unless zookeeper.ssl.client.enable=true
# no need to set keystore information assuming ssl.clientAuth=none on ZooKeeper
zookeeper.ssl.truststore.location=/path/to/kafka/truststore.jks
zookeeper.ssl.truststore.password=kafka-ts-passwd
# tell broker to create ACLs on znodes (if using SASL authentication, otherwise do not set this)
zookeeper.set.acl=true