32/upgrade.html [767:1740]: - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - the validation due to various causes (mismatch magic bytes, crc checksum errors, null key for log compacted topics, etc), the whole batch would be rejected with the same and misleading CORRUPT_MESSAGE, and the caller of the producer client would see the corresponding exception from either the future object of RecordMetadata returned from the send call as well as in the Callback#onCompletion(RecordMetadata metadata, Exception exception) Now with the new error code and improved error messages of the exception, producer callers would be better informed about the root cause why their sent records were failed.
  • We are introducing incremental cooperative rebalancing to the clients' group protocol, which allows consumers to keep all of their assigned partitions during a rebalance and at the end revoke only those which must be migrated to another consumer for overall cluster balance. The ConsumerCoordinator will choose the latest RebalanceProtocol that is commonly supported by all of the consumer's supported assignors. You can use the new built-in CooperativeStickyAssignor or plug in your own custom cooperative assignor. To do so you must implement the ConsumerPartitionAssignor interface and include RebalanceProtocol.COOPERATIVE in the list returned by ConsumerPartitionAssignor#supportedProtocols. Your custom assignor can then leverage the ownedPartitions field in each consumer's Subscription to give partitions back to their previous owners whenever possible. Note that when a partition is to be reassigned to another consumer, it must be removed from the new assignment until it has been revoked from its original owner. Any consumer that has to revoke a partition will trigger a followup rebalance to allow the revoked partition to safely be assigned to its new owner. See the ConsumerPartitionAssignor RebalanceProtocol javadocs for more information.
    To upgrade from the old (eager) protocol, which always revokes all partitions before rebalancing, to cooperative rebalancing, you must follow a specific upgrade path to get all clients on the same ConsumerPartitionAssignor that supports the cooperative protocol. This can be done with two rolling bounces, using the CooperativeStickyAssignor for the example: during the first one, add "cooperative-sticky" to the list of supported assignors for each member (without removing the previous assignor -- note that if previously using the default, you must include that explicitly as well). You then bounce and/or upgrade it. Once the entire group is on 2.4+ and all members have the "cooperative-sticky" among their supported assignors, remove the other assignor(s) and perform a second rolling bounce so that by the end all members support only the cooperative protocol. For further details on the cooperative rebalancing protocol and upgrade path, see KIP-429.
  • There are some behavioral changes to the ConsumerRebalanceListener, as well as a new API. Exceptions thrown during any of the listener's three callbacks will no longer be swallowed, and will instead be re-thrown all the way up to the Consumer.poll() call. The onPartitionsLost method has been added to allow users to react to abnormal circumstances where a consumer may have lost ownership of its partitions (such as a missed rebalance) and cannot commit offsets. By default, this will simply call the existing onPartitionsRevoked API to align with previous behavior. Note however that onPartitionsLost will not be called when the set of lost partitions is empty. This means that no callback will be invoked at the beginning of the first rebalance of a new consumer joining the group.
    The semantics of the ConsumerRebalanceListener's callbacks are further changed when following the cooperative rebalancing protocol described above. In addition to onPartitionsLost, onPartitionsRevoked will also never be called when the set of revoked partitions is empty. The callback will generally be invoked only at the end of a rebalance, and only on the set of partitions that are being moved to another consumer. The onPartitionsAssigned callback will however always be called, even with an empty set of partitions, as a way to notify users of a rebalance event (this is true for both cooperative and eager). For details on the new callback semantics, see the ConsumerRebalanceListener javadocs.
  • The Scala trait kafka.security.auth.Authorizer has been deprecated and replaced with a new Java API org.apache.kafka.server.authorizer.Authorizer. The authorizer implementation class kafka.security.auth.SimpleAclAuthorizer has also been deprecated and replaced with a new implementation kafka.security.authorizer.AclAuthorizer. AclAuthorizer uses features supported by the new API to improve authorization logging and is compatible with SimpleAclAuthorizer. For more details, see KIP-504.
  • Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, 1.1.x, 2.0.x or 2.1.x or 2.2.x to 2.3.0

    If you are upgrading from a version prior to 2.1.x, please see the note below about the change to the schema used to store consumer offsets. Once you have changed the inter.broker.protocol.version to the latest version, it will not be possible to downgrade to a version prior to 2.1.

    For a rolling upgrade:

    1. Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION. If you are upgrading from 0.11.0.x, 1.0.x, 1.1.x, 2.0.x, or 2.1.x, and you have not overridden the message format, then you only need to override the inter-broker protocol version.
    2. Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. Once you have done so, the brokers will be running the latest version and you can verify that the cluster's behavior and performance meets expectations. It is still possible to downgrade at this point if there are any problems.
    3. Once the cluster's behavior and performance has been verified, bump the protocol version by editing inter.broker.protocol.version and setting it to 2.3.
    4. Restart the brokers one by one for the new protocol version to take effect. Once the brokers begin using the latest protocol version, it will no longer be possible to downgrade the cluster to an older version.
    5. If you have overridden the message format version as instructed above, then you need to do one more rolling restart to upgrade it to its latest version. Once all (or most) consumers have been upgraded to 0.11.0 or later, change log.message.format.version to 2.3 on each broker and restart them one by one. Note that the older Scala clients, which are no longer maintained, do not support the message format introduced in 0.11, so to avoid conversion costs (or to take advantage of exactly once semantics), the newer Java clients must be used.
    Notable changes in 2.3.0

    Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, 1.1.x, 2.0.x or 2.1.x to 2.2.0

    If you are upgrading from a version prior to 2.1.x, please see the note below about the change to the schema used to store consumer offsets. Once you have changed the inter.broker.protocol.version to the latest version, it will not be possible to downgrade to a version prior to 2.1.

    For a rolling upgrade:

    1. Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION. If you are upgrading from 0.11.0.x, 1.0.x, 1.1.x, or 2.0.x and you have not overridden the message format, then you only need to override the inter-broker protocol version.
    2. Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. Once you have done so, the brokers will be running the latest version and you can verify that the cluster's behavior and performance meets expectations. It is still possible to downgrade at this point if there are any problems.
    3. Once the cluster's behavior and performance has been verified, bump the protocol version by editing inter.broker.protocol.version and setting it to 2.2.
    4. Restart the brokers one by one for the new protocol version to take effect. Once the brokers begin using the latest protocol version, it will no longer be possible to downgrade the cluster to an older version.
    5. If you have overridden the message format version as instructed above, then you need to do one more rolling restart to upgrade it to its latest version. Once all (or most) consumers have been upgraded to 0.11.0 or later, change log.message.format.version to 2.2 on each broker and restart them one by one. Note that the older Scala clients, which are no longer maintained, do not support the message format introduced in 0.11, so to avoid conversion costs (or to take advantage of exactly once semantics), the newer Java clients must be used.
    Notable changes in 2.2.1
    Notable changes in 2.2.0

    Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, 1.1.x, or 2.0.0 to 2.1.0

    Note that 2.1.x contains a change to the internal schema used to store consumer offsets. Once the upgrade is complete, it will not be possible to downgrade to previous versions. See the rolling upgrade notes below for more detail.

    For a rolling upgrade:

    1. Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION. If you are upgrading from 0.11.0.x, 1.0.x, 1.1.x, or 2.0.x and you have not overridden the message format, then you only need to override the inter-broker protocol version.
    2. Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. Once you have done so, the brokers will be running the latest version and you can verify that the cluster's behavior and performance meets expectations. It is still possible to downgrade at this point if there are any problems.
    3. Once the cluster's behavior and performance has been verified, bump the protocol version by editing inter.broker.protocol.version and setting it to 2.1.
    4. Restart the brokers one by one for the new protocol version to take effect. Once the brokers begin using the latest protocol version, it will no longer be possible to downgrade the cluster to an older version.
    5. If you have overridden the message format version as instructed above, then you need to do one more rolling restart to upgrade it to its latest version. Once all (or most) consumers have been upgraded to 0.11.0 or later, change log.message.format.version to 2.1 on each broker and restart them one by one. Note that the older Scala clients, which are no longer maintained, do not support the message format introduced in 0.11, so to avoid conversion costs (or to take advantage of exactly once semantics), the newer Java clients must be used.

    Additional Upgrade Notes:

    1. Offset expiration semantics has slightly changed in this version. According to the new semantics, offsets of partitions in a group will not be removed while the group is subscribed to the corresponding topic and is still active (has active consumers). If group becomes empty all its offsets will be removed after default offset retention period (or the one set by broker) has passed (unless the group becomes active again). Offsets associated with standalone (simple) consumers, that do not use Kafka group management, will be removed after default offset retention period (or the one set by broker) has passed since their last commit.
    2. The default for console consumer's enable.auto.commit property when no group.id is provided is now set to false. This is to avoid polluting the consumer coordinator cache as the auto-generated group is not likely to be used by other consumers.
    3. The default value for the producer's retries config was changed to Integer.MAX_VALUE, as we introduced delivery.timeout.ms in KIP-91, which sets an upper bound on the total time between sending a record and receiving acknowledgement from the broker. By default, the delivery timeout is set to 2 minutes.
    4. By default, MirrorMaker now overrides delivery.timeout.ms to Integer.MAX_VALUE when configuring the producer. If you have overridden the value of retries in order to fail faster, you will instead need to override delivery.timeout.ms.
    5. The ListGroup API now expects, as a recommended alternative, Describe Group access to the groups a user should be able to list. Even though the old Describe Cluster access is still supported for backward compatibility, using it for this API is not advised.
    6. KIP-336 deprecates the ExtendedSerializer and ExtendedDeserializer interfaces and propagates the usage of Serializer and Deserializer. ExtendedSerializer and ExtendedDeserializer were introduced with KIP-82 to provide record headers for serializers and deserializers in a Java 7 compatible fashion. Now we consolidated these interfaces as Java 7 support has been dropped since.
    Notable changes in 2.1.0

    Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, or 1.1.x to 2.0.0

    Kafka 2.0.0 introduces wire protocol changes. By following the recommended rolling upgrade plan below, you guarantee no downtime during the upgrade. However, please review the notable changes in 2.0.0 before upgrading.

    For a rolling upgrade:

    1. Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION. If you are upgrading from 0.11.0.x, 1.0.x, or 1.1.x and you have not overridden the message format, then you only need to override the inter-broker protocol format.
    2. Upgrade the brokers one at a time: shut down the broker, update the code, and restart it.
    3. Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 2.0.
    4. Restart the brokers one by one for the new protocol version to take effect.
    5. If you have overridden the message format version as instructed above, then you need to do one more rolling restart to upgrade it to its latest version. Once all (or most) consumers have been upgraded to 0.11.0 or later, change log.message.format.version to 2.0 on each broker and restart them one by one. Note that the older Scala consumer does not support the new message format introduced in 0.11, so to avoid the performance cost of down-conversion (or to take advantage of exactly once semantics), the newer Java consumer must be used.

    Additional Upgrade Notes:

    1. If you are willing to accept downtime, you can simply take all the brokers down, update the code and start them back up. They will start with the new protocol by default.
    2. Bumping the protocol version and restarting can be done any time after the brokers are upgraded. It does not have to be immediately after. Similarly for the message format version.
    3. If you are using Java8 method references in your Kafka Streams code you might need to update your code to resolve method ambiguities. Hot-swapping the jar-file only might not work.
    4. ACLs should not be added to prefixed resources, (added in KIP-290), until all brokers in the cluster have been updated.

      NOTE: any prefixed ACLs added to a cluster, even after the cluster is fully upgraded, will be ignored should the cluster be downgraded again.

    Notable changes in 2.0.0
    New Protocol Versions
    Upgrading a 1.1 Kafka Streams Application

    Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, or 1.0.x to 1.1.x

    Kafka 1.1.0 introduces wire protocol changes. By following the recommended rolling upgrade plan below, you guarantee no downtime during the upgrade. However, please review the notable changes in 1.1.0 before upgrading.

    For a rolling upgrade:

    1. Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION. If you are upgrading from 0.11.0.x or 1.0.x and you have not overridden the message format, then you only need to override the inter-broker protocol format.
    2. Upgrade the brokers one at a time: shut down the broker, update the code, and restart it.
    3. Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 1.1.
    4. Restart the brokers one by one for the new protocol version to take effect.
    5. If you have overridden the message format version as instructed above, then you need to do one more rolling restart to upgrade it to its latest version. Once all (or most) consumers have been upgraded to 0.11.0 or later, change log.message.format.version to 1.1 on each broker and restart them one by one. Note that the older Scala consumer does not support the new message format introduced in 0.11, so to avoid the performance cost of down-conversion (or to take advantage of exactly once semantics), the newer Java consumer must be used.

    Additional Upgrade Notes:

    1. If you are willing to accept downtime, you can simply take all the brokers down, update the code and start them back up. They will start with the new protocol by default.
    2. Bumping the protocol version and restarting can be done any time after the brokers are upgraded. It does not have to be immediately after. Similarly for the message format version.
    3. If you are using Java8 method references in your Kafka Streams code you might need to update your code to resolve method ambiguties. Hot-swapping the jar-file only might not work.
    Notable changes in 1.1.1
    Notable changes in 1.1.0
    New Protocol Versions
    Upgrading a 1.0 Kafka Streams Application

    Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x or 0.11.0.x to 1.0.0

    Kafka 1.0.0 introduces wire protocol changes. By following the recommended rolling upgrade plan below, you guarantee no downtime during the upgrade. However, please review the notable changes in 1.0.0 before upgrading.

    For a rolling upgrade:

    1. Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION. If you are upgrading from 0.11.0.x and you have not overridden the message format, you must set both the message format version and the inter-broker protocol version to 0.11.0.
    2. Upgrade the brokers one at a time: shut down the broker, update the code, and restart it.
    3. Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 1.0.
    4. Restart the brokers one by one for the new protocol version to take effect.
    5. If you have overridden the message format version as instructed above, then you need to do one more rolling restart to upgrade it to its latest version. Once all (or most) consumers have been upgraded to 0.11.0 or later, change log.message.format.version to 1.0 on each broker and restart them one by one. If you are upgrading from 0.11.0 and log.message.format.version is set to 0.11.0, you can update the config and skip the rolling restart. Note that the older Scala consumer does not support the new message format introduced in 0.11, so to avoid the performance cost of down-conversion (or to take advantage of exactly once semantics), the newer Java consumer must be used.

    Additional Upgrade Notes:

    1. If you are willing to accept downtime, you can simply take all the brokers down, update the code and start them back up. They will start with the new protocol by default.
    2. Bumping the protocol version and restarting can be done any time after the brokers are upgraded. It does not have to be immediately after. Similarly for the message format version.
    Notable changes in 1.0.2
    Notable changes in 1.0.1
    Notable changes in 1.0.0
    New Protocol Versions
    Upgrading a 0.11.0 Kafka Streams Application
    Upgrading a 0.10.2 Kafka Streams Application
    Upgrading a 0.10.1 Kafka Streams Application
    Upgrading a 0.10.0 Kafka Streams Application

    Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x or 0.10.2.x to 0.11.0.0

    Kafka 0.11.0.0 introduces a new message format version as well as wire protocol changes. By following the recommended rolling upgrade plan below, you guarantee no downtime during the upgrade. However, please review the notable changes in 0.11.0.0 before upgrading.

    Starting with version 0.10.2, Java clients (producer and consumer) have acquired the ability to communicate with older brokers. Version 0.11.0 clients can talk to version 0.10.0 or newer brokers. However, if your brokers are older than 0.10.0, you must upgrade all the brokers in the Kafka cluster before upgrading your clients. Version 0.11.0 brokers support 0.8.x and newer clients.

    For a rolling upgrade:

    1. Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the current message format version currently in use. If you have not overridden the message format previously, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION.
    2. Upgrade the brokers one at a time: shut down the broker, update the code, and restart it.
    3. Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.11.0, but do not change log.message.format.version yet.
    4. Restart the brokers one by one for the new protocol version to take effect.
    5. Once all (or most) consumers have been upgraded to 0.11.0 or later, then change log.message.format.version to 0.11.0 on each broker and restart them one by one. Note that the older Scala consumer does not support the new message format, so to avoid the performance cost of down-conversion (or to take advantage of exactly once semantics), the new Java consumer must be used.

    Additional Upgrade Notes:

    1. If you are willing to accept downtime, you can simply take all the brokers down, update the code and start them back up. They will start with the new protocol by default.
    2. Bumping the protocol version and restarting can be done any time after the brokers are upgraded. It does not have to be immediately after. Similarly for the message format version.
    3. It is also possible to enable the 0.11.0 message format on individual topics using the topic admin tool (bin/kafka-topics.sh) prior to updating the global setting log.message.format.version.
    4. If you are upgrading from a version prior to 0.10.0, it is NOT necessary to first update the message format to 0.10.0 before you switch to 0.11.0.
    Upgrading a 0.10.2 Kafka Streams Application
    Upgrading a 0.10.1 Kafka Streams Application
    Upgrading a 0.10.0 Kafka Streams Application
    Notable changes in 0.11.0.3
    Notable changes in 0.11.0.0
    New Protocol Versions
    Notes on Exactly Once Semantics

    Kafka 0.11.0 includes support for idempotent and transactional capabilities in the producer. Idempotent delivery ensures that messages are delivered exactly once to a particular topic partition during the lifetime of a single producer. Transactional delivery allows producers to send data to multiple partitions such that either all messages are successfully delivered, or none of them are. Together, these capabilities enable "exactly once semantics" in Kafka. More details on these features are available in the user guide, but below we add a few specific notes on enabling them in an upgraded cluster. Note that enabling EoS is not required and there is no impact on the broker's behavior if unused.

    1. Only the new Java producer and consumer support exactly once semantics.
    2. These features depend crucially on the 0.11.0 message format. Attempting to use them on an older format will result in unsupported version errors.
    3. Transaction state is stored in a new internal topic __transaction_state. This topic is not created until the the first attempt to use a transactional request API. Similar to the consumer offsets topic, there are several settings to control the topic's configuration. For example, transaction.state.log.min.isr controls the minimum ISR for this topic. See the configuration section in the user guide for a full list of options.
    4. For secure clusters, the transactional APIs require new ACLs which can be turned on with the bin/kafka-acls.sh. tool.
    5. EoS in Kafka introduces new request APIs and modifies several existing ones. See KIP-98 for the full details
    Notes on the new message format in 0.11.0

    The 0.11.0 message format includes several major enhancements in order to support better delivery semantics for the producer (see KIP-98) and improved replication fault tolerance (see KIP-101). Although the new format contains more information to make these improvements possible, we have made the batch format much more efficient. As long as the number of messages per batch is more than 2, you can expect lower overall overhead. For smaller batches, however, there may be a small performance impact. See here for the results of our initial performance analysis of the new message format. You can also find more detail on the message format in the KIP-98 proposal.

    One of the notable differences in the new message format is that even uncompressed messages are stored together as a single batch. This has a few implications for the broker configuration max.message.bytes, which limits the size of a single batch. First, if an older client produces messages to a topic partition using the old format, and the messages are individually smaller than max.message.bytes, the broker may still reject them after they are merged into a single batch during the up-conversion process. Generally this can happen when the aggregate size of the individual messages is larger than max.message.bytes. There is a similar effect for older consumers reading messages down-converted from the new format: if the fetch size is not set at least as large as max.message.bytes, the consumer may not be able to make progress even if the individual uncompressed messages are smaller than the configured fetch size. This behavior does not impact the Java client for 0.10.1.0 and later since it uses an updated fetch protocol which ensures that at least one message can be returned even if it exceeds the fetch size. To get around these problems, you should ensure 1) that the producer's batch size is not set larger than max.message.bytes, and 2) that the consumer's fetch size is set at least as large as max.message.bytes.

    Most of the discussion on the performance impact of upgrading to the 0.10.0 message format remains pertinent to the 0.11.0 upgrade. This mainly affects clusters that are not secured with TLS since "zero-copy" transfer is already not possible in that case. In order to avoid the cost of down-conversion, you should ensure that consumer applications are upgraded to the latest 0.11.0 client. Significantly, since the old consumer has been deprecated in 0.11.0.0, it does not support the new message format. You must upgrade to use the new consumer to use the new message format without the cost of down-conversion. Note that 0.11.0 consumers support backwards compatibility with 0.10.0 brokers and upward, so it is possible to upgrade the clients first before the brokers.

    Upgrading from 0.8.x, 0.9.x, 0.10.0.x or 0.10.1.x to 0.10.2.0

    0.10.2.0 has wire protocol changes. By following the recommended rolling upgrade plan below, you guarantee no downtime during the upgrade. However, please review the notable changes in 0.10.2.0 before upgrading.

    Starting with version 0.10.2, Java clients (producer and consumer) have acquired the ability to communicate with older brokers. Version 0.10.2 clients can talk to version 0.10.0 or newer brokers. However, if your brokers are older than 0.10.0, you must upgrade all the brokers in the Kafka cluster before upgrading your clients. Version 0.10.2 brokers support 0.8.x and newer clients.

    For a rolling upgrade:

    1. Update server.properties file on all brokers and add the following properties:
    2. Upgrade the brokers one at a time: shut down the broker, update the code, and restart it.
    3. Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.10.2.
    4. If your previous message format is 0.10.0, change log.message.format.version to 0.10.2 (this is a no-op as the message format is the same for 0.10.0, 0.10.1 and 0.10.2). If your previous message format version is lower than 0.10.0, do not change log.message.format.version yet - this parameter should only change once all consumers have been upgraded to 0.10.0.0 or later.
    5. Restart the brokers one by one for the new protocol version to take effect.
    6. If log.message.format.version is still lower than 0.10.0 at this point, wait until all consumers have been upgraded to 0.10.0 or later, then change log.message.format.version to 0.10.2 on each broker and restart them one by one.

    Note: If you are willing to accept downtime, you can simply take all the brokers down, update the code and start all of them. They will start with the new protocol by default.

    Note: Bumping the protocol version and restarting can be done any time after the brokers were upgraded. It does not have to be immediately after.

    Upgrading a 0.10.1 Kafka Streams Application
    Upgrading a 0.10.0 Kafka Streams Application
    Notable changes in 0.10.2.2
    Notable changes in 0.10.2.1
    Notable changes in 0.10.2.0
    New Protocol Versions

    Upgrading from 0.8.x, 0.9.x or 0.10.0.X to 0.10.1.0

    0.10.1.0 has wire protocol changes. By following the recommended rolling upgrade plan below, you guarantee no downtime during the upgrade. However, please notice the Potential breaking changes in 0.10.1.0 before upgrade.
    Note: Because new protocols are introduced, it is important to upgrade your Kafka clusters before upgrading your clients (i.e. 0.10.1.x clients only support 0.10.1.x or later brokers while 0.10.1.x brokers also support older clients).

    For a rolling upgrade:

    1. Update server.properties file on all brokers and add the following properties:
    2. Upgrade the brokers one at a time: shut down the broker, update the code, and restart it.
    3. Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.10.1.0.
    4. If your previous message format is 0.10.0, change log.message.format.version to 0.10.1 (this is a no-op as the message format is the same for both 0.10.0 and 0.10.1). If your previous message format version is lower than 0.10.0, do not change log.message.format.version yet - this parameter should only change once all consumers have been upgraded to 0.10.0.0 or later.
    5. Restart the brokers one by one for the new protocol version to take effect.
    6. If log.message.format.version is still lower than 0.10.0 at this point, wait until all consumers have been upgraded to 0.10.0 or later, then change log.message.format.version to 0.10.1 on each broker and restart them one by one.

    Note: If you are willing to accept downtime, you can simply take all the brokers down, update the code and start all of them. They will start with the new protocol by default.

    Note: Bumping the protocol version and restarting can be done any time after the brokers were upgraded. It does not have to be immediately after. - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 35/upgrade.html [1010:1983]: - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - the validation due to various causes (mismatch magic bytes, crc checksum errors, null key for log compacted topics, etc), the whole batch would be rejected with the same and misleading CORRUPT_MESSAGE, and the caller of the producer client would see the corresponding exception from either the future object of RecordMetadata returned from the send call as well as in the Callback#onCompletion(RecordMetadata metadata, Exception exception) Now with the new error code and improved error messages of the exception, producer callers would be better informed about the root cause why their sent records were failed.

  • We are introducing incremental cooperative rebalancing to the clients' group protocol, which allows consumers to keep all of their assigned partitions during a rebalance and at the end revoke only those which must be migrated to another consumer for overall cluster balance. The ConsumerCoordinator will choose the latest RebalanceProtocol that is commonly supported by all of the consumer's supported assignors. You can use the new built-in CooperativeStickyAssignor or plug in your own custom cooperative assignor. To do so you must implement the ConsumerPartitionAssignor interface and include RebalanceProtocol.COOPERATIVE in the list returned by ConsumerPartitionAssignor#supportedProtocols. Your custom assignor can then leverage the ownedPartitions field in each consumer's Subscription to give partitions back to their previous owners whenever possible. Note that when a partition is to be reassigned to another consumer, it must be removed from the new assignment until it has been revoked from its original owner. Any consumer that has to revoke a partition will trigger a followup rebalance to allow the revoked partition to safely be assigned to its new owner. See the ConsumerPartitionAssignor RebalanceProtocol javadocs for more information.
    To upgrade from the old (eager) protocol, which always revokes all partitions before rebalancing, to cooperative rebalancing, you must follow a specific upgrade path to get all clients on the same ConsumerPartitionAssignor that supports the cooperative protocol. This can be done with two rolling bounces, using the CooperativeStickyAssignor for the example: during the first one, add "cooperative-sticky" to the list of supported assignors for each member (without removing the previous assignor -- note that if previously using the default, you must include that explicitly as well). You then bounce and/or upgrade it. Once the entire group is on 2.4+ and all members have the "cooperative-sticky" among their supported assignors, remove the other assignor(s) and perform a second rolling bounce so that by the end all members support only the cooperative protocol. For further details on the cooperative rebalancing protocol and upgrade path, see KIP-429.
  • There are some behavioral changes to the ConsumerRebalanceListener, as well as a new API. Exceptions thrown during any of the listener's three callbacks will no longer be swallowed, and will instead be re-thrown all the way up to the Consumer.poll() call. The onPartitionsLost method has been added to allow users to react to abnormal circumstances where a consumer may have lost ownership of its partitions (such as a missed rebalance) and cannot commit offsets. By default, this will simply call the existing onPartitionsRevoked API to align with previous behavior. Note however that onPartitionsLost will not be called when the set of lost partitions is empty. This means that no callback will be invoked at the beginning of the first rebalance of a new consumer joining the group.
    The semantics of the ConsumerRebalanceListener's callbacks are further changed when following the cooperative rebalancing protocol described above. In addition to onPartitionsLost, onPartitionsRevoked will also never be called when the set of revoked partitions is empty. The callback will generally be invoked only at the end of a rebalance, and only on the set of partitions that are being moved to another consumer. The onPartitionsAssigned callback will however always be called, even with an empty set of partitions, as a way to notify users of a rebalance event (this is true for both cooperative and eager). For details on the new callback semantics, see the ConsumerRebalanceListener javadocs.
  • The Scala trait kafka.security.auth.Authorizer has been deprecated and replaced with a new Java API org.apache.kafka.server.authorizer.Authorizer. The authorizer implementation class kafka.security.auth.SimpleAclAuthorizer has also been deprecated and replaced with a new implementation kafka.security.authorizer.AclAuthorizer. AclAuthorizer uses features supported by the new API to improve authorization logging and is compatible with SimpleAclAuthorizer. For more details, see KIP-504.
  • Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, 1.1.x, 2.0.x or 2.1.x or 2.2.x to 2.3.0

    If you are upgrading from a version prior to 2.1.x, please see the note below about the change to the schema used to store consumer offsets. Once you have changed the inter.broker.protocol.version to the latest version, it will not be possible to downgrade to a version prior to 2.1.

    For a rolling upgrade:

    1. Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION. If you are upgrading from 0.11.0.x, 1.0.x, 1.1.x, 2.0.x, or 2.1.x, and you have not overridden the message format, then you only need to override the inter-broker protocol version.
    2. Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. Once you have done so, the brokers will be running the latest version and you can verify that the cluster's behavior and performance meets expectations. It is still possible to downgrade at this point if there are any problems.
    3. Once the cluster's behavior and performance has been verified, bump the protocol version by editing inter.broker.protocol.version and setting it to 2.3.
    4. Restart the brokers one by one for the new protocol version to take effect. Once the brokers begin using the latest protocol version, it will no longer be possible to downgrade the cluster to an older version.
    5. If you have overridden the message format version as instructed above, then you need to do one more rolling restart to upgrade it to its latest version. Once all (or most) consumers have been upgraded to 0.11.0 or later, change log.message.format.version to 2.3 on each broker and restart them one by one. Note that the older Scala clients, which are no longer maintained, do not support the message format introduced in 0.11, so to avoid conversion costs (or to take advantage of exactly once semantics), the newer Java clients must be used.
    Notable changes in 2.3.0

    Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, 1.1.x, 2.0.x or 2.1.x to 2.2.0

    If you are upgrading from a version prior to 2.1.x, please see the note below about the change to the schema used to store consumer offsets. Once you have changed the inter.broker.protocol.version to the latest version, it will not be possible to downgrade to a version prior to 2.1.

    For a rolling upgrade:

    1. Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION. If you are upgrading from 0.11.0.x, 1.0.x, 1.1.x, or 2.0.x and you have not overridden the message format, then you only need to override the inter-broker protocol version.
    2. Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. Once you have done so, the brokers will be running the latest version and you can verify that the cluster's behavior and performance meets expectations. It is still possible to downgrade at this point if there are any problems.
    3. Once the cluster's behavior and performance has been verified, bump the protocol version by editing inter.broker.protocol.version and setting it to 2.2.
    4. Restart the brokers one by one for the new protocol version to take effect. Once the brokers begin using the latest protocol version, it will no longer be possible to downgrade the cluster to an older version.
    5. If you have overridden the message format version as instructed above, then you need to do one more rolling restart to upgrade it to its latest version. Once all (or most) consumers have been upgraded to 0.11.0 or later, change log.message.format.version to 2.2 on each broker and restart them one by one. Note that the older Scala clients, which are no longer maintained, do not support the message format introduced in 0.11, so to avoid conversion costs (or to take advantage of exactly once semantics), the newer Java clients must be used.
    Notable changes in 2.2.1
    Notable changes in 2.2.0

    Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, 1.1.x, or 2.0.0 to 2.1.0

    Note that 2.1.x contains a change to the internal schema used to store consumer offsets. Once the upgrade is complete, it will not be possible to downgrade to previous versions. See the rolling upgrade notes below for more detail.

    For a rolling upgrade:

    1. Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION. If you are upgrading from 0.11.0.x, 1.0.x, 1.1.x, or 2.0.x and you have not overridden the message format, then you only need to override the inter-broker protocol version.
    2. Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. Once you have done so, the brokers will be running the latest version and you can verify that the cluster's behavior and performance meets expectations. It is still possible to downgrade at this point if there are any problems.
    3. Once the cluster's behavior and performance has been verified, bump the protocol version by editing inter.broker.protocol.version and setting it to 2.1.
    4. Restart the brokers one by one for the new protocol version to take effect. Once the brokers begin using the latest protocol version, it will no longer be possible to downgrade the cluster to an older version.
    5. If you have overridden the message format version as instructed above, then you need to do one more rolling restart to upgrade it to its latest version. Once all (or most) consumers have been upgraded to 0.11.0 or later, change log.message.format.version to 2.1 on each broker and restart them one by one. Note that the older Scala clients, which are no longer maintained, do not support the message format introduced in 0.11, so to avoid conversion costs (or to take advantage of exactly once semantics), the newer Java clients must be used.

    Additional Upgrade Notes:

    1. Offset expiration semantics has slightly changed in this version. According to the new semantics, offsets of partitions in a group will not be removed while the group is subscribed to the corresponding topic and is still active (has active consumers). If group becomes empty all its offsets will be removed after default offset retention period (or the one set by broker) has passed (unless the group becomes active again). Offsets associated with standalone (simple) consumers, that do not use Kafka group management, will be removed after default offset retention period (or the one set by broker) has passed since their last commit.
    2. The default for console consumer's enable.auto.commit property when no group.id is provided is now set to false. This is to avoid polluting the consumer coordinator cache as the auto-generated group is not likely to be used by other consumers.
    3. The default value for the producer's retries config was changed to Integer.MAX_VALUE, as we introduced delivery.timeout.ms in KIP-91, which sets an upper bound on the total time between sending a record and receiving acknowledgement from the broker. By default, the delivery timeout is set to 2 minutes.
    4. By default, MirrorMaker now overrides delivery.timeout.ms to Integer.MAX_VALUE when configuring the producer. If you have overridden the value of retries in order to fail faster, you will instead need to override delivery.timeout.ms.
    5. The ListGroup API now expects, as a recommended alternative, Describe Group access to the groups a user should be able to list. Even though the old Describe Cluster access is still supported for backward compatibility, using it for this API is not advised.
    6. KIP-336 deprecates the ExtendedSerializer and ExtendedDeserializer interfaces and propagates the usage of Serializer and Deserializer. ExtendedSerializer and ExtendedDeserializer were introduced with KIP-82 to provide record headers for serializers and deserializers in a Java 7 compatible fashion. Now we consolidated these interfaces as Java 7 support has been dropped since.
    Notable changes in 2.1.0

    Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, or 1.1.x to 2.0.0

    Kafka 2.0.0 introduces wire protocol changes. By following the recommended rolling upgrade plan below, you guarantee no downtime during the upgrade. However, please review the notable changes in 2.0.0 before upgrading.

    For a rolling upgrade:

    1. Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION. If you are upgrading from 0.11.0.x, 1.0.x, or 1.1.x and you have not overridden the message format, then you only need to override the inter-broker protocol format.
    2. Upgrade the brokers one at a time: shut down the broker, update the code, and restart it.
    3. Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 2.0.
    4. Restart the brokers one by one for the new protocol version to take effect.
    5. If you have overridden the message format version as instructed above, then you need to do one more rolling restart to upgrade it to its latest version. Once all (or most) consumers have been upgraded to 0.11.0 or later, change log.message.format.version to 2.0 on each broker and restart them one by one. Note that the older Scala consumer does not support the new message format introduced in 0.11, so to avoid the performance cost of down-conversion (or to take advantage of exactly once semantics), the newer Java consumer must be used.

    Additional Upgrade Notes:

    1. If you are willing to accept downtime, you can simply take all the brokers down, update the code and start them back up. They will start with the new protocol by default.
    2. Bumping the protocol version and restarting can be done any time after the brokers are upgraded. It does not have to be immediately after. Similarly for the message format version.
    3. If you are using Java8 method references in your Kafka Streams code you might need to update your code to resolve method ambiguities. Hot-swapping the jar-file only might not work.
    4. ACLs should not be added to prefixed resources, (added in KIP-290), until all brokers in the cluster have been updated.

      NOTE: any prefixed ACLs added to a cluster, even after the cluster is fully upgraded, will be ignored should the cluster be downgraded again.

    Notable changes in 2.0.0
    New Protocol Versions
    Upgrading a 1.1 Kafka Streams Application

    Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, or 1.0.x to 1.1.x

    Kafka 1.1.0 introduces wire protocol changes. By following the recommended rolling upgrade plan below, you guarantee no downtime during the upgrade. However, please review the notable changes in 1.1.0 before upgrading.

    For a rolling upgrade:

    1. Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION. If you are upgrading from 0.11.0.x or 1.0.x and you have not overridden the message format, then you only need to override the inter-broker protocol format.
    2. Upgrade the brokers one at a time: shut down the broker, update the code, and restart it.
    3. Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 1.1.
    4. Restart the brokers one by one for the new protocol version to take effect.
    5. If you have overridden the message format version as instructed above, then you need to do one more rolling restart to upgrade it to its latest version. Once all (or most) consumers have been upgraded to 0.11.0 or later, change log.message.format.version to 1.1 on each broker and restart them one by one. Note that the older Scala consumer does not support the new message format introduced in 0.11, so to avoid the performance cost of down-conversion (or to take advantage of exactly once semantics), the newer Java consumer must be used.

    Additional Upgrade Notes:

    1. If you are willing to accept downtime, you can simply take all the brokers down, update the code and start them back up. They will start with the new protocol by default.
    2. Bumping the protocol version and restarting can be done any time after the brokers are upgraded. It does not have to be immediately after. Similarly for the message format version.
    3. If you are using Java8 method references in your Kafka Streams code you might need to update your code to resolve method ambiguties. Hot-swapping the jar-file only might not work.
    Notable changes in 1.1.1
    Notable changes in 1.1.0
    New Protocol Versions
    Upgrading a 1.0 Kafka Streams Application

    Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x or 0.11.0.x to 1.0.0

    Kafka 1.0.0 introduces wire protocol changes. By following the recommended rolling upgrade plan below, you guarantee no downtime during the upgrade. However, please review the notable changes in 1.0.0 before upgrading.

    For a rolling upgrade:

    1. Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION. If you are upgrading from 0.11.0.x and you have not overridden the message format, you must set both the message format version and the inter-broker protocol version to 0.11.0.
    2. Upgrade the brokers one at a time: shut down the broker, update the code, and restart it.
    3. Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 1.0.
    4. Restart the brokers one by one for the new protocol version to take effect.
    5. If you have overridden the message format version as instructed above, then you need to do one more rolling restart to upgrade it to its latest version. Once all (or most) consumers have been upgraded to 0.11.0 or later, change log.message.format.version to 1.0 on each broker and restart them one by one. If you are upgrading from 0.11.0 and log.message.format.version is set to 0.11.0, you can update the config and skip the rolling restart. Note that the older Scala consumer does not support the new message format introduced in 0.11, so to avoid the performance cost of down-conversion (or to take advantage of exactly once semantics), the newer Java consumer must be used.

    Additional Upgrade Notes:

    1. If you are willing to accept downtime, you can simply take all the brokers down, update the code and start them back up. They will start with the new protocol by default.
    2. Bumping the protocol version and restarting can be done any time after the brokers are upgraded. It does not have to be immediately after. Similarly for the message format version.
    Notable changes in 1.0.2
    Notable changes in 1.0.1
    Notable changes in 1.0.0
    New Protocol Versions
    Upgrading a 0.11.0 Kafka Streams Application
    Upgrading a 0.10.2 Kafka Streams Application
    Upgrading a 0.10.1 Kafka Streams Application
    Upgrading a 0.10.0 Kafka Streams Application

    Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x or 0.10.2.x to 0.11.0.0

    Kafka 0.11.0.0 introduces a new message format version as well as wire protocol changes. By following the recommended rolling upgrade plan below, you guarantee no downtime during the upgrade. However, please review the notable changes in 0.11.0.0 before upgrading.

    Starting with version 0.10.2, Java clients (producer and consumer) have acquired the ability to communicate with older brokers. Version 0.11.0 clients can talk to version 0.10.0 or newer brokers. However, if your brokers are older than 0.10.0, you must upgrade all the brokers in the Kafka cluster before upgrading your clients. Version 0.11.0 brokers support 0.8.x and newer clients.

    For a rolling upgrade:

    1. Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the current message format version currently in use. If you have not overridden the message format previously, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION.
    2. Upgrade the brokers one at a time: shut down the broker, update the code, and restart it.
    3. Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.11.0, but do not change log.message.format.version yet.
    4. Restart the brokers one by one for the new protocol version to take effect.
    5. Once all (or most) consumers have been upgraded to 0.11.0 or later, then change log.message.format.version to 0.11.0 on each broker and restart them one by one. Note that the older Scala consumer does not support the new message format, so to avoid the performance cost of down-conversion (or to take advantage of exactly once semantics), the new Java consumer must be used.

    Additional Upgrade Notes:

    1. If you are willing to accept downtime, you can simply take all the brokers down, update the code and start them back up. They will start with the new protocol by default.
    2. Bumping the protocol version and restarting can be done any time after the brokers are upgraded. It does not have to be immediately after. Similarly for the message format version.
    3. It is also possible to enable the 0.11.0 message format on individual topics using the topic admin tool (bin/kafka-topics.sh) prior to updating the global setting log.message.format.version.
    4. If you are upgrading from a version prior to 0.10.0, it is NOT necessary to first update the message format to 0.10.0 before you switch to 0.11.0.
    Upgrading a 0.10.2 Kafka Streams Application
    Upgrading a 0.10.1 Kafka Streams Application
    Upgrading a 0.10.0 Kafka Streams Application
    Notable changes in 0.11.0.3
    Notable changes in 0.11.0.0
    New Protocol Versions
    Notes on Exactly Once Semantics

    Kafka 0.11.0 includes support for idempotent and transactional capabilities in the producer. Idempotent delivery ensures that messages are delivered exactly once to a particular topic partition during the lifetime of a single producer. Transactional delivery allows producers to send data to multiple partitions such that either all messages are successfully delivered, or none of them are. Together, these capabilities enable "exactly once semantics" in Kafka. More details on these features are available in the user guide, but below we add a few specific notes on enabling them in an upgraded cluster. Note that enabling EoS is not required and there is no impact on the broker's behavior if unused.

    1. Only the new Java producer and consumer support exactly once semantics.
    2. These features depend crucially on the 0.11.0 message format. Attempting to use them on an older format will result in unsupported version errors.
    3. Transaction state is stored in a new internal topic __transaction_state. This topic is not created until the the first attempt to use a transactional request API. Similar to the consumer offsets topic, there are several settings to control the topic's configuration. For example, transaction.state.log.min.isr controls the minimum ISR for this topic. See the configuration section in the user guide for a full list of options.
    4. For secure clusters, the transactional APIs require new ACLs which can be turned on with the bin/kafka-acls.sh. tool.
    5. EoS in Kafka introduces new request APIs and modifies several existing ones. See KIP-98 for the full details
    Notes on the new message format in 0.11.0

    The 0.11.0 message format includes several major enhancements in order to support better delivery semantics for the producer (see KIP-98) and improved replication fault tolerance (see KIP-101). Although the new format contains more information to make these improvements possible, we have made the batch format much more efficient. As long as the number of messages per batch is more than 2, you can expect lower overall overhead. For smaller batches, however, there may be a small performance impact. See here for the results of our initial performance analysis of the new message format. You can also find more detail on the message format in the KIP-98 proposal.

    One of the notable differences in the new message format is that even uncompressed messages are stored together as a single batch. This has a few implications for the broker configuration max.message.bytes, which limits the size of a single batch. First, if an older client produces messages to a topic partition using the old format, and the messages are individually smaller than max.message.bytes, the broker may still reject them after they are merged into a single batch during the up-conversion process. Generally this can happen when the aggregate size of the individual messages is larger than max.message.bytes. There is a similar effect for older consumers reading messages down-converted from the new format: if the fetch size is not set at least as large as max.message.bytes, the consumer may not be able to make progress even if the individual uncompressed messages are smaller than the configured fetch size. This behavior does not impact the Java client for 0.10.1.0 and later since it uses an updated fetch protocol which ensures that at least one message can be returned even if it exceeds the fetch size. To get around these problems, you should ensure 1) that the producer's batch size is not set larger than max.message.bytes, and 2) that the consumer's fetch size is set at least as large as max.message.bytes.

    Most of the discussion on the performance impact of upgrading to the 0.10.0 message format remains pertinent to the 0.11.0 upgrade. This mainly affects clusters that are not secured with TLS since "zero-copy" transfer is already not possible in that case. In order to avoid the cost of down-conversion, you should ensure that consumer applications are upgraded to the latest 0.11.0 client. Significantly, since the old consumer has been deprecated in 0.11.0.0, it does not support the new message format. You must upgrade to use the new consumer to use the new message format without the cost of down-conversion. Note that 0.11.0 consumers support backwards compatibility with 0.10.0 brokers and upward, so it is possible to upgrade the clients first before the brokers.

    Upgrading from 0.8.x, 0.9.x, 0.10.0.x or 0.10.1.x to 0.10.2.0

    0.10.2.0 has wire protocol changes. By following the recommended rolling upgrade plan below, you guarantee no downtime during the upgrade. However, please review the notable changes in 0.10.2.0 before upgrading.

    Starting with version 0.10.2, Java clients (producer and consumer) have acquired the ability to communicate with older brokers. Version 0.10.2 clients can talk to version 0.10.0 or newer brokers. However, if your brokers are older than 0.10.0, you must upgrade all the brokers in the Kafka cluster before upgrading your clients. Version 0.10.2 brokers support 0.8.x and newer clients.

    For a rolling upgrade:

    1. Update server.properties file on all brokers and add the following properties:
    2. Upgrade the brokers one at a time: shut down the broker, update the code, and restart it.
    3. Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.10.2.
    4. If your previous message format is 0.10.0, change log.message.format.version to 0.10.2 (this is a no-op as the message format is the same for 0.10.0, 0.10.1 and 0.10.2). If your previous message format version is lower than 0.10.0, do not change log.message.format.version yet - this parameter should only change once all consumers have been upgraded to 0.10.0.0 or later.
    5. Restart the brokers one by one for the new protocol version to take effect.
    6. If log.message.format.version is still lower than 0.10.0 at this point, wait until all consumers have been upgraded to 0.10.0 or later, then change log.message.format.version to 0.10.2 on each broker and restart them one by one.

    Note: If you are willing to accept downtime, you can simply take all the brokers down, update the code and start all of them. They will start with the new protocol by default.

    Note: Bumping the protocol version and restarting can be done any time after the brokers were upgraded. It does not have to be immediately after.

    Upgrading a 0.10.1 Kafka Streams Application
    Upgrading a 0.10.0 Kafka Streams Application
    Notable changes in 0.10.2.2
    Notable changes in 0.10.2.1
    Notable changes in 0.10.2.0
    New Protocol Versions

    Upgrading from 0.8.x, 0.9.x or 0.10.0.X to 0.10.1.0

    0.10.1.0 has wire protocol changes. By following the recommended rolling upgrade plan below, you guarantee no downtime during the upgrade. However, please notice the Potential breaking changes in 0.10.1.0 before upgrade.
    Note: Because new protocols are introduced, it is important to upgrade your Kafka clusters before upgrading your clients (i.e. 0.10.1.x clients only support 0.10.1.x or later brokers while 0.10.1.x brokers also support older clients).

    For a rolling upgrade:

    1. Update server.properties file on all brokers and add the following properties:
    2. Upgrade the brokers one at a time: shut down the broker, update the code, and restart it.
    3. Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.10.1.0.
    4. If your previous message format is 0.10.0, change log.message.format.version to 0.10.1 (this is a no-op as the message format is the same for both 0.10.0 and 0.10.1). If your previous message format version is lower than 0.10.0, do not change log.message.format.version yet - this parameter should only change once all consumers have been upgraded to 0.10.0.0 or later.
    5. Restart the brokers one by one for the new protocol version to take effect.
    6. If log.message.format.version is still lower than 0.10.0 at this point, wait until all consumers have been upgraded to 0.10.0 or later, then change log.message.format.version to 0.10.1 on each broker and restart them one by one.

    Note: If you are willing to accept downtime, you can simply take all the brokers down, update the code and start all of them. They will start with the new protocol by default.

    Note: Bumping the protocol version and restarting can be done any time after the brokers were upgraded. It does not have to be immediately after. - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -