- The producer has stronger delivery guarantees by default: idempotence is enabled and acks is set to all
instead of 1. See KIP-679 for details. In 3.0.0 and 3.1.0, a bug prevented the idempotence default from
being applied, which meant that it remained disabled unless the user had explicitly set enable.idempotence
to true. Note that the bug did not affect the acks=all change. See KAFKA-13598 for more details.
This issue was fixed and the default is properly applied in 3.0.1, 3.1.1, and 3.2.0. A configuration sketch follows.
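As a minimal sketch (the bootstrap address and serializers are placeholder choices), applications that need
these guarantees regardless of which client version is on the classpath can pin the defaults explicitly:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class StrongDeliveryProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            // Pin the intended 3.x defaults so delivery guarantees do not depend on
            // whether a client affected by KAFKA-13598 (3.0.0/3.1.0) is in use.
            props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
            props.put(ProducerConfig.ACKS_CONFIG, "all");
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // send records as usual
            }
        }
    }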
- Java 8 and Scala 2.12 support have been deprecated since Apache Kafka 3.0 and will be removed in
Apache Kafka 4.0. See KIP-750 and KIP-751 for more details.
- ZooKeeper has been upgraded to version 3.6.3.
- A preview of KRaft mode is available, though upgrading to it from the 2.8 Early Access release is not
possible. See the config/kraft/README.md file for details.
- The release tarball no longer includes test, sources, javadoc and test sources jars. These are still published to the Maven Central repository.
- A number of implementation dependency jars are now available in the runtime classpath
instead of compile and runtime classpaths. Compilation errors after the upgrade can be fixed by adding the missing dependency jar(s) explicitly
or updating the application not to use internal classes.
- The default value for the consumer configuration session.timeout.ms was increased from 10s to 45s.
See KIP-735 for more details.
- The broker configuration log.message.format.version and topic configuration message.format.version
have been deprecated. The value of both configurations is always assumed to be 3.0 if
inter.broker.protocol.version is 3.0 or higher. If log.message.format.version or message.format.version
are set, we recommend clearing them at the same time as the inter.broker.protocol.version upgrade to 3.0.
This will avoid potential compatibility issues if the inter.broker.protocol.version is downgraded.
See KIP-724 for more details.
- The Streams API removed all deprecated APIs that were deprecated in version 2.5.0 or earlier.
For a complete list of removed APIs, see the detailed Kafka Streams upgrade notes.
- Kafka Streams no longer has a compile time dependency on "connect:json" module (KAFKA-5146).
Projects that were relying on this transitive dependency will have to explicitly declare it.
- Custom principal builder implementations specified through principal.builder.class must now implement
the KafkaPrincipalSerde interface to allow for forwarding between brokers. See KIP-590 for more details
about the usage of KafkaPrincipalSerde. A sketch follows.
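Below is a minimal sketch of what such an implementation could look like; the anonymous-principal builder
logic and the naive string-based wire format are illustrative assumptions, not a recommended scheme:

    import java.nio.charset.StandardCharsets;
    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.security.auth.AuthenticationContext;
    import org.apache.kafka.common.security.auth.KafkaPrincipal;
    import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
    import org.apache.kafka.common.security.auth.KafkaPrincipalSerde;

    public class CustomPrincipalBuilder implements KafkaPrincipalBuilder, KafkaPrincipalSerde {

        @Override
        public KafkaPrincipal build(AuthenticationContext context) {
            // Real implementations typically inspect the SSL/SASL context;
            // this placeholder treats every connection as anonymous.
            return KafkaPrincipal.ANONYMOUS;
        }

        @Override
        public byte[] serialize(KafkaPrincipal principal) throws SerializationException {
            // Encode as "<type>:<name>" so the broker can forward the principal.
            return (principal.getPrincipalType() + ":" + principal.getName())
                    .getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public KafkaPrincipal deserialize(byte[] bytes) throws SerializationException {
            String[] parts = new String(bytes, StandardCharsets.UTF_8).split(":", 2);
            if (parts.length != 2)
                throw new SerializationException("Malformed principal");
            return new KafkaPrincipal(parts[0], parts[1]);
        }
    }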
- A number of deprecated classes, methods and tools have been removed from the clients, connect, core
and tools modules:
- The Scala Authorizer, SimpleAclAuthorizer and related classes have been removed. Please use the Java
Authorizer and AclAuthorizer instead.
- The Metric#value() method was removed (KAFKA-12573).
- The Sum and Total classes were removed (KAFKA-12584). Please use WindowedSum and CumulativeSum instead.
- The Count and SampledTotal classes were removed. Please use WindowedCount and WindowedSum respectively instead.
- The PrincipalBuilder, DefaultPrincipalBuilder and ResourceFilter classes were removed.
- Various constants and constructors were removed from SslConfigs, SaslConfigs, AclBinding and AclBindingFilter.
- The Admin.electedPreferredLeaders() methods were removed. Please use Admin.electLeaders instead.
- The kafka-preferred-replica-election command line tool was removed. Please use kafka-leader-election instead.
- The --zookeeper option was removed from the kafka-topics and kafka-reassign-partitions command line
tools. Please use --bootstrap-server instead.
- In the kafka-configs command line tool, the --zookeeper option is only supported for updating SCRAM
Credentials configuration and describing/updating dynamic broker configs when brokers are not running.
Please use --bootstrap-server for other configuration operations.
- The ConfigEntry constructor was removed (KAFKA-12577). Please use the remaining public constructor instead.
- The config value default for the client config client.dns.lookup has been removed. In the unlikely
event that you set this value explicitly, we recommend leaving the config unset (use_all_dns_ips is used
by default).
- The ExtendedDeserializer and ExtendedSerializer classes have been removed. Please use Deserializer
and Serializer instead.
- The close(long, TimeUnit) method was removed from the producer, consumer and admin client. Please use
close(Duration) instead.
- The ConsumerConfig.addDeserializerToConfig and ProducerConfig.addSerializerToConfig methods were
removed. These methods were not intended to be public API and there is no replacement.
- The NoOffsetForPartitionException.partition() method was removed. Please use partitions() instead.
- The default partition.assignment.strategy is changed to "[RangeAssignor, CooperativeStickyAssignor]",
which will use the RangeAssignor by default, but allows upgrading to the CooperativeStickyAssignor with
just a single rolling bounce that removes the RangeAssignor from the list. Please check the client
upgrade path guide here for more detail.
- The Scala kafka.common.MessageFormatter was removed. Please use the Java
org.apache.kafka.common.MessageFormatter.
- The MessageFormatter.init(Properties) method was removed. Please use configure(Map) instead.
- The checksum() method has been removed from ConsumerRecord and RecordMetadata. The message format v2,
which has been the default since 0.11, moved the checksum from the record to the record batch. As such,
these methods don't make sense and no replacements exist.
- The ChecksumMessageFormatter class was removed. It is not part of the public API, but it may have been
used with kafka-console-consumer.sh. It reported the checksum of each record, which has not been
supported since message format v2.
- The org.apache.kafka.clients.consumer.internals.PartitionAssignor class has been removed. Please use
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor instead.
- The quota.producer.default and quota.consumer.default configurations were removed (KAFKA-12591).
Dynamic quota defaults must be used instead.
- The port and host.name configurations were removed. Please use listeners instead.
- The advertised.port and advertised.host.name configurations were removed. Please use
advertised.listeners instead.
- The deprecated worker configurations rest.host.name and rest.port were removed (KAFKA-12482) from the
Kafka Connect worker configuration. Please use listeners instead.
- The Producer#sendOffsetsToTransaction(Map offsets, String consumerGroupId) method has been deprecated.
Please use Producer#sendOffsetsToTransaction(Map offsets, ConsumerGroupMetadata metadata) instead, where
the ConsumerGroupMetadata can be retrieved via KafkaConsumer#groupMetadata() for stronger semantics. Note
that the full set of consumer group metadata is only understood by brokers of version 2.5 or higher, so
you must upgrade your Kafka cluster to get the stronger semantics. Otherwise, you can just pass in
new ConsumerGroupMetadata(consumerGroupId) to work with older brokers. See KIP-732 for more details.
A usage sketch follows.
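A minimal consume-transform-produce sketch using the non-deprecated overload (the topic name is a
placeholder, and the producer is assumed to have transactional.id configured with initTransactions()
already called):

    import java.time.Duration;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.TopicPartition;

    public final class ExactlyOnceForwarder {
        static void forwardOnce(KafkaConsumer<String, String> consumer,
                                KafkaProducer<String, String> producer) {
            producer.beginTransaction();
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                producer.send(new ProducerRecord<>("output-topic", record.key(), record.value()));
                offsets.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1));
            }
            // groupMetadata() lets brokers (2.5+) fence zombie instances, which the
            // old String-based overload cannot do.
            producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
            producer.commitTransaction();
        }
    }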
- The Connect internal.key.converter and internal.value.converter properties have been completely
removed. The use of these Connect worker properties has been deprecated since version 2.0.0. Workers are
now hardcoded to use the JSON converter with schemas.enable set to false. If your cluster has been using
a different internal key or value converter, you can follow the migration steps outlined in KIP-738 to
safely upgrade your Connect cluster to 3.0.
- The Connect-based MirrorMaker (MM2) includes changes to support IdentityReplicationPolicy, enabling
replication without renaming topics. The existing DefaultReplicationPolicy is still used by default, but
identity replication can be enabled via the replication.policy.class configuration property. This is
especially useful for users migrating from the older MirrorMaker (MM1), or for use-cases with simple
one-way replication topologies where topic renaming is undesirable. Note that IdentityReplicationPolicy,
unlike DefaultReplicationPolicy, cannot prevent replication cycles based on topic names, so take care to
avoid cycles when constructing your replication topology.
- The original MirrorMaker (MM1) and related classes have been deprecated. Please use the Connect-based
MirrorMaker (MM2), as described in the Geo-Replication section.
If you are upgrading from a version prior to 2.1.x, please see the note below about the change to the schema used to store consumer offsets.
Once you have changed the inter.broker.protocol.version to the latest version, it will not be possible to downgrade to a version prior to 2.1.
For a rolling upgrade:
- Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you
are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously
overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior
to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION.
If you are upgrading from version 0.11.0.x or above, and you have not overridden the message format, then you only need to override
the inter-broker protocol version.
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 2.7, 2.6, etc.)
- Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. Once you have done so, the
brokers will be running the latest version and you can verify that the cluster's behavior and performance meets expectations.
It is still possible to downgrade at this point if there are any problems.
- Once the cluster's behavior and performance has been verified, bump the protocol version by editing
inter.broker.protocol.version and setting it to 2.8.
- Restart the brokers one by one for the new protocol version to take effect. Once the brokers begin using the latest
protocol version, it will no longer be possible to downgrade the cluster to an older version.
- If you have overridden the message format version as instructed above, then you need to do one more rolling restart to
upgrade it to its latest version. Once all (or most) consumers have been upgraded to 0.11.0 or later,
change log.message.format.version to 2.8 on each broker and restart them one by one. Note that the older Scala clients,
which are no longer maintained, do not support the message format introduced in 0.11, so to avoid conversion costs
(or to take advantage of exactly once semantics),
the newer Java clients must be used.
- The 2.8.0 release added a new method to the Authorizer Interface introduced in KIP-679. The motivation
is to unblock our future plan to enable the strongest message delivery guarantee by default. Custom
authorizers should consider providing a more efficient implementation that supports audit logging and
any custom configs or access rules.
- IBP 2.8 introduces topic IDs to topics as a part of KIP-516. When using ZooKeeper, this information is
stored in the TopicZNode. If the cluster is downgraded to a previous IBP or version, future topics will
not get topic IDs and it is not guaranteed that topics will retain their topic IDs in ZooKeeper. This
means that upon upgrading again, some or all topics will be assigned new IDs.
- Kafka Streams introduced a type-safe split() operator as a replacement for the deprecated
KStream#branch() method (cf. KIP-418). A usage sketch follows.
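The sketch below assumes string-keyed input and hypothetical topic names; split() returns a Map keyed by
"<prefix><branch-name>", so branches are retrieved by name rather than by array index:

    import java.util.Map;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Branched;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Named;

    public class SplitExample {
        public static void main(String[] args) {
            StreamsBuilder builder = new StreamsBuilder();
            KStream<String, String> source = builder.stream("input-topic");

            Map<String, KStream<String, String>> branches = source
                    .split(Named.as("split-"))
                    .branch((key, value) -> value != null && value.startsWith("error"),
                            Branched.as("errors"))
                    .defaultBranch(Branched.as("rest"));

            // Map keys are the Named prefix plus each branch name.
            branches.get("split-errors").to("errors-topic");
            branches.get("split-rest").to("main-topic");
        }
    }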
If you are upgrading from a version prior to 2.1.x, please see the note below about the change to the schema used to store consumer offsets.
Once you have changed the inter.broker.protocol.version to the latest version, it will not be possible to downgrade to a version prior to 2.1.
For a rolling upgrade:
- Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you
are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously
overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior
to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION.
If you are upgrading from version 0.11.0.x or above, and you have not overridden the message format, then you only need to override
the inter-broker protocol version.
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 2.6, 2.5, etc.)
- Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. Once you have done so, the
brokers will be running the latest version and you can verify that the cluster's behavior and performance meets expectations.
It is still possible to downgrade at this point if there are any problems.
- Once the cluster's behavior and performance has been verified, bump the protocol version by editing
inter.broker.protocol.version and setting it to 2.7.
- Restart the brokers one by one for the new protocol version to take effect. Once the brokers begin using the latest
protocol version, it will no longer be possible to downgrade the cluster to an older version.
- If you have overridden the message format version as instructed above, then you need to do one more rolling restart to
upgrade it to its latest version. Once all (or most) consumers have been upgraded to 0.11.0 or later,
change log.message.format.version to 2.7 on each broker and restart them one by one. Note that the older Scala clients,
which are no longer maintained, do not support the message format introduced in 0.11, so to avoid conversion costs
(or to take advantage of exactly once semantics),
the newer Java clients must be used.
- The 2.7.0 release includes the core Raft implementation specified in KIP-595. There is a separate
"raft" module containing most of the logic. Until integration with the controller is complete, there is
a standalone server that users can use for testing the performance of the Raft implementation. See the
README.md in the raft module for details.
- KIP-651 adds support for using PEM files for key and trust stores.
- KIP-612 adds support for enforcing broker-wide and per-listener connection create rates. The 2.7.0
release contains the first part of KIP-612, with dynamic configuration coming in the 2.8.0 release.
- The ability to throttle topic and partition creations or topic deletions to prevent a cluster from
being harmed via KIP-599.
- When new features become available in Kafka there are two main issues:
- How do Kafka clients become aware of broker capabilities?
- How does the broker decide which features to enable?
KIP-584 provides a flexible and operationally easy solution for client discovery, feature gating and
rolling upgrades using a single restart.
- The ability to print record offsets and headers with the ConsoleConsumer is now possible via KIP-431.
- The addition of KIP-554 continues progress towards the goal of ZooKeeper removal from Kafka; it means
you no longer have to connect directly to ZooKeeper for managing SCRAM credentials.
- Altering non-reconfigurable configs of existing listeners causes InvalidRequestException. By contrast,
the previous (unintended) behavior would have caused the updated configuration to be persisted, but it
wouldn't take effect until the broker was restarted. See KAFKA-10479 for more discussion. See
DynamicBrokerConfig.DynamicSecurityConfigs and SocketServer.ListenerReconfigurableConfigs for the
supported reconfigurable configs of existing listeners.
- Kafka Streams adds support for Sliding Windows Aggregations in the KStreams DSL.
- Reverse iteration over state stores, enabling more efficient most-recent-update searches, with KIP-617.
- End-to-end latency metrics in Kafka Streams; see KIP-613 for more details.
- Kafka Streams added metrics reporting default RocksDB properties with KIP-607.
- Better Scala implicit Serdes support from KIP-616.
If you are upgrading from a version prior to 2.1.x, please see the note below about the change to the schema used to store consumer offsets.
Once you have changed the inter.broker.protocol.version to the latest version, it will not be possible to downgrade to a version prior to 2.1.
For a rolling upgrade:
- Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you
are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously
overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior
to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION.
If you are upgrading from version 0.11.0.x or above, and you have not overridden the message format, then you only need to override
the inter-broker protocol version.
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 2.5, 2.4, etc.)
- Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. Once you have done so, the
brokers will be running the latest version and you can verify that the cluster's behavior and performance meets expectations.
It is still possible to downgrade at this point if there are any problems.
- Once the cluster's behavior and performance has been verified, bump the protocol version by editing
inter.broker.protocol.version and setting it to 2.6.
- Restart the brokers one by one for the new protocol version to take effect. Once the brokers begin using the latest
protocol version, it will no longer be possible to downgrade the cluster to an older version.
- If you have overridden the message format version as instructed above, then you need to do one more rolling restart to
upgrade it to its latest version. Once all (or most) consumers have been upgraded to 0.11.0 or later,
change log.message.format.version to 2.6 on each broker and restart them one by one. Note that the older Scala clients,
which are no longer maintained, do not support the message format introduced in 0.11, so to avoid conversion costs
(or to take advantage of exactly once semantics),
the newer Java clients must be used.
- Kafka Streams adds a new processing mode (requires broker 2.5 or newer) that improves application
scalability using exactly-once guarantees (cf. KIP-447).
- TLSv1.3 has been enabled by default for Java 11 or newer. The client and server will negotiate TLSv1.3
if both support it and fall back to TLSv1.2 otherwise. See KIP-573 for more details.
- The default value for the client.dns.lookup configuration has been changed from default to
use_all_dns_ips. If a hostname resolves to multiple IP addresses, clients and brokers will now attempt
to connect to each IP in sequence until the connection is successfully established. See KIP-602 for more
details.
- NotLeaderForPartitionException has been deprecated and replaced with NotLeaderOrFollowerException.
Fetch requests and other requests intended only for the leader or follower return NOT_LEADER_OR_FOLLOWER(6)
instead of REPLICA_NOT_AVAILABLE(9) if the broker is not a replica, ensuring that this transient error
during reassignments is handled by all clients as a retriable exception.
If you are upgrading from a version prior to 2.1.x, please see the note below about the change to the schema used to store consumer offsets.
Once you have changed the inter.broker.protocol.version to the latest version, it will not be possible to downgrade to a version prior to 2.1.
For a rolling upgrade:
- Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you
are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously
overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior
to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION.
If you are upgrading from version 0.11.0.x or above, and you have not overridden the message format, then you only need to override
the inter-broker protocol version.
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 2.4, 2.3, etc.)
- Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. Once you have done so, the
brokers will be running the latest version and you can verify that the cluster's behavior and performance meets expectations.
It is still possible to downgrade at this point if there are any problems.
- Once the cluster's behavior and performance has been verified, bump the protocol version by editing
inter.broker.protocol.version and setting it to 2.5.
- Restart the brokers one by one for the new protocol version to take effect. Once the brokers begin using the latest
protocol version, it will no longer be possible to downgrade the cluster to an older version.
- If you have overridden the message format version as instructed above, then you need to do one more rolling restart to
upgrade it to its latest version. Once all (or most) consumers have been upgraded to 0.11.0 or later,
change log.message.format.version to 2.5 on each broker and restart them one by one. Note that the older Scala clients,
which are no longer maintained, do not support the message format introduced in 0.11, so to avoid conversion costs
(or to take advantage of exactly once semantics),
the newer Java clients must be used.
- There are several notable changes to the reassignment tool kafka-reassign-partitions.sh following the
completion of KIP-455. This tool now requires the --additional flag to be provided when changing the
throttle of an active reassignment. Reassignment cancellation is now possible using the --cancel command.
Finally, reassignment with --zookeeper has been deprecated in favor of --bootstrap-server. See the KIP
for more detail.
- When RebalanceProtocol#COOPERATIVE is used, Consumer#poll can still return data while it is in the
middle of a rebalance for those partitions still owned by the consumer. In addition, Consumer#commitSync
may now throw a non-fatal RebalanceInProgressException to notify users of such an event, in order to
distinguish it from the fatal CommitFailedException and allow users to complete the ongoing rebalance
and then reattempt committing offsets for those still-owned partitions. A handling sketch follows.
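A minimal sketch of how a caller could distinguish the two exceptions (offset bookkeeping and retry
policy are left out):

    import java.util.Map;
    import org.apache.kafka.clients.consumer.CommitFailedException;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.RebalanceInProgressException;

    public final class CommitHelper {
        static void tryCommit(KafkaConsumer<String, String> consumer,
                              Map<TopicPartition, OffsetAndMetadata> offsets) {
            try {
                consumer.commitSync(offsets);
            } catch (RebalanceInProgressException e) {
                // Non-fatal: a cooperative rebalance is in flight. Poll again to
                // complete it, then retry the commit for still-owned partitions.
            } catch (CommitFailedException e) {
                // Fatal: the partitions were reassigned; these offsets cannot be committed.
            }
        }
    }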
- For improved resiliency in typical network environments, the default value of
zookeeper.session.timeout.ms has been increased from 6s to 18s and replica.lag.time.max.ms from 10s to 30s.
- New DSL operator cogroup() has been added for aggregating multiple streams together at once.
- Added a new KStream.toTable() API to translate an input event stream into a KTable.
- Added a new Serde type Void to represent null keys or null values from input topics.
- Deprecated UsePreviousTimeOnInvalidTimestamp and replaced it with UsePartitionTimeOnInvalidTimestamp.
- Improved exactly-once semantics by adding a pending offset fencing mechanism and stronger transactional
commit consistency check, which greatly simplifies the implementation of a scalable exactly-once
application. We also added a new exactly-once semantics code example under the examples folder. Check out
KIP-447 for the full details.
- Added a new public API KafkaStreams.queryMetadataForKey(String, K, Serializer) to get detailed
information on the key being queried. It provides information about the partition number where the key
resides in addition to hosts containing the active and standby partitions for the key.
- Provided support to query stale stores (for high availability) and the stores belonging to a specific
partition by deprecating KafkaStreams.store(String, QueryableStoreType) and replacing it with
KafkaStreams.store(StoreQueryParameters), as sketched below.
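A minimal sketch of the replacement API, assuming a running KafkaStreams instance and a hypothetical
key-value store named "counts-store":

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StoreQueryParameters;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

    public final class StoreLookup {
        static Long lookup(KafkaStreams streams, String key) {
            StoreQueryParameters<ReadOnlyKeyValueStore<String, Long>> params =
                    StoreQueryParameters.fromNameAndType("counts-store",
                            QueryableStoreTypes.<String, Long>keyValueStore());
            // enableStaleStores() opts in to serving (possibly stale) data from
            // standby replicas for high availability.
            ReadOnlyKeyValueStore<String, Long> store = streams.store(params.enableStaleStores());
            return store.get(key);
        }
    }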
- Added a new public API to access lag information for stores local to an instance with
KafkaStreams.allLocalStorePartitionLags().
- Scala 2.11 is no longer supported. See KIP-531 for details.
- All Scala classes from the package kafka.security.auth have been deprecated. See KIP-504 for details
of the new Java authorizer API added in 2.4.0. Note that kafka.security.auth.Authorizer and
kafka.security.auth.SimpleAclAuthorizer were deprecated in 2.4.0.
- TLSv1 and TLSv1.1 have been disabled by default since these have known security vulnerabilities. Only
TLSv1.2 is now enabled by default. You can continue to use TLSv1 and TLSv1.1 by explicitly enabling
these in the configuration options ssl.protocol and ssl.enabled.protocols.
- ZooKeeper has been upgraded to 3.5.7, and a ZooKeeper upgrade from 3.4.X to 3.5.7 can fail if there
are no snapshot files in the 3.4 data directory. This usually happens in test upgrades where ZooKeeper
3.5.7 is trying to load an existing 3.4 data dir in which no snapshot file has been created. For more
details about the issue please refer to ZOOKEEPER-3056. A fix is given in ZOOKEEPER-3056, which is to
set the snapshot.trust.empty=true config in zookeeper.properties before the upgrade.
- ZooKeeper version 3.5.7 supports TLS-encrypted connectivity to ZooKeeper both with or without client
certificates, and additional Kafka configurations are available to take advantage of this. See KIP-515
for details.
If you are upgrading from a version prior to 2.1.x, please see the note below about the change to the schema used to store consumer offsets.
Once you have changed the inter.broker.protocol.version to the latest version, it will not be possible to downgrade to a version prior to 2.1.
For a rolling upgrade:
- Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you
are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously
overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior
to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION.
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.10.0, 0.11.0, 1.0, 2.0, 2.2).
- log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION (See potential performance impact following
the upgrade for the details on what this configuration does.)
If you are upgrading from version 0.11.0.x or above, and you have not overridden the message format, then you only need to override
the inter-broker protocol version.
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION (0.11.0, 1.0, 1.1, 2.0, 2.1, 2.2, 2.3).
- Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. Once you have done so, the
brokers will be running the latest version and you can verify that the cluster's behavior and performance meets expectations.
It is still possible to downgrade at this point if there are any problems.
- Once the cluster's behavior and performance has been verified, bump the protocol version by editing
inter.broker.protocol.version and setting it to 2.4.
- Restart the brokers one by one for the new protocol version to take effect. Once the brokers begin using the latest
protocol version, it will no longer be possible to downgrade the cluster to an older version.
- If you have overridden the message format version as instructed above, then you need to do one more rolling restart to
upgrade it to its latest version. Once all (or most) consumers have been upgraded to 0.11.0 or later,
change log.message.format.version to 2.4 on each broker and restart them one by one. Note that the older Scala clients,
which are no longer maintained, do not support the message format introduced in 0.11, so to avoid conversion costs
(or to take advantage of exactly once semantics),
the newer Java clients must be used.
Additional Upgrade Notes:
- ZooKeeper has been upgraded to 3.5.6. ZooKeeper upgrade from 3.4.X to 3.5.6 can fail if there are no
snapshot files in the 3.4 data directory. This usually happens in test upgrades where ZooKeeper 3.5.6 is
trying to load an existing 3.4 data dir in which no snapshot file has been created. For more details
about the issue please refer to ZOOKEEPER-3056. A fix is given in ZOOKEEPER-3056, which is to set the
snapshot.trust.empty=true config in zookeeper.properties before the upgrade. But we have observed data
loss in standalone cluster upgrades when using the snapshot.trust.empty=true config. For more details
about the issue please refer to ZOOKEEPER-3644. So we recommend the safe workaround of copying an empty
snapshot file to the 3.4 data directory, if there are no snapshot files in the 3.4 data directory. For
more details about the workaround please refer to the ZooKeeper Upgrade FAQ.
- An embedded Jetty-based AdminServer was added in ZooKeeper 3.5. AdminServer is enabled by default in
ZooKeeper and is started on port 8080. AdminServer is disabled by default in the ZooKeeper config
(zookeeper.properties) provided by the Apache Kafka distribution. Make sure to update your local
zookeeper.properties file with admin.enableServer=false if you wish to disable the AdminServer. Please
refer to the AdminServer config to configure the AdminServer.
- A new Admin API has been added for partition reassignments. Due to changing the way Kafka propagates reassignment information,
it is possible to lose reassignment state in failure edge cases while upgrading to the new version. It is not recommended to start reassignments while upgrading.
- ZooKeeper has been upgraded from 3.4.14 to 3.5.6. TLS and dynamic reconfiguration are supported by the new version.
- The bin/kafka-preferred-replica-election.sh command line tool has been deprecated. It has been
replaced by bin/kafka-leader-election.sh.
- The methods electPreferredLeaders in the Java AdminClient class have been deprecated in favor of the
methods electLeaders.
- Scala code leveraging the NewTopic(String, int, short) constructor with literal values will need to
explicitly call toShort on the second literal.
- The argument in the constructor GroupAuthorizationException(String) is now used to specify an
exception message. Previously it referred to the group that failed authorization. This was done for
consistency with other exception types and to avoid potential misuse. The constructor
TopicAuthorizationException(String) which was previously used for a single unauthorized topic was
changed similarly.
- The internal PartitionAssignor interface has been deprecated and replaced with a new
ConsumerPartitionAssignor in the public API. Some methods/signatures are slightly different between the
two interfaces. Users implementing a custom PartitionAssignor should migrate to the new interface as
soon as possible.
- The DefaultPartitioner now uses a sticky partitioning strategy. This means that records for a specific
topic with null keys and no assigned partition will be sent to the same partition until the batch is
ready to be sent. When a new batch is created, a new partition is chosen. This decreases latency to
produce, but it may result in uneven distribution of records across partitions in edge cases. Generally
users will not be impacted, but this difference may be noticeable in tests and other situations
producing records for a very short amount of time.
- The blocking KafkaConsumer#committed methods have been extended to allow a list of partitions as input
parameters rather than a single partition. This enables fewer request/response iterations between
clients and brokers when fetching the committed offsets for the consumer group. The old overloaded
functions are deprecated and we recommend that users update their code to leverage the new methods
(details can be found in KIP-520). A usage sketch follows.
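A minimal sketch fetching committed offsets for several partitions in one call (topic name and
partitions are hypothetical):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public final class CommittedOffsets {
        static void printCommitted(KafkaConsumer<String, String> consumer) {
            Set<TopicPartition> partitions = new HashSet<>(Arrays.asList(
                    new TopicPartition("orders", 0),
                    new TopicPartition("orders", 1)));
            // One round trip instead of one request per partition.
            Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(partitions);
            committed.forEach((tp, offset) ->
                    System.out.println(tp + " -> " + (offset == null ? "none" : offset.offset())));
        }
    }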
- We've introduced a new INVALID_RECORD error in the produce response to distinguish from the
CORRUPT_MESSAGE error. To be more concrete, previously when a batch of records was sent as part of a
single request to the broker and one or more of the records failed validation due to various causes
(mismatched magic bytes, CRC checksum errors, null key for log compacted topics, etc.), the whole batch
would be rejected with the same and misleading CORRUPT_MESSAGE, and the caller of the producer client
would see the corresponding exception from either the future object of RecordMetadata returned from the
send call or in the Callback#onCompletion(RecordMetadata metadata, Exception exception). Now, with the
new error code and improved error messages of the exception, producer callers are better informed about
the root cause of why their records failed.
- We are introducing incremental cooperative rebalancing to the clients' group protocol, which allows
consumers to keep all of their assigned partitions during a rebalance and at the end revoke only those
which must be migrated to another consumer for overall cluster balance. The ConsumerCoordinator will
choose the latest RebalanceProtocol that is commonly supported by all of the consumer's supported
assignors. You can use the new built-in CooperativeStickyAssignor or plug in your own custom cooperative
assignor. To do so you must implement the ConsumerPartitionAssignor interface and include
RebalanceProtocol.COOPERATIVE in the list returned by ConsumerPartitionAssignor#supportedProtocols. Your
custom assignor can then leverage the ownedPartitions field in each consumer's Subscription to give
partitions back to their previous owners whenever possible. Note that when a partition is to be
reassigned to another consumer, it must be removed from the new assignment until it has been revoked
from its original owner. Any consumer that has to revoke a partition will trigger a followup rebalance
to allow the revoked partition to safely be assigned to its new owner. See the ConsumerPartitionAssignor
RebalanceProtocol javadocs for more information.
To upgrade from the old (eager) protocol, which always revokes all partitions before rebalancing, to
cooperative rebalancing, you must follow a specific upgrade path to get all clients on the same
ConsumerPartitionAssignor that supports the cooperative protocol. This can be done with two rolling
bounces, using the CooperativeStickyAssignor for the example: during the first one, add
"cooperative-sticky" to the list of supported assignors for each member (without removing the previous
assignor -- note that if previously using the default, you must include that explicitly as well), then
bounce and/or upgrade it. Once the entire group is on 2.4+ and all members have "cooperative-sticky"
among their supported assignors, remove the other assignor(s) and perform a second rolling bounce so
that by the end all members support only the cooperative protocol. For further details on the
cooperative rebalancing protocol and upgrade path, see KIP-429. A configuration sketch of the two
bounces follows.
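A minimal sketch of the consumer configuration for the two rolling bounces (the RangeAssignor is assumed
to be the previously used default):

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
    import org.apache.kafka.clients.consumer.RangeAssignor;

    public final class AssignorUpgrade {
        // First rolling bounce: support both assignors so every member can agree.
        static void firstBounce(Properties props) {
            props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                    Arrays.asList(CooperativeStickyAssignor.class.getName(),
                                  RangeAssignor.class.getName()));
        }

        // Second rolling bounce: drop the eager assignor once all members are on 2.4+.
        static void secondBounce(Properties props) {
            props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                    CooperativeStickyAssignor.class.getName());
        }
    }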
- There are some behavioral changes to the ConsumerRebalanceListener, as well as a new API. Exceptions
thrown during any of the listener's three callbacks will no longer be swallowed, and will instead be
re-thrown all the way up to the Consumer.poll() call. The onPartitionsLost method has been added to
allow users to react to abnormal circumstances where a consumer may have lost ownership of its
partitions (such as a missed rebalance) and cannot commit offsets. By default, this will simply call the
existing onPartitionsRevoked API to align with previous behavior. Note however that onPartitionsLost
will not be called when the set of lost partitions is empty. This means that no callback will be invoked
at the beginning of the first rebalance of a new consumer joining the group. The semantics of the
ConsumerRebalanceListener's callbacks are further changed when following the cooperative rebalancing
protocol described above. In addition to onPartitionsLost, onPartitionsRevoked will also never be called
when the set of revoked partitions is empty. The callback will generally be invoked only at the end of a
rebalance, and only on the set of partitions that are being moved to another consumer. The
onPartitionsAssigned callback will however always be called, even with an empty set of partitions, as a
way to notify users of a rebalance event (this is true for both cooperative and eager). For details on
the new callback semantics, see the ConsumerRebalanceListener javadocs. A listener sketch follows.
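A minimal listener sketch showing the three callbacks (logging only; real implementations would commit
offsets or clean up state):

    import java.util.Collection;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.common.TopicPartition;

    public class LoggingRebalanceListener implements ConsumerRebalanceListener {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // Still owned at this point: safe to commit offsets for these partitions.
            System.out.println("Revoked: " + partitions);
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // Invoked on every rebalance, even with an empty set.
            System.out.println("Assigned: " + partitions);
        }

        @Override
        public void onPartitionsLost(Collection<TopicPartition> partitions) {
            // Ownership was already lost (e.g., a missed rebalance): do not commit here.
            System.out.println("Lost: " + partitions);
        }
    }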
- The Scala trait kafka.security.auth.Authorizer has been deprecated and replaced with a new Java API
org.apache.kafka.server.authorizer.Authorizer. The authorizer implementation class
kafka.security.auth.SimpleAclAuthorizer has also been deprecated and replaced with a new implementation
kafka.security.authorizer.AclAuthorizer. AclAuthorizer uses features supported by the new API to improve
authorization logging and is compatible with SimpleAclAuthorizer. For more details, see KIP-504.
If you are upgrading from a version prior to 2.1.x, please see the note below about the change to the schema used to store consumer offsets.
Once you have changed the inter.broker.protocol.version to the latest version, it will not be possible to downgrade to a version prior to 2.1.
For a rolling upgrade:
- Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you
are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously
overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior
to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION.
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2, 0.9.0, 0.10.0, 0.10.1, 0.10.2, 0.11.0, 1.0, 1.1).
- log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION (See potential performance impact following
the upgrade for the details on what this configuration does.)
If you are upgrading from 0.11.0.x, 1.0.x, 1.1.x, 2.0.x, or 2.1.x, and you have not overridden the message format, then you only need to override
the inter-broker protocol version.
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION (0.11.0, 1.0, 1.1, 2.0, 2.1, 2.2).
- Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. Once you have done so, the
brokers will be running the latest version and you can verify that the cluster's behavior and performance meets expectations.
It is still possible to downgrade at this point if there are any problems.
- Once the cluster's behavior and performance has been verified, bump the protocol version by editing
inter.broker.protocol.version and setting it to 2.3.
- Restart the brokers one by one for the new protocol version to take effect. Once the brokers begin using the latest
protocol version, it will no longer be possible to downgrade the cluster to an older version.
- If you have overridden the message format version as instructed above, then you need to do one more rolling restart to
upgrade it to its latest version. Once all (or most) consumers have been upgraded to 0.11.0 or later,
change log.message.format.version to 2.3 on each broker and restart them one by one. Note that the older Scala clients,
which are no longer maintained, do not support the message format introduced in 0.11, so to avoid conversion costs
(or to take advantage of exactly once semantics),
the newer Java clients must be used.
- We are introducing a new rebalancing protocol for Kafka Connect based on incremental cooperative
rebalancing. The new protocol does not require stopping all the tasks during a rebalancing phase between
Connect workers. Instead, only the tasks that need to be exchanged between workers are stopped, and they
are started in a follow-up rebalance. The new Connect protocol is enabled by default beginning with
2.3.0. For more details on how it works and how to enable the old behavior of eager rebalancing, check
out the incremental cooperative rebalancing design.
- We are introducing static membership for consumers. This feature reduces unnecessary rebalances during
normal application upgrades or rolling bounces. For more details on how to use it, check out the static
membership design. A configuration sketch follows.
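As a minimal sketch, static membership is enabled by giving each consumer instance a stable, unique
group.instance.id (the id value here is a placeholder):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    public final class StaticMembership {
        static void configure(Properties props) {
            // A stable per-instance id lets a bounced consumer rejoin without
            // triggering a rebalance, as long as it returns within the session timeout.
            props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "payment-consumer-1");
        }
    }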
- The Kafka Streams DSL switches the store types it uses by default. While this change is mainly
transparent to users, there are some corner cases that may require code changes. See the Kafka Streams
upgrade section for more details.
- Kafka Streams 2.3.0 requires 0.11 message format or higher and does not work with older message format.
If you are upgrading from a version prior to 2.1.x, please see the note below about the change to the schema used to store consumer offsets.
Once you have changed the inter.broker.protocol.version to the latest version, it will not be possible to downgrade to a version prior to 2.1.
For a rolling upgrade:
- Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you
are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously
overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior
to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION.
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2, 0.9.0, 0.10.0, 0.10.1, 0.10.2, 0.11.0, 1.0, 1.1).
- log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION (See potential performance impact following
the upgrade for the details on what this configuration does.)
If you are upgrading from 0.11.0.x, 1.0.x, 1.1.x, or 2.0.x and you have not overridden the message format, then you only need to override
the inter-broker protocol version.
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION (0.11.0, 1.0, 1.1, 2.0).
- Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. Once you have done so, the
brokers will be running the latest version and you can verify that the cluster's behavior and performance meets expectations.
It is still possible to downgrade at this point if there are any problems.
- Once the cluster's behavior and performance has been verified, bump the protocol version by editing
inter.broker.protocol.version and setting it to 2.2.
- Restart the brokers one by one for the new protocol version to take effect. Once the brokers begin using the latest
protocol version, it will no longer be possible to downgrade the cluster to an older version.
- If you have overridden the message format version as instructed above, then you need to do one more rolling restart to
upgrade it to its latest version. Once all (or most) consumers have been upgraded to 0.11.0 or later,
change log.message.format.version to 2.2 on each broker and restart them one by one. Note that the older Scala clients,
which are no longer maintained, do not support the message format introduced in 0.11, so to avoid conversion costs
(or to take advantage of exactly once semantics),
the newer Java clients must be used.
- Kafka Streams 2.2.1 requires 0.11 message format or higher and does not work with older message format.
- The default consumer group id has been changed from the empty string ("") to null. Consumers who use
the new default group id will not be able to subscribe to topics, or fetch or commit offsets. The empty
string as consumer group id is deprecated but will be supported until a future major release. Old
clients that rely on the empty string group id will now have to explicitly provide it as part of their
consumer config. For more information see KIP-289.
- The bin/kafka-topics.sh command line tool is now able to connect directly to brokers with
--bootstrap-server instead of ZooKeeper. The old --zookeeper option is still available for now. Please
read KIP-377 for more information.
- Kafka Streams depends on a newer version of RocksDB that requires macOS 10.13 or higher.
Note that 2.1.x contains a change to the internal schema used to store consumer offsets. Once the upgrade is
complete, it will not be possible to downgrade to previous versions. See the rolling upgrade notes below for more detail.
For a rolling upgrade:
- Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you
are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously
overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior
to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION.
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2, 0.9.0, 0.10.0, 0.10.1, 0.10.2, 0.11.0, 1.0, 1.1).
- log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION (See potential performance impact following
the upgrade for the details on what this configuration does.)
If you are upgrading from 0.11.0.x, 1.0.x, 1.1.x, or 2.0.x and you have not overridden the message format, then you only need to override
the inter-broker protocol version.
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION (0.11.0, 1.0, 1.1, 2.0).
- Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. Once you have done so, the
brokers will be running the latest version and you can verify that the cluster's behavior and performance meets expectations.
It is still possible to downgrade at this point if there are any problems.
- Once the cluster's behavior and performance has been verified, bump the protocol version by editing
inter.broker.protocol.version and setting it to 2.1.
- Restart the brokers one by one for the new protocol version to take effect. Once the brokers begin using the latest
protocol version, it will no longer be possible to downgrade the cluster to an older version.
- If you have overridden the message format version as instructed above, then you need to do one more rolling restart to
upgrade it to its latest version. Once all (or most) consumers have been upgraded to 0.11.0 or later,
change log.message.format.version to 2.1 on each broker and restart them one by one. Note that the older Scala clients,
which are no longer maintained, do not support the message format introduced in 0.11, so to avoid conversion costs
(or to take advantage of exactly once semantics),
the newer Java clients must be used.
Additional Upgrade Notes:
- Offset expiration semantics have changed slightly in this version. According to the new semantics,
offsets of partitions in a group will not be removed while the group is subscribed to the corresponding
topic and is still active (has active consumers). If a group becomes empty, all its offsets will be
removed after the default offset retention period (or the one set by the broker) has passed (unless the
group becomes active again). Offsets associated with standalone (simple) consumers, which do not use
Kafka group management, will be removed after the default offset retention period (or the one set by
the broker) has passed since their last commit.
- The default for the console consumer's enable.auto.commit property when no group.id is provided is now
set to false. This is to avoid polluting the consumer coordinator cache as the auto-generated group is
not likely to be used by other consumers.
- The default value for the producer's retries config was changed to Integer.MAX_VALUE, as we introduced
delivery.timeout.ms in KIP-91, which sets an upper bound on the total time between sending a record and
receiving acknowledgement from the broker. By default, the delivery timeout is set to 2 minutes. A
configuration sketch follows.
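As a minimal sketch, applications that previously capped retries to fail fast can bound the total
delivery time instead (the 30-second value is an arbitrary example):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public final class FailFastProducerConfig {
        static void configure(Properties props) {
            // Bound the end-to-end time for a send instead of capping retry attempts.
            props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 30_000);
        }
    }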
- By default, MirrorMaker now overrides delivery.timeout.ms to Integer.MAX_VALUE when configuring the
producer. If you have overridden the value of retries in order to fail faster, you will instead need to
override delivery.timeout.ms.
- The ListGroup API now expects, as a recommended alternative, Describe Group access to the groups a
user should be able to list. Even though the old Describe Cluster access is still supported for backward
compatibility, using it for this API is not advised.
- KIP-336 deprecates the ExtendedSerializer and ExtendedDeserializer interfaces and promotes the usage
of Serializer and Deserializer. ExtendedSerializer and ExtendedDeserializer were introduced with KIP-82
to provide record headers for serializers and deserializers in a Java 7 compatible fashion. These
interfaces have now been consolidated, as Java 7 support has since been dropped.
- Jetty has been upgraded to 9.4.12, which excludes TLS_RSA_* ciphers by default because they do not support forward
secrecy, see https://github.com/eclipse/jetty.project/issues/2807 for more information.
- Unclean leader election is automatically enabled by the controller when the
unclean.leader.election.enable config is dynamically updated using a per-topic config override.
- The AdminClient has added a method AdminClient#metrics(). Now any application using the AdminClient
can gain more information and insight by viewing the metrics captured from the AdminClient. For more
information see KIP-324.
- Kafka now supports Zstandard compression from KIP-110.
You must upgrade the broker as well as clients to make use of it. Consumers prior to 2.1.0 will not be able to read from topics which use
Zstandard compression, so you should not enable it for a topic until all downstream consumers are upgraded. See the KIP for more detail.
Kafka 2.0.0 introduces wire protocol changes. By following the recommended rolling upgrade plan below,
you guarantee no downtime during the upgrade. However, please review the notable changes in 2.0.0 before upgrading.
For a rolling upgrade:
- Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you
are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously
overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior
to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION.
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2, 0.9.0, 0.10.0, 0.10.1, 0.10.2, 0.11.0, 1.0, 1.1).
- log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION (See potential performance impact following
the upgrade for the details on what this configuration does.)
If you are upgrading from 0.11.0.x, 1.0.x, or 1.1.x and you have not overridden the message format, then
you only need to override the inter-broker protocol version.
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION (0.11.0, 1.0, 1.1).
- Upgrade the brokers one at a time: shut down the broker, update the code, and restart it.
- Once the entire cluster is upgraded, bump the protocol version by editing
inter.broker.protocol.version and setting it to 2.0.
- Restart the brokers one by one for the new protocol version to take effect.
- If you have overridden the message format version as instructed above, then you need to do one more rolling restart to
upgrade it to its latest version. Once all (or most) consumers have been upgraded to 0.11.0 or later,
change log.message.format.version to 2.0 on each broker and restart them one by one. Note that the older Scala consumer
does not support the new message format introduced in 0.11, so to avoid the performance cost of down-conversion (or to
take advantage of exactly once semantics), the newer Java consumer must be used.
Additional Upgrade Notes:
- If you are willing to accept downtime, you can simply take all the brokers down, update the code and start them back up. They will start
with the new protocol by default.
- Bumping the protocol version and restarting can be done any time after the brokers are upgraded. It does not have to be immediately after.
Similarly for the message format version.
- If you are using Java 8 method references in your Kafka Streams code you might need to update your
code to resolve method ambiguities. Hot-swapping the jar file alone might not work.
- ACLs should not be added to prefixed resources (added in KIP-290) until all brokers in the cluster
have been updated.
NOTE: any prefixed ACLs added to a cluster, even after the cluster is fully upgraded, will be ignored
should the cluster be downgraded again.
- KIP-186 increases the default offset retention time from 1 day to 7 days. This makes it less likely to
"lose" offsets in an application that commits infrequently. It also increases the active set of offsets
and therefore can increase memory usage on the broker. Note that the console consumer currently enables
offset commit by default and can be the source of a large number of offsets which this change will now
preserve for 7 days instead of 1. You can preserve the existing behavior by setting the broker config
offsets.retention.minutes to 1440.
- Support for Java 7 has been dropped; Java 8 is now the minimum version required.
- The default value for ssl.endpoint.identification.algorithm was changed to https, which performs
hostname verification (man-in-the-middle attacks are possible otherwise). Set
ssl.endpoint.identification.algorithm to an empty string to restore the previous behaviour.
- KAFKA-5674 extends the lower bound of max.connections.per.ip to zero and therefore allows IP-based
filtering of inbound connections.
- KIP-272 added an API version tag to the metric
kafka.network:type=RequestMetrics,name=RequestsPerSec,request={Produce|FetchConsumer|FetchFollower|...}.
This metric now becomes
kafka.network:type=RequestMetrics,name=RequestsPerSec,request={Produce|FetchConsumer|FetchFollower|...},version={0|1|2|3|...}.
This will impact JMX monitoring tools that do not automatically aggregate. To get the total count for a
specific request type, the tool needs to be updated to aggregate across different versions.
- KIP-225 changed the metric "records.lag" to use tags for topic and partition. The original version with the name format "{topic}-{partition}.records-lag" has been removed.
- The Scala consumers, which have been deprecated since 0.11.0.0, have been removed. The Java consumer has been the recommended option
since 0.10.0.0. Note that the Scala consumers in 1.1.0 (and older) will continue to work even if the brokers are upgraded to 2.0.0.
- The Scala producers, which have been deprecated since 0.10.0.0, have been removed. The Java producer has been the recommended option
since 0.9.0.0. Note that the behaviour of the default partitioner in the Java producer differs from the default partitioner
in the Scala producers. Users migrating should consider configuring a custom partitioner that retains the previous behaviour.
Note that the Scala producers in 1.1.0 (and older) will continue to work even if the brokers are upgraded to 2.0.0.
- MirrorMaker and ConsoleConsumer no longer support the Scala consumer, they always use the Java consumer.
- The ConsoleProducer no longer supports the Scala producer, it always uses the Java producer.
- A number of deprecated tools that rely on the Scala clients have been removed: ReplayLogProducer, SimpleConsumerPerformance, SimpleConsumerShell, ExportZkOffsets, ImportZkOffsets, UpdateOffsetsInZK, VerifyConsumerRebalance.
- The deprecated kafka.tools.ProducerPerformance has been removed, please use org.apache.kafka.tools.ProducerPerformance.
- New Kafka Streams configuration parameter upgrade.from added that allows rolling bounce upgrades from
older versions.
- KIP-284 changed the retention time for Kafka Streams repartition topics by setting its default value
to Long.MAX_VALUE.
- Updated ProcessorStateManager APIs in Kafka Streams for registering state stores to the processor
topology. For more details please read the Streams Upgrade Guide.
- In earlier releases, Connect's worker configuration required the internal.key.converter and
internal.value.converter properties. In 2.0, these are no longer required and default to the JSON
converter. You may safely remove these properties from your Connect standalone and distributed worker
configurations:
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter.schemas.enable=false
- KIP-266 adds a new consumer configuration default.api.timeout.ms to specify the default timeout to use for KafkaConsumer APIs that could block. The KIP also adds overloads for such blocking APIs to support specifying a specific timeout to use for each of them instead of using the default timeout set by default.api.timeout.ms. In particular, a new poll(Duration) API has been added which does not block for dynamic partition assignment. The old poll(long) API has been deprecated and will be removed in a future version. Overloads have also been added for other KafkaConsumer methods like partitionsFor, listTopics, offsetsForTimes, beginningOffsets, endOffsets and close that take in a Duration.
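As an illustration, a minimal sketch of the new timeout-aware calls (topic, group and bootstrap servers are placeholders):
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollWithDuration {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9092"); // placeholder
        props.put("group.id", "example-group"); // placeholder
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic"));
            // poll(Duration) returns once the timeout expires, even if no partitions have been assigned yet
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            System.out.println("polled " + records.count() + " records");
            // other blocking calls now take an explicit timeout instead of relying on default.api.timeout.ms
            consumer.partitionsFor("example-topic", Duration.ofSeconds(5));
        }
    }
}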
- Also as part of KIP-266, the default value of request.timeout.ms has been changed to 30 seconds. The previous value was a little higher than 5 minutes to account for the maximum time that a rebalance would take. Now we treat the JoinGroup request in the rebalance as a special case and use a value derived from max.poll.interval.ms for the request timeout. All other request types use the timeout defined by request.timeout.ms.
- The internal method kafka.admin.AdminClient.deleteRecordsBefore has been removed. Users are encouraged to migrate to org.apache.kafka.clients.admin.AdminClient.deleteRecords.
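As an illustration, a minimal sketch of the replacement API (topic, partition and offset are placeholders):
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.common.TopicPartition;

public class DeleteRecordsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9092"); // placeholder
        try (AdminClient admin = AdminClient.create(props)) {
            // delete all records before offset 100 in partition 0 of "example-topic"
            admin.deleteRecords(Collections.singletonMap(
                    new TopicPartition("example-topic", 0),
                    RecordsToDelete.beforeOffset(100L)))
                .all().get();
        }
    }
}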
- The AclCommand tool --producer convenience option uses the KIP-277 finer grained ACL on the given topic.
- KIP-176 removes the --new-consumer option for all consumer based tools. This option is redundant since the new consumer is automatically used if --bootstrap-server is defined.
- KIP-290 adds the ability to define ACLs on prefixed resources, e.g. any topic starting with 'foo'.
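As an illustration, a prefixed ACL can be created programmatically with the Java AdminClient (a sketch; the principal and topic prefix are placeholders):
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

public class PrefixedAclExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9092"); // placeholder
        try (AdminClient admin = AdminClient.create(props)) {
            // allow User:alice to write to every topic whose name starts with "foo"
            AclBinding binding = new AclBinding(
                    new ResourcePattern(ResourceType.TOPIC, "foo", PatternType.PREFIXED),
                    new AccessControlEntry("User:alice", "*", AclOperation.WRITE, AclPermissionType.ALLOW));
            admin.createAcls(Collections.singleton(binding)).all().get();
        }
    }
}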
- KIP-283 improves message down-conversion handling on the Kafka broker, which has typically been a memory-intensive operation. The KIP adds a mechanism by which the operation becomes less memory intensive by down-converting chunks of partition data at a time, which helps put an upper bound on memory consumption. With this improvement, there is a change in FetchResponse protocol behavior where the broker could send an oversized message batch towards the end of the response with an invalid offset. Such oversized messages must be ignored by consumer clients, as is done by KafkaConsumer. KIP-283 also adds new topic and broker configurations message.downconversion.enable and log.message.downconversion.enable respectively to control whether down-conversion is enabled. When disabled, the broker does not perform any down-conversion and instead sends an UNSUPPORTED_VERSION error to the client.
- Dynamic broker configuration options can be stored in ZooKeeper using kafka-configs.sh before brokers are started.
This option can be used to avoid storing clear passwords in server.properties as all password configs may be stored encrypted in ZooKeeper.
- ZooKeeper hosts are now re-resolved if a connection attempt fails. But if your ZooKeeper host names resolve to multiple addresses and some of them are not reachable, then you may need to increase the connection timeout zookeeper.connection.timeout.ms.
- KIP-279: OffsetsForLeaderEpochResponse v1 introduces a partition-level leader_epoch field.
- KIP-219: Bump up the protocol versions of non-cluster action requests and responses that are throttled on quota violation.
- KIP-290: Bump up the protocol versions of ACL create, describe and delete requests and responses.
- Upgrading your Streams application from 1.1 to 2.0 does not require a broker upgrade.
A Kafka Streams 2.0 application can connect to 2.0, 1.1, 1.0, 0.11.0, 0.10.2 and 0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though).
- Note that in 2.0 we have removed the public APIs that were deprecated prior to 1.0; users leveraging those deprecated APIs need to make code changes accordingly.
See Streams API changes in 2.0.0 for more details.
Kafka 1.1.0 introduces wire protocol changes. By following the recommended rolling upgrade plan below,
you guarantee no downtime during the upgrade. However, please review the notable changes in 1.1.0 before upgrading.
For a rolling upgrade:
- Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you
are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously
overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior
to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION.
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2, 0.9.0, 0.10.0, 0.10.1, 0.10.2, 0.11.0, 1.0).
- log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION (See potential performance impact
following the upgrade for the details on what this configuration does.)
If you are upgrading from 0.11.0.x or 1.0.x and you have not overridden the message format, then you only need to override
the inter-broker protocol version.
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION (0.11.0 or 1.0).
- Upgrade the brokers one at a time: shut down the broker, update the code, and restart it.
- Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 1.1.
- Restart the brokers one by one for the new protocol version to take effect.
- If you have overridden the message format version as instructed above, then you need to do one more rolling restart to
upgrade it to its latest version. Once all (or most) consumers have been upgraded to 0.11.0 or later,
change log.message.format.version to 1.1 on each broker and restart them one by one. Note that the older Scala consumer
does not support the new message format introduced in 0.11, so to avoid the performance cost of down-conversion (or to
take advantage of exactly once semantics), the newer Java consumer must be used.
Additional Upgrade Notes:
- If you are willing to accept downtime, you can simply take all the brokers down, update the code and start them back up. They will start
with the new protocol by default.
- Bumping the protocol version and restarting can be done any time after the brokers are upgraded. It does not have to be immediately after.
Similarly for the message format version.
- If you are using Java 8 method references in your Kafka Streams code you might need to update your code to resolve method ambiguities.
Hot-swapping the jar-file only might not work.
- A new Kafka Streams configuration parameter upgrade.from was added that allows a rolling bounce upgrade from version 0.10.0.x.
- See the Kafka Streams upgrade guide for details about this new config.
- The kafka artifact in Maven no longer depends on log4j or slf4j-log4j12. Similarly to the kafka-clients artifact, users
can now choose the logging back-end by including the appropriate slf4j module (slf4j-log4j12, logback, etc.). The release
tarball still includes log4j and slf4j-log4j12.
- KIP-225 changed the metric "records.lag" to use tags for topic and partition. The original version with the name format "{topic}-{partition}.records-lag" is deprecated and will be removed in 2.0.0.
- Kafka Streams is more robust against broker communication errors. Instead of stopping the Kafka Streams client with a fatal exception, Kafka Streams tries to self-heal and reconnect to the cluster. Using the new AdminClient you have better control of how often Kafka Streams retries and can configure fine-grained timeouts (instead of hard coded retries as in older versions).
- Kafka Streams rebalance time was reduced further making Kafka Streams more responsive.
- Kafka Connect now supports message headers in both sink and source connectors, and allows manipulating them via simple message transforms. Connectors must be changed to explicitly use them. A new HeaderConverter is introduced to control how headers are (de)serialized, and the new "SimpleHeaderConverter" is used by default, representing header values as strings.
- kafka.tools.DumpLogSegments now automatically sets the deep-iteration option if print-data-log is enabled explicitly or implicitly due to any of the other options like decoder.
- KIP-226 introduced DescribeConfigs Request/Response v1.
- KIP-227 introduced Fetch Request/Response v7.
- Upgrading your Streams application from 1.0 to 1.1 does not require a broker upgrade.
A Kafka Streams 1.1 application can connect to 1.0, 0.11.0, 0.10.2 and 0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though).
- See Streams API changes in 1.1.0 for more details.
Kafka 1.0.0 introduces wire protocol changes. By following the recommended rolling upgrade plan below,
you guarantee no downtime during the upgrade. However, please review the notable changes in 1.0.0 before upgrading.
For a rolling upgrade:
- Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you
are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously
overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior
to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION.
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2, 0.9.0, 0.10.0, 0.10.1, 0.10.2, 0.11.0).
- log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION (See potential performance impact
following the upgrade for the details on what this configuration does.)
If you are upgrading from 0.11.0.x and you have not overridden the message format, you must set
both the message format version and the inter-broker protocol version to 0.11.0.
- inter.broker.protocol.version=0.11.0
- log.message.format.version=0.11.0
- Upgrade the brokers one at a time: shut down the broker, update the code, and restart it.
- Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 1.0.
- Restart the brokers one by one for the new protocol version to take effect.
- If you have overridden the message format version as instructed above, then you need to do one more rolling restart to
upgrade it to its latest version. Once all (or most) consumers have been upgraded to 0.11.0 or later,
change log.message.format.version to 1.0 on each broker and restart them one by one. If you are upgrading from
0.11.0 and log.message.format.version is set to 0.11.0, you can update the config and skip the rolling restart.
Note that the older Scala consumer does not support the new message format introduced in 0.11, so to avoid the
performance cost of down-conversion (or to take advantage of exactly once semantics),
the newer Java consumer must be used.
Additional Upgrade Notes:
- If you are willing to accept downtime, you can simply take all the brokers down, update the code and start them back up. They will start
with the new protocol by default.
- Bumping the protocol version and restarting can be done any time after the brokers are upgraded. It does not have to be immediately after.
Similarly for the message format version.
- A new Kafka Streams configuration parameter upgrade.from was added that allows a rolling bounce upgrade from version 0.10.0.x.
- See the Kafka Streams upgrade guide for details about this new config.
- Restored binary compatibility of AdminClient's Options classes (e.g. CreateTopicsOptions, DeleteTopicsOptions, etc.) with
0.11.0.x. Binary (but not source) compatibility had been broken inadvertently in 1.0.0.
- Topic deletion is now enabled by default, since the functionality is now stable. Users who wish to retain the previous behavior should set the broker config delete.topic.enable to false. Keep in mind that topic deletion removes data and the operation is not reversible (i.e. there is no "undelete" operation).
- For topics that support timestamp search, if no offset can be found for a partition, that partition is now included in the search result with a null offset value. Previously, the partition was not included in the map. This change was made to make the search behavior consistent with the case of topics not supporting timestamp search.
- If the inter.broker.protocol.version is 1.0 or later, a broker will now stay online to serve replicas on live log directories even if there are offline log directories. A log directory may become offline due to an IOException caused by hardware failure. Users need to monitor the per-broker metric offlineLogDirectoryCount to check whether there is an offline log directory.
- Added KafkaStorageException which is a retriable exception. KafkaStorageException will be converted to NotLeaderForPartitionException in the response
if the version of the client's FetchRequest or ProducerRequest does not support KafkaStorageException.
- -XX:+DisableExplicitGC was replaced by -XX:+ExplicitGCInvokesConcurrent in the default JVM settings. This helps
avoid out of memory exceptions during allocation of native memory by direct buffers in some cases.
- The overridden handleError method implementations have been removed from the following deprecated classes in the kafka.api package: FetchRequest, GroupCoordinatorRequest, OffsetCommitRequest, OffsetFetchRequest, OffsetRequest, ProducerRequest, and TopicMetadataRequest. This was only intended for use on the broker, but it is no longer in use and the implementations have not been maintained. A stub implementation has been retained for binary compatibility.
- The Java clients and tools now accept any string as a client-id.
- The deprecated tool kafka-consumer-offset-checker.sh has been removed. Use kafka-consumer-groups.sh to get consumer group details.
- SimpleAclAuthorizer now logs access denials to the authorizer log by default.
- Authentication failures are now reported to clients as one of the subclasses of AuthenticationException. No retries will be performed if a client connection fails authentication.
- Custom SaslServer implementations may throw SaslAuthenticationException to provide an error message to return to clients indicating the reason for authentication failure. Implementors should take care not to include any security-critical information in the exception message that should not be leaked to unauthenticated clients.
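For illustration, a sketch of how a custom implementation might surface such a failure (validateCredentials is a hypothetical helper; SaslAuthenticationException is unchecked, so it can be thrown from evaluateResponse):
import javax.security.sasl.SaslException;
import org.apache.kafka.common.errors.SaslAuthenticationException;

// inside a custom SaslServer implementation:
public byte[] evaluateResponse(byte[] response) throws SaslException {
    if (!validateCredentials(response)) { // hypothetical helper
        // the message is returned to the client, so avoid leaking security-critical details
        throw new SaslAuthenticationException("Authentication failed: invalid credentials");
    }
    return new byte[0]; // no further challenge in this sketch
}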
- The app-info mbean registered with JMX to provide version and commit id will be deprecated and replaced with metrics providing these attributes.
- Kafka metrics may now contain non-numeric values. org.apache.kafka.common.Metric#value() has been deprecated and will return 0.0 in such cases to minimise the probability of breaking users who read the value of every client metric (via a MetricsReporter implementation or by calling the metrics() method). org.apache.kafka.common.Metric#metricValue() can be used to retrieve numeric and non-numeric metric values.
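For example, a metrics reader can switch to the new method along these lines (a sketch; "consumer" stands for any client instance):
import java.util.Map;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

for (Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
    // metricValue() returns Object and works for both numeric and non-numeric metrics
    System.out.println(entry.getKey().name() + " = " + entry.getValue().metricValue());
}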
- Every Kafka rate metric now has a corresponding cumulative count metric with the suffix -total to simplify downstream processing. For example, records-consumed-rate has a corresponding metric named records-consumed-total.
- Mx4j will only be enabled if the system property kafka_mx4jenable is set to true. Due to a logic inversion bug, it was previously enabled by default and disabled if kafka_mx4jenable was set to true.
- The package org.apache.kafka.common.security.auth in the clients jar has been made public and added to the javadocs. Internal classes which had previously been located in this package have been moved elsewhere.
- When using an Authorizer and a user doesn't have the required permissions on a topic, the broker will return TOPIC_AUTHORIZATION_FAILED errors to requests irrespective of whether the topic exists on the broker. If the user has the required permissions and the topic doesn't exist, then the UNKNOWN_TOPIC_OR_PARTITION error code will be returned.
- The config/consumer.properties file was updated to use new consumer config properties.
- KIP-112: LeaderAndIsrRequest v1 introduces a partition-level is_new field.
- KIP-112: UpdateMetadataRequest v4 introduces a partition-level offline_replicas field.
- KIP-112: MetadataResponse v5 introduces a partition-level offline_replicas field.
- KIP-112: ProduceResponse v4 introduces error code for KafkaStorageException.
- KIP-112: FetchResponse v6 introduces error code for KafkaStorageException.
- KIP-152: SaslAuthenticate request has been added to enable reporting of authentication failures. This request will be used if the SaslHandshake request version is greater than 0.
- Upgrading your Streams application from 0.11.0 to 1.0 does not require a broker upgrade.
A Kafka Streams 1.0 application can connect to 0.11.0, 0.10.2 and 0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though).
However, Kafka Streams 1.0 requires 0.10 message format or newer and does not work with older message formats.
- If you are monitoring streams metrics, you will need to make some changes to the metric names in your reporting and monitoring code, because the metrics sensor hierarchy was changed.
- There are a few public APIs, including ProcessorContext#schedule(), Processor#punctuate(), KStreamBuilder and TopologyBuilder, that are deprecated in favor of new APIs. We recommend making the corresponding code changes when you upgrade, which should be very minor since the new APIs look quite similar.
- See Streams API changes in 1.0.0 for more details.
- Upgrading your Streams application from 0.10.2 to 1.0 does not require a broker upgrade.
A Kafka Streams 1.0 application can connect to 1.0, 0.11.0, 0.10.2 and 0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though).
- If you are monitoring streams metrics, you will need to make some changes to the metric names in your reporting and monitoring code, because the metrics sensor hierarchy was changed.
- There are a few public APIs, including ProcessorContext#schedule(), Processor#punctuate(), KStreamBuilder and TopologyBuilder, that are deprecated in favor of new APIs. We recommend making the corresponding code changes when you upgrade, which should be very minor since the new APIs look quite similar.
- If you specify customized key.serde, value.serde and timestamp.extractor in configs, it is recommended to use their replacement configuration parameters, as these configs are deprecated.
- See Streams API changes in 0.11.0 for more details.
- Upgrading your Streams application from 0.10.1 to 1.0 does not require a broker upgrade.
A Kafka Streams 1.0 application can connect to 1.0, 0.11.0, 0.10.2 and 0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though).
- You need to recompile your code. Just swapping the Kafka Streams library jar file will not work and will break your application.
- If you are monitoring streams metrics, you will need to make some changes to the metric names in your reporting and monitoring code, because the metrics sensor hierarchy was changed.
- There are a few public APIs, including ProcessorContext#schedule(), Processor#punctuate(), KStreamBuilder and TopologyBuilder, that are deprecated in favor of new APIs. We recommend making the corresponding code changes when you upgrade, which should be very minor since the new APIs look quite similar.
- If you specify customized key.serde, value.serde and timestamp.extractor in configs, it is recommended to use their replacement configuration parameters, as these configs are deprecated.
- If you use a custom (i.e., user implemented) timestamp extractor, you will need to update this code, because the TimestampExtractor interface was changed.
- If you register custom metrics, you will need to update this code, because the StreamsMetric interface was changed.
- See Streams API changes in 1.0.0,
Streams API changes in 0.11.0 and
Streams API changes in 0.10.2 for more details.
- Upgrading your Streams application from 0.10.0 to 1.0 does require a broker upgrade because a Kafka Streams 1.0 application can only connect to 1.0, 0.11.0, 0.10.2, or 0.10.1 brokers.
- There are a couple of API changes that are not backward compatible (cf. Streams API changes in 1.0.0,
Streams API changes in 0.11.0,
Streams API changes in 0.10.2, and
Streams API changes in 0.10.1 for more details).
Thus, you need to update and recompile your code. Just swapping the Kafka Streams library jar file will not work and will break your application.
- Upgrading from 0.10.0.x to 1.0.2 requires two rolling bounces with config upgrade.from="0.10.0" set for the first upgrade phase (cf. KIP-268). As an alternative, an offline upgrade is also possible.
- prepare your application instances for a rolling bounce and make sure that config upgrade.from is set to "0.10.0" for new version 1.0.2
- bounce each instance of your application once
- prepare your newly deployed 1.0.2 application instances for a second round of rolling bounces; make sure to remove the value for config upgrade.from
- bounce each instance of your application once more to complete the upgrade
- Upgrading from 0.10.0.x to 1.0.0 or 1.0.1 requires an offline upgrade (rolling bounce upgrade is not supported)
- stop all old (0.10.0.x) application instances
- update your code and swap old code and jar file with new code and new jar file
- restart all new (1.0.0 or 1.0.1) application instances
Kafka 0.11.0.0 introduces a new message format version as well as wire protocol changes. By following the recommended rolling upgrade plan below,
you guarantee no downtime during the upgrade. However, please review the notable changes in 0.11.0.0 before upgrading.
Starting with version 0.10.2, Java clients (producer and consumer) have acquired the ability to communicate with older brokers. Version 0.11.0
clients can talk to version 0.10.0 or newer brokers. However, if your brokers are older than 0.10.0, you must upgrade all the brokers in the
Kafka cluster before upgrading your clients. Version 0.11.0 brokers support 0.8.x and newer clients.
For a rolling upgrade:
- Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you
are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have
not overridden the message format previously, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION.
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2, 0.9.0, 0.10.0, 0.10.1 or 0.10.2).
- log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION (See potential performance impact
following the upgrade for the details on what this configuration does.)
- Upgrade the brokers one at a time: shut down the broker, update the code, and restart it.
- Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.11.0, but do not change log.message.format.version yet.
- Restart the brokers one by one for the new protocol version to take effect.
- Once all (or most) consumers have been upgraded to 0.11.0 or later, then change log.message.format.version to 0.11.0 on each
broker and restart them one by one. Note that the older Scala consumer does not support the new message format, so to avoid
the performance cost of down-conversion (or to take advantage of exactly once semantics),
the new Java consumer must be used.
Additional Upgrade Notes:
- If you are willing to accept downtime, you can simply take all the brokers down, update the code and start them back up. They will start
with the new protocol by default.
- Bumping the protocol version and restarting can be done any time after the brokers are upgraded. It does not have to be immediately after.
Similarly for the message format version.
- It is also possible to enable the 0.11.0 message format on individual topics using the topic admin tool (bin/kafka-topics.sh) prior to updating the global setting log.message.format.version.
- If you are upgrading from a version prior to 0.10.0, it is NOT necessary to first update the message format to 0.10.0
before you switch to 0.11.0.
- Upgrading your Streams application from 0.10.2 to 0.11.0 does not require a broker upgrade.
A Kafka Streams 0.11.0 application can connect to 0.11.0, 0.10.2 and 0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though).
- If you specify customized key.serde, value.serde and timestamp.extractor in configs, it is recommended to use their replacement configuration parameters, as these configs are deprecated.
- See Streams API changes in 0.11.0 for more details.
- Upgrading your Streams application from 0.10.1 to 0.11.0 does not require a broker upgrade.
A Kafka Streams 0.11.0 application can connect to 0.11.0, 0.10.2 and 0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though).
- You need to recompile your code. Just swapping the Kafka Streams library jar file will not work and will break your application.
- If you specify customized key.serde, value.serde and timestamp.extractor in configs, it is recommended to use their replacement configuration parameters, as these configs are deprecated.
- If you use a custom (i.e., user implemented) timestamp extractor, you will need to update this code, because the TimestampExtractor interface was changed.
- If you register custom metrics, you will need to update this code, because the StreamsMetric interface was changed.
- See Streams API changes in 0.11.0 and
Streams API changes in 0.10.2 for more details.
- Upgrading your Streams application from 0.10.0 to 0.11.0 does require a broker upgrade because a Kafka Streams 0.11.0 application can only connect to 0.11.0, 0.10.2, or 0.10.1 brokers.
- There are a couple of API changes that are not backward compatible (cf. Streams API changes in 0.11.0,
Streams API changes in 0.10.2, and
Streams API changes in 0.10.1 for more details).
Thus, you need to update and recompile your code. Just swapping the Kafka Streams library jar file will not work and will break your application.
- Upgrading from 0.10.0.x to 0.11.0.3 requires two rolling bounces with config upgrade.from="0.10.0" set for the first upgrade phase (cf. KIP-268). As an alternative, an offline upgrade is also possible.
- prepare your application instances for a rolling bounce and make sure that config upgrade.from is set to "0.10.0" for new version 0.11.0.3
- bounce each instance of your application once
- prepare your newly deployed 0.11.0.3 application instances for a second round of rolling bounces; make sure to remove the value for config upgrade.from
- bounce each instance of your application once more to complete the upgrade
- Upgrading from 0.10.0.x to 0.11.0.0, 0.11.0.1, or 0.11.0.2 requires an offline upgrade (rolling bounce upgrade is not supported)
- stop all old (0.10.0.x) application instances
- update your code and swap old code and jar file with new code and new jar file
- restart all new (0.11.0.0, 0.11.0.1, or 0.11.0.2) application instances
- A new Kafka Streams configuration parameter upgrade.from was added that allows a rolling bounce upgrade from version 0.10.0.x.
- See the Kafka Streams upgrade guide for details about this new config.
- Unclean leader election is now disabled by default. The new default favors durability over availability. Users who wish to retain the previous behavior should set the broker config unclean.leader.election.enable to true.
- Producer configs block.on.buffer.full, metadata.fetch.timeout.ms and timeout.ms have been removed. They were initially deprecated in Kafka 0.9.0.0.
- The offsets.topic.replication.factor broker config is now enforced upon auto topic creation. Internal auto topic creation will fail with a GROUP_COORDINATOR_NOT_AVAILABLE error until the cluster size meets this replication factor requirement.
- When compressing data with snappy, the producer and broker will use the compression scheme's default block size (2 x 32 KB)
instead of 1 KB in order to improve the compression ratio. There have been reports of data compressed with the smaller
block size being 50% larger than when compressed with the larger block size. For the snappy case, a producer with 5000
partitions will require an additional 315 MB of JVM heap.
- Similarly, when compressing data with gzip, the producer and broker will use 8 KB instead of 1 KB as the buffer size. The default
for gzip is excessively low (512 bytes).
- The broker configuration max.message.bytes now applies to the total size of a batch of messages. Previously the setting applied to batches of compressed messages, or to non-compressed messages individually. A message batch may consist of only a single message, so in most cases, the limitation on the size of individual messages is only reduced by the overhead of the batch format. However, there are some subtle implications for message format conversion (see below for more detail). Note also that while previously the broker would ensure that at least one message is returned in each fetch request (regardless of the total and partition-level fetch sizes), the same behavior now applies to one message batch.
- GC log rotation is enabled by default, see KAFKA-3754 for details.
- Deprecated constructors of RecordMetadata, MetricName and Cluster classes have been removed.
- Added user headers support through a new Headers interface providing user headers read and write access.
- ProducerRecord and ConsumerRecord expose the new Headers API via the Headers headers() method call.
- ExtendedSerializer and ExtendedDeserializer interfaces are introduced to support serialization and deserialization for headers. Headers will be ignored if the configured serializer and deserializer are not the above classes.
- A new config, group.initial.rebalance.delay.ms, was introduced. This config specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance. The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms. The default value for this is 3 seconds. During development and testing it might be desirable to set this to 0 in order to not delay test execution time.
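For example, a test broker's server.properties could disable the delay entirely (a sketch):
group.initial.rebalance.delay.ms=0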
- org.apache.kafka.common.Cluster#partitionsForTopic, partitionsForNode and availablePartitionsForTopic methods will return an empty list instead of null (which is considered a bad practice) in case the metadata for the required topic does not exist.
- Streams API configuration parameters timestamp.extractor, key.serde, and value.serde were deprecated and replaced by default.timestamp.extractor, default.key.serde, and default.value.serde, respectively.
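For example, a Streams configuration can move to the new names along these lines (a sketch; the serdes and extractor shown are illustrative choices):
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;

Properties props = new Properties();
// replacements for the deprecated key.serde, value.serde and timestamp.extractor configs
props.put("default.key.serde", Serdes.String().getClass().getName());
props.put("default.value.serde", Serdes.String().getClass().getName());
props.put("default.timestamp.extractor", "org.apache.kafka.streams.processor.WallclockTimestampExtractor");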
- For offset commit failures in the Java consumer's commitAsync APIs, we no longer expose the underlying cause when instances of RetriableCommitFailedException are passed to the commit callback. See KAFKA-5052 for more detail.
- KIP-107: FetchRequest v5 introduces a partition-level log_start_offset field.
- KIP-107: FetchResponse v5 introduces a partition-level log_start_offset field.
- KIP-82: ProduceRequest v3 introduces an array of header in the message protocol, containing key field and value field.
- KIP-82: FetchResponse v5 introduces an array of header in the message protocol, containing key field and value field.
Kafka 0.11.0 includes support for idempotent and transactional capabilities in the producer. Idempotent delivery
ensures that messages are delivered exactly once to a particular topic partition during the lifetime of a single producer.
Transactional delivery allows producers to send data to multiple partitions such that either all messages are successfully
delivered, or none of them are. Together, these capabilities enable "exactly once semantics" in Kafka. More details on these
features are available in the user guide, but below we add a few specific notes on enabling them in an upgraded cluster.
Note that enabling EoS is not required and there is no impact on the broker's behavior if unused.
- Only the new Java producer and consumer support exactly once semantics.
- These features depend crucially on the 0.11.0 message format. Attempting to use them
on an older format will result in unsupported version errors.
- Transaction state is stored in a new internal topic __transaction_state. This topic is not created until the first attempt to use a transactional request API. Similar to the consumer offsets topic, there are several settings to control the topic's configuration. For example, transaction.state.log.min.isr controls the minimum ISR for this topic. See the configuration section in the user guide for a full list of options.
- For secure clusters, the transactional APIs require new ACLs which can be turned on with the bin/kafka-acls.sh tool.
- EoS in Kafka introduces new request APIs and modifies several existing ones. See KIP-98 for the full details.
The 0.11.0 message format includes several major enhancements in order to support better delivery semantics for the producer
(see KIP-98)
and improved replication fault tolerance
(see KIP-101).
Although the new format contains more information to make these improvements possible, we have made the batch format much
more efficient. As long as the number of messages per batch is more than 2, you can expect lower overall overhead. For smaller
batches, however, there may be a small performance impact. See here for the results of our
initial performance analysis of the new message format. You can also find more detail on the message format in the
KIP-98 proposal.
One of the notable differences in the new message format is that even uncompressed messages are stored together as a single batch. This has a few implications for the broker configuration max.message.bytes, which limits the size of a single batch. First, if an older client produces messages to a topic partition using the old format, and the messages are individually smaller than max.message.bytes, the broker may still reject them after they are merged into a single batch during the up-conversion process. Generally this can happen when the aggregate size of the individual messages is larger than max.message.bytes. There is a similar effect for older consumers reading messages down-converted from the new format: if the fetch size is not set at least as large as max.message.bytes, the consumer may not be able to make progress even if the individual uncompressed messages are smaller than the configured fetch size. This behavior does not impact the Java client for 0.10.1.0 and later since it uses an updated fetch protocol which ensures that at least one message can be returned even if it exceeds the fetch size. To get around these problems, you should ensure 1) that the producer's batch size is not set larger than max.message.bytes, and 2) that the consumer's fetch size is set at least as large as max.message.bytes.
Most of the discussion on the performance impact of upgrading to the 0.10.0 message format
remains pertinent to the 0.11.0 upgrade. This mainly affects clusters that are not secured with TLS since "zero-copy" transfer
is already not possible in that case. In order to avoid the cost of down-conversion, you should ensure that consumer applications
are upgraded to the latest 0.11.0 client. Significantly, since the old consumer has been deprecated in 0.11.0.0, it does not support
the new message format. You must upgrade to use the new consumer to use the new message format without the cost of down-conversion.
Note that 0.11.0 consumers support backwards compatibility with 0.10.0 brokers and upward, so it is possible to upgrade the
clients first before the brokers.
0.10.2.0 has wire protocol changes. By following the recommended rolling upgrade plan below, you guarantee no downtime during the upgrade.
However, please review the notable changes in 0.10.2.0 before upgrading.
Starting with version 0.10.2, Java clients (producer and consumer) have acquired the ability to communicate with older brokers. Version 0.10.2
clients can talk to version 0.10.0 or newer brokers. However, if your brokers are older than 0.10.0, you must upgrade all the brokers in the
Kafka cluster before upgrading your clients. Version 0.10.2 brokers support 0.8.x and newer clients.
For a rolling upgrade:
- Update server.properties file on all brokers and add the following properties:
- Upgrade the brokers one at a time: shut down the broker, update the code, and restart it.
- Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.10.2.
- If your previous message format is 0.10.0, change log.message.format.version to 0.10.2 (this is a no-op as the message format is the same for 0.10.0, 0.10.1 and 0.10.2).
If your previous message format version is lower than 0.10.0, do not change log.message.format.version yet - this parameter should only change once all consumers have been upgraded to 0.10.0.0 or later.
- Restart the brokers one by one for the new protocol version to take effect.
- If log.message.format.version is still lower than 0.10.0 at this point, wait until all consumers have been upgraded to 0.10.0 or later,
then change log.message.format.version to 0.10.2 on each broker and restart them one by one.
Note: If you are willing to accept downtime, you can simply take all the brokers down, update the code and start all of them. They will start with the new protocol by default.
Note: Bumping the protocol version and restarting can be done any time after the brokers were upgraded. It does not have to be immediately after.
- Upgrading your Streams application from 0.10.1 to 0.10.2 does not require a broker upgrade.
A Kafka Streams 0.10.2 application can connect to 0.10.2 and 0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though).
- You need to recompile your code. Just swapping the Kafka Streams library jar file will not work and will break your application.
- If you use a custom (i.e., user implemented) timestamp extractor, you will need to update this code, because the TimestampExtractor interface was changed.
- If you register custom metrics, you will need to update this code, because the StreamsMetric interface was changed.
- See Streams API changes in 0.10.2 for more details.
- Upgrading your Streams application from 0.10.0 to 0.10.2 does require a broker upgrade because a Kafka Streams 0.10.2 application can only connect to 0.10.2 or 0.10.1 brokers.
- There are a couple of API changes that are not backward compatible (cf. Streams API changes in 0.10.2 for more details).
Thus, you need to update and recompile your code. Just swapping the Kafka Streams library jar file will not work and will break your application.
- Upgrading from 0.10.0.x to 0.10.2.2 requires two rolling bounces with config upgrade.from="0.10.0" set for the first upgrade phase (cf. KIP-268). As an alternative, an offline upgrade is also possible.
- prepare your application instances for a rolling bounce and make sure that config upgrade.from is set to "0.10.0" for new version 0.10.2.2
- bounce each instance of your application once
- prepare your newly deployed 0.10.2.2 application instances for a second round of rolling bounces; make sure to remove the value for config upgrade.from
- bounce each instance of your application once more to complete the upgrade
- Upgrading from 0.10.0.x to 0.10.2.0 or 0.10.2.1 requires an offline upgrade (rolling bounce upgrade is not supported)
- stop all old (0.10.0.x) application instances
- update your code and swap old code and jar file with new code and new jar file
- restart all new (0.10.2.0 or 0.10.2.1) application instances
- A new configuration parameter upgrade.from was added that allows a rolling bounce upgrade from version 0.10.0.x.
- The default values for two configurations of the StreamsConfig class were changed to improve the resiliency of Kafka Streams applications. The internal Kafka Streams producer retries default value was changed from 0 to 10. The internal Kafka Streams consumer max.poll.interval.ms default value was changed from 300000 to Integer.MAX_VALUE.
- The Java clients (producer and consumer) have acquired the ability to communicate with older brokers. Version 0.10.2 clients
can talk to version 0.10.0 or newer brokers. Note that some features are not available or are limited when older brokers
are used.
- Several methods on the Java consumer may now throw InterruptException if the calling thread is interrupted. Please refer to the KafkaConsumer Javadoc for a more in-depth explanation of this change.
- The Java consumer now shuts down gracefully. By default, the consumer waits up to 30 seconds to complete pending requests. A new close API with timeout has been added to KafkaConsumer to control the maximum wait time.
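For instance, a bounded shutdown might look like this (a sketch using the overload added in this release, which later versions superseded with close(Duration)):
import java.util.concurrent.TimeUnit;

// wait at most five seconds for pending requests before closing
consumer.close(5, TimeUnit.SECONDS);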
- Multiple regular expressions separated by commas can be passed to MirrorMaker with the new Java consumer via the --whitelist option. This makes the behaviour consistent with MirrorMaker when using the old Scala consumer.
- Upgrading your Streams application from 0.10.1 to 0.10.2 does not require a broker upgrade.
A Kafka Streams 0.10.2 application can connect to 0.10.2 and 0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though).
- The Zookeeper dependency was removed from the Streams API. The Streams API now uses the Kafka protocol to manage internal topics instead of
modifying Zookeeper directly. This eliminates the need for privileges to access Zookeeper directly and "StreamsConfig.ZOOKEEPER_CONFIG"
should not be set in the Streams app any more. If the Kafka cluster is secured, Streams apps must have the required security privileges to create new topics.
- Several new fields including "security.protocol", "connections.max.idle.ms", "retry.backoff.ms", "reconnect.backoff.ms" and "request.timeout.ms" were added to the StreamsConfig class. Users should pay attention to the default values and set these if needed. For more details please refer to 3.5 Kafka Streams Configs.
- KIP-88: OffsetFetchRequest v2 supports retrieval of offsets for all topics if the topics array is set to null.
- KIP-88: OffsetFetchResponse v2 introduces a top-level error_code field.
- KIP-103: UpdateMetadataRequest v3 introduces a listener_name field to the elements of the end_points array.
- KIP-108: CreateTopicsRequest v1 introduces a validate_only field.
- KIP-108: CreateTopicsResponse v1 introduces an error_message field to the elements of the topic_errors array.
0.10.1.0 has wire protocol changes. By following the recommended rolling upgrade plan below, you guarantee no downtime during the upgrade.
However, please notice the Potential breaking changes in 0.10.1.0 before upgrade.
Note: Because new protocols are introduced, it is important to upgrade your Kafka clusters before upgrading your clients (i.e. 0.10.1.x clients
only support 0.10.1.x or later brokers while 0.10.1.x brokers also support older clients).
For a rolling upgrade:
- Update server.properties file on all brokers and add the following properties:
- Upgrade the brokers one at a time: shut down the broker, update the code, and restart it.
- Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.10.1.0.
- If your previous message format is 0.10.0, change log.message.format.version to 0.10.1 (this is a no-op as the message format is the same for both 0.10.0 and 0.10.1).
If your previous message format version is lower than 0.10.0, do not change log.message.format.version yet - this parameter should only change once all consumers have been upgraded to 0.10.0.0 or later.
- Restart the brokers one by one for the new protocol version to take effect.
- If log.message.format.version is still lower than 0.10.0 at this point, wait until all consumers have been upgraded to 0.10.0 or later,
then change log.message.format.version to 0.10.1 on each broker and restart them one by one.
Note: If you are willing to accept downtime, you can simply take all the brokers down, update the code and start all of them. They will start with the new protocol by default.
Note: Bumping the protocol version and restarting can be done any time after the brokers were upgraded. It does not have to be immediately after.
- The PrincipalBuilder, DefaultPrincipalBuilder and ResourceFilter classes were removed.
- Various constants and constructors were removed from SslConfigs, SaslConfigs, AclBinding and AclBindingFilter.
- The Admin.electPreferredLeaders() methods were removed. Please use Admin.electLeaders instead.
- The kafka-preferred-replica-election command line tool was removed. Please use kafka-leader-election instead.
- The --zookeeper option was removed from the kafka-topics and kafka-reassign-partitions command line tools. Please use --bootstrap-server instead.
- In the kafka-configs command line tool, the --zookeeper option is only supported for updating SCRAM Credentials configuration and describing/updating dynamic broker configs when brokers are not running. Please use --bootstrap-server for other configuration operations.
- The ConfigEntry constructor was removed (KAFKA-12577). Please use the remaining public constructor instead.
- The config value default for the client config client.dns.lookup has been removed. In the unlikely event that you set this config explicitly, we recommend leaving the config unset (use_all_dns_ips is used by default).
- The ExtendedDeserializer and ExtendedSerializer classes have been removed. Please use Deserializer and Serializer instead.
- The close(long, TimeUnit) method was removed from the producer, consumer and admin client. Please use close(Duration).
- The ConsumerConfig.addDeserializerToConfig and ProducerConfig.addSerializerToConfig methods were removed. These methods were not intended to be public API and there is no replacement.
- The NoOffsetForPartitionException.partition() method was removed. Please use partitions() instead.
- The default partition.assignment.strategy is changed to "[RangeAssignor, CooperativeStickyAssignor]", which will use the RangeAssignor by default, but allows upgrading to the CooperativeStickyAssignor with just a single rolling bounce that removes the RangeAssignor from the list. Please check the client upgrade path guide here for more detail.
- The Scala kafka.common.MessageFormatter was removed. Please use the Java org.apache.kafka.common.MessageFormatter.
- The MessageFormatter.init(Properties) method was removed. Please use configure(Map) instead.
- The checksum() method has been removed from ConsumerRecord and RecordMetadata. The message format v2, which has been the default since 0.11, moved the checksum from the record to the record batch. As such, these methods don't make sense and no replacements exist.
- The ChecksumMessageFormatter class was removed. It is not part of the public API, but it may have been used with kafka-console-consumer.sh. It reported the checksum of each record, which has not been supported since message format v2.
- The org.apache.kafka.clients.consumer.internals.PartitionAssignor class has been removed. Please use org.apache.kafka.clients.consumer.ConsumerPartitionAssignor instead.
- The quota.producer.default and quota.consumer.default configurations were removed (KAFKA-12591). Dynamic quota defaults must be used instead.
- The port and host.name configurations were removed. Please use listeners instead.
- The advertised.port and advertised.host.name configurations were removed. Please use advertised.listeners instead.
- The deprecated worker configurations rest.host.name and rest.port were removed (KAFKA-12482) from the Kafka Connect worker configuration. Please use listeners instead.
- The Producer#sendOffsetsToTransaction(Map offsets, String consumerGroupId) method has been deprecated. Please use Producer#sendOffsetsToTransaction(Map offsets, ConsumerGroupMetadata metadata) instead, where the ConsumerGroupMetadata can be retrieved via KafkaConsumer#groupMetadata() for stronger semantics. Note that the full set of consumer group metadata is only understood by brokers of version 2.5 or higher, so you must upgrade your Kafka cluster to get the stronger semantics. Otherwise, you can just pass in new ConsumerGroupMetadata(consumerGroupId) to work with older brokers. See KIP-732 for more details.
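As an illustration, a transactional consume-process-produce step might pass the group metadata along these lines (a sketch; producer, consumer, records, offsets and the output topic are assumed to be set up by the caller):
// within a transactional loop:
producer.beginTransaction();
for (ConsumerRecord<String, String> record : records) {
    producer.send(new ProducerRecord<>("output-topic", record.key(), record.value()));
}
// passing the consumer's group metadata enables the stronger fencing semantics (brokers 2.5+)
producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
producer.commitTransaction();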
- The Connect internal.key.converter and internal.value.converter properties have been completely removed. The use of these Connect worker properties has been deprecated since version 2.0.0. Workers are now hardcoded to use the JSON converter with schemas.enable set to false. If your cluster has been using a different internal key or value converter, you can follow the migration steps outlined in KIP-738 to safely upgrade your Connect cluster to 3.0.
- The Connect-based MirrorMaker (MM2) includes changes to support IdentityReplicationPolicy, enabling replication without renaming topics. The existing DefaultReplicationPolicy is still used by default, but identity replication can be enabled via the replication.policy configuration property. This is especially useful for users migrating from the older MirrorMaker (MM1), or for use-cases with simple one-way replication topologies where topic renaming is undesirable. Note that IdentityReplicationPolicy, unlike DefaultReplicationPolicy, cannot prevent replication cycles based on topic names, so take care to avoid cycles when constructing your replication topology.
- The original MirrorMaker (MM1) and related classes have been deprecated. Please use the Connect-based
MirrorMaker (MM2), as described in the
Geo-Replication section.
If you are upgrading from a version prior to 2.1.x, please see the note below about the change to the schema used to store consumer offsets.
Once you have changed the inter.broker.protocol.version to the latest version, it will not be possible to downgrade to a version prior to 2.1.
For a rolling upgrade:
- Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you
are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously
overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior
to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION.
If you are upgrading from version 0.11.0.x or above, and you have not overridden the message format, then you only need to override
the inter-broker protocol version.
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 2.7, 2.6, etc.)
- Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. Once you have done so, the
brokers will be running the latest version and you can verify that the cluster's behavior and performance meets expectations.
It is still possible to downgrade at this point if there are any problems.
- Once the cluster's behavior and performance has been verified, bump the protocol version by editing inter.broker.protocol.version and setting it to 2.8.
- Restart the brokers one by one for the new protocol version to take effect. Once the brokers begin using the latest
protocol version, it will no longer be possible to downgrade the cluster to an older version.
- If you have overridden the message format version as instructed above, then you need to do one more rolling restart to
upgrade it to its latest version. Once all (or most) consumers have been upgraded to 0.11.0 or later,
change log.message.format.version to 2.8 on each broker and restart them one by one. Note that the older Scala clients,
which are no longer maintained, do not support the message format introduced in 0.11, so to avoid conversion costs
(or to take advantage of exactly once semantics),
the newer Java clients must be used.
- The 2.8.0 release added a new method to the Authorizer Interface introduced in KIP-679. The motivation is to unblock our future plan to enable the strongest message delivery guarantee by default. Custom authorizers should consider providing a more efficient implementation that supports audit logging and any custom configs or access rules.
- IBP 2.8 introduces topic IDs to topics as a part of KIP-516. When using ZooKeeper, this information is stored in the TopicZNode. If the cluster is downgraded to a previous IBP or version, future topics will not get topic IDs and it is not guaranteed that topics will retain their topic IDs in ZooKeeper. This means that upon upgrading again, some topics or all topics will be assigned new IDs.
- Kafka Streams introduces a type-safe split() operator as a substitution for the deprecated KStream#branch() method (cf. KIP-418).
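For illustration, a sketch of the new operator (topic and branch names are placeholders):
import java.util.Map;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("input-topic"); // placeholder
Map<String, KStream<String, String>> branches = stream
        .split(Named.as("split-"))
        .branch((key, value) -> value != null && value.startsWith("a"), Branched.as("starts-with-a"))
        .defaultBranch(Branched.as("other"));
// branch names are prefixed with the split() name: "split-starts-with-a" and "split-other"
KStream<String, String> aStream = branches.get("split-starts-with-a");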
If you are upgrading from a version prior to 2.1.x, please see the note below about the change to the schema used to store consumer offsets.
Once you have changed the inter.broker.protocol.version to the latest version, it will not be possible to downgrade to a version prior to 2.1.
For a rolling upgrade:
- Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you
are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously
overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior
to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION.
If you are upgrading from version 0.11.0.x or above, and you have not overridden the message format, then you only need to override
the inter-broker protocol version.
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 2.6, 2.5, etc.)
- Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. Once you have done so, the
brokers will be running the latest version and you can verify that the cluster's behavior and performance meets expectations.
It is still possible to downgrade at this point if there are any problems.
- Once the cluster's behavior and performance have been verified, bump the protocol version by editing inter.broker.protocol.version and setting it to 2.7.
- Restart the brokers one by one for the new protocol version to take effect. Once the brokers begin using the latest
protocol version, it will no longer be possible to downgrade the cluster to an older version.
- If you have overridden the message format version as instructed above, then you need to do one more rolling restart to
upgrade it to its latest version. Once all (or most) consumers have been upgraded to 0.11.0 or later,
change log.message.format.version to 2.7 on each broker and restart them one by one. Note that the older Scala clients,
which are no longer maintained, do not support the message format introduced in 0.11, so to avoid conversion costs
(or to take advantage of exactly once semantics),
the newer Java clients must be used.
- The 2.7.0 release includes the core Raft implementation specified in KIP-595. There is a separate "raft" module containing most of the logic. Until integration with the controller is complete, there is a standalone server that users can use for testing the performance of the Raft implementation. See the README.md in the raft module for details.
- KIP-651 adds support for using PEM files for key and trust stores.
- KIP-612 adds support for enforcing broker-wide and per-listener connection creation rates. The 2.7.0 release contains the first part of KIP-612, with dynamic configuration coming in the 2.8.0 release.
- KIP-599 adds the ability to throttle topic and partition creations and topic deletions to prevent a cluster from being harmed.
- When new features become available in Kafka there are two main issues:
- How do Kafka clients become aware of broker capabilities?
- How does the broker decide which features to enable?
KIP-584 provides a flexible and operationally easy solution for client discovery, feature gating and rolling upgrades using a single restart.
- The ability to print record offsets and headers with the ConsoleConsumer is now possible via KIP-431.
- KIP-554 continues progress towards the goal of ZooKeeper removal from Kafka. With this addition, you no longer have to connect directly to ZooKeeper to manage SCRAM credentials; see the example below.
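For example, a SCRAM credential can now be created through the brokers with kafka-configs.sh (the broker address, user name and password here are hypothetical):
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \
  --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret]' \
  --entity-type users --entity-name alice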
- Altering non-reconfigurable configs of existing listeners causes an InvalidRequestException. By contrast, the previous (unintended) behavior would have caused the updated configuration to be persisted, but it wouldn't take effect until the broker was restarted. See KAFKA-10479 for more discussion. See DynamicBrokerConfig.DynamicSecurityConfigs and SocketServer.ListenerReconfigurableConfigs for the supported reconfigurable configs of existing listeners.
- Kafka Streams adds support for Sliding Windows Aggregations in the KStreams DSL; a minimal sketch follows below.
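A minimal sketch of a sliding-window aggregation (topic name and durations are hypothetical):
import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.Windowed;

StreamsBuilder builder = new StreamsBuilder();

// count events per key over a 5-minute sliding window with a 30-second grace period
KTable<Windowed<String>, Long> counts = builder.<String, String>stream("events")
    .groupByKey()
    .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(
        Duration.ofMinutes(5), Duration.ofSeconds(30)))
    .count();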
- Reverse iteration over state stores, enabling more efficient searches for the most recent update, with KIP-617.
- End-to-end latency metrics in Kafka Streams; see KIP-613 for more details.
- Kafka Streams added metrics reporting default RocksDB properties with KIP-607.
- Better Scala implicit Serdes support from KIP-616.
If you are upgrading from a version prior to 2.1.x, please see the note below about the change to the schema used to store consumer offsets.
Once you have changed the inter.broker.protocol.version to the latest version, it will not be possible to downgrade to a version prior to 2.1.
For a rolling upgrade:
- Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you
are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously
overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior
to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION.
If you are upgrading from version 0.11.0.x or above, and you have not overridden the message format, then you only need to override
the inter-broker protocol version.
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 2.5, 2.4, etc.)
- Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. Once you have done so, the
brokers will be running the latest version and you can verify that the cluster's behavior and performance meets expectations.
It is still possible to downgrade at this point if there are any problems.
- Once the cluster's behavior and performance have been verified, bump the protocol version by editing inter.broker.protocol.version and setting it to 2.6.
- Restart the brokers one by one for the new protocol version to take effect. Once the brokers begin using the latest
protocol version, it will no longer be possible to downgrade the cluster to an older version.
- If you have overridden the message format version as instructed above, then you need to do one more rolling restart to
upgrade it to its latest version. Once all (or most) consumers have been upgraded to 0.11.0 or later,
change log.message.format.version to 2.6 on each broker and restart them one by one. Note that the older Scala clients,
which are no longer maintained, do not support the message format introduced in 0.11, so to avoid conversion costs
(or to take advantage of exactly once semantics),
the newer Java clients must be used.
- Kafka Streams adds a new processing mode (requires broker 2.5 or newer) that improves application scalability using exactly-once guarantees (cf. KIP-447).
- TLSv1.3 has been enabled by default for Java 11 or newer. The client and server will negotiate TLSv1.3 if both support it, and fall back to TLSv1.2 otherwise. See KIP-573 for more details.
- The default value for the client.dns.lookup configuration has been changed from default to use_all_dns_ips. If a hostname resolves to multiple IP addresses, clients and brokers will now attempt to connect to each IP in sequence until the connection is successfully established. See KIP-602 for more details; a config sketch follows below.
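Clients that need the pre-2.6 lookup behavior can still pin it explicitly (a minimal client config sketch; the old value is deprecated but still accepted in this release):
client.dns.lookup=default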
- NotLeaderForPartitionException has been deprecated and replaced with NotLeaderOrFollowerException.
Fetch requests and other requests intended only for the leader or follower return NOT_LEADER_OR_FOLLOWER(6) instead of REPLICA_NOT_AVAILABLE(9)
if the broker is not a replica, ensuring that this transient error during reassignments is handled by all clients as a retriable exception.
If you are upgrading from a version prior to 2.1.x, please see the note below about the change to the schema used to store consumer offsets.
Once you have changed the inter.broker.protocol.version to the latest version, it will not be possible to downgrade to a version prior to 2.1.
For a rolling upgrade:
- Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you
are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously
overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior
to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION.
If you are upgrading from version 0.11.0.x or above, and you have not overridden the message format, then you only need to override
the inter-broker protocol version.
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 2.4, 2.3, etc.)
- Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. Once you have done so, the
brokers will be running the latest version and you can verify that the cluster's behavior and performance meets expectations.
It is still possible to downgrade at this point if there are any problems.
- Once the cluster's behavior and performance have been verified, bump the protocol version by editing inter.broker.protocol.version and setting it to 2.5.
- Restart the brokers one by one for the new protocol version to take effect. Once the brokers begin using the latest
protocol version, it will no longer be possible to downgrade the cluster to an older version.
- If you have overridden the message format version as instructed above, then you need to do one more rolling restart to
upgrade it to its latest version. Once all (or most) consumers have been upgraded to 0.11.0 or later,
change log.message.format.version to 2.5 on each broker and restart them one by one. Note that the older Scala clients,
which are no longer maintained, do not support the message format introduced in 0.11, so to avoid conversion costs
(or to take advantage of exactly once semantics),
the newer Java clients must be used.
- There are several notable changes to the reassignment tool kafka-reassign-partitions.sh following the completion of KIP-455. This tool now requires the --additional flag to be provided when changing the throttle of an active reassignment. Reassignment cancellation is now possible using the --cancel command. Finally, reassignment with --zookeeper has been deprecated in favor of --bootstrap-server. See the KIP for more detail.
- When RebalanceProtocol#COOPERATIVE is used, Consumer#poll can still return data while it is in the middle of a rebalance for those partitions still owned by the consumer; in addition, Consumer#commitSync now may throw a non-fatal RebalanceInProgressException to notify users of such an event, in order to distinguish it from the fatal CommitFailedException and allow users to complete the ongoing rebalance and then reattempt committing offsets for those still-owned partitions; a minimal sketch follows below.
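A minimal sketch of telling the two failures apart (assuming an existing KafkaConsumer named consumer):
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.common.errors.RebalanceInProgressException;

try {
    consumer.commitSync();
} catch (RebalanceInProgressException e) {
    // non-fatal: a rebalance is in flight; poll again to complete it,
    // then retry the commit for the partitions this consumer still owns
} catch (CommitFailedException e) {
    // fatal for these offsets: the partitions have been reassigned elsewhere
}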
- For improved resiliency in typical network environments, the default value of zookeeper.session.timeout.ms has been increased from 6s to 18s and replica.lag.time.max.ms from 10s to 30s.
- A new DSL operator cogroup() has been added for aggregating multiple streams together at once; a minimal sketch follows below.
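A minimal sketch of cogrouping two streams into one aggregate (topic names are hypothetical; default serdes are assumed):
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KTable;

StreamsBuilder builder = new StreamsBuilder();
KGroupedStream<String, String> clicks = builder.<String, String>stream("clicks").groupByKey();
KGroupedStream<String, String> views = builder.<String, String>stream("views").groupByKey();

// each input stream contributes its own aggregator; both feed one shared aggregate
KTable<String, Long> activity = clicks
    .cogroup((String key, String value, Long aggregate) -> aggregate + 1)
    .cogroup(views, (String key, String value, Long aggregate) -> aggregate + 1)
    .aggregate(() -> 0L);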
- Added a new KStream.toTable() API to translate an input event stream into a KTable.
- Added a new Serde type Void to represent null keys or null values from an input topic.
- Deprecated UsePreviousTimeOnInvalidTimestamp and replaced it with UsePartitionTimeOnInvalidTimestamp.
- Improved exactly-once semantics by adding a pending offset fencing mechanism and stronger transactional commit
consistency check, which greatly simplifies the implementation of a scalable exactly-once application.
We also added a new exactly-once semantics code example under the examples folder. Check out KIP-447 for the full details.
- Added a new public API KafkaStreams.queryMetadataForKey(String, K, Serializer) to get detailed information on the key being queried. It provides information about the partition number where the key resides, in addition to hosts containing the active and standby partitions for the key.
- Provided support to query stale stores (for high availability) and the stores belonging to a specific partition by deprecating KafkaStreams.store(String, QueryableStoreType) and replacing it with KafkaStreams.store(StoreQueryParameters); a minimal sketch follows below.
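A minimal sketch of the replacement call (assuming a running KafkaStreams instance named streams and a store named "counts"):
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

ReadOnlyKeyValueStore<String, Long> store = streams.store(
    StoreQueryParameters.fromNameAndType("counts", QueryableStoreTypes.<String, Long>keyValueStore())
        .enableStaleStores()); // opt in to querying standby (stale) replicas for high availability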
- Added a new public API to access lag information for stores local to an instance with KafkaStreams.allLocalStorePartitionLags().
- Scala 2.11 is no longer supported. See KIP-531 for details.
- All Scala classes from the package kafka.security.auth have been deprecated. See KIP-504 for details of the new Java authorizer API added in 2.4.0. Note that kafka.security.auth.Authorizer and kafka.security.auth.SimpleAclAuthorizer were deprecated in 2.4.0.
- TLSv1 and TLSv1.1 have been disabled by default since these have known security vulnerabilities. Only TLSv1.2 is now enabled by default. You can continue to use TLSv1 and TLSv1.1 by explicitly enabling them in the configuration options ssl.protocol and ssl.enabled.protocols.
- ZooKeeper has been upgraded to 3.5.7, and a ZooKeeper upgrade from 3.4.X to 3.5.7 can fail if there are no snapshot files in the 3.4 data directory. This usually happens in test upgrades where ZooKeeper 3.5.7 is trying to load an existing 3.4 data dir in which no snapshot file has been created. For more details about the issue please refer to ZOOKEEPER-3056. A fix is given in ZOOKEEPER-3056, which is to set the snapshot.trust.empty=true config in zookeeper.properties before the upgrade.
- ZooKeeper version 3.5.7 supports TLS-encrypted connectivity to ZooKeeper, with or without client certificates, and additional Kafka configurations are available to take advantage of this.
See KIP-515 for details.
If you are upgrading from a version prior to 2.1.x, please see the note below about the change to the schema used to store consumer offsets.
Once you have changed the inter.broker.protocol.version to the latest version, it will not be possible to downgrade to a version prior to 2.1.
For a rolling upgrade:
- Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you
are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously
overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior
to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION.
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.10.0, 0.11.0, 1.0, 2.0, 2.2).
- log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION (See potential performance impact
following the upgrade for the details on what this configuration does.)
If you are upgrading from version 0.11.0.x or above, and you have not overridden the message format, then you only need to override
the inter-broker protocol version.
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION (0.11.0, 1.0, 1.1, 2.0, 2.1, 2.2, 2.3).
- Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. Once you have done so, the
brokers will be running the latest version and you can verify that the cluster's behavior and performance meets expectations.
It is still possible to downgrade at this point if there are any problems.
- Once the cluster's behavior and performance have been verified, bump the protocol version by editing inter.broker.protocol.version and setting it to 2.4.
- Restart the brokers one by one for the new protocol version to take effect. Once the brokers begin using the latest
protocol version, it will no longer be possible to downgrade the cluster to an older version.
- If you have overridden the message format version as instructed above, then you need to do one more rolling restart to
upgrade it to its latest version. Once all (or most) consumers have been upgraded to 0.11.0 or later,
change log.message.format.version to 2.4 on each broker and restart them one by one. Note that the older Scala clients,
which are no longer maintained, do not support the message format introduced in 0.11, so to avoid conversion costs
(or to take advantage of exactly once semantics),
the newer Java clients must be used.
Additional Upgrade Notes:
- ZooKeeper has been upgraded to 3.5.6. A ZooKeeper upgrade from 3.4.X to 3.5.6 can fail if there are no snapshot files in the 3.4 data directory. This usually happens in test upgrades where ZooKeeper 3.5.6 is trying to load an existing 3.4 data dir in which no snapshot file has been created. For more details about the issue please refer to ZOOKEEPER-3056. A fix is given in ZOOKEEPER-3056, which is to set the snapshot.trust.empty=true config in zookeeper.properties before the upgrade. But we have observed data loss in standalone cluster upgrades when using the snapshot.trust.empty=true config. For more details about the issue please refer to ZOOKEEPER-3644. So we recommend the safe workaround of copying an empty snapshot file to the 3.4 data directory if there are no snapshot files in the 3.4 data directory. For more details about the workaround please refer to the ZooKeeper Upgrade FAQ.
- An embedded Jetty-based AdminServer was added in ZooKeeper 3.5. AdminServer is enabled by default in ZooKeeper and is started on port 8080. AdminServer is disabled by default in the ZooKeeper config (zookeeper.properties) provided by the Apache Kafka distribution. Make sure to update your local zookeeper.properties file with admin.enableServer=false if you wish to disable the AdminServer. Please refer to the AdminServer config to configure the AdminServer.
- A new Admin API has been added for partition reassignments. Due to changing the way Kafka propagates reassignment information,
it is possible to lose reassignment state in failure edge cases while upgrading to the new version. It is not recommended to start reassignments while upgrading.
- ZooKeeper has been upgraded from 3.4.14 to 3.5.6. TLS and dynamic reconfiguration are supported by the new version.
- The bin/kafka-preferred-replica-election.sh command line tool has been deprecated. It has been replaced by bin/kafka-leader-election.sh.
- The electPreferredLeaders methods in the Java AdminClient class have been deprecated in favor of the electLeaders methods.
- Scala code leveraging the NewTopic(String, int, short) constructor with literal values will need to explicitly call toShort on the second literal.
- The argument in the constructor GroupAuthorizationException(String) is now used to specify an exception message. Previously it referred to the group that failed authorization. This was done for consistency with other exception types and to avoid potential misuse. The constructor TopicAuthorizationException(String), which was previously used for a single unauthorized topic, was changed similarly.
- The internal PartitionAssignor interface has been deprecated and replaced with a new ConsumerPartitionAssignor in the public API. Some methods/signatures are slightly different between the two interfaces. Users implementing a custom PartitionAssignor should migrate to the new interface as soon as possible.
- The DefaultPartitioner now uses a sticky partitioning strategy. This means that records for a specific topic with null keys and no assigned partition will be sent to the same partition until the batch is ready to be sent. When a new batch is created, a new partition is chosen. This decreases latency to produce, but it may result in uneven distribution of records across partitions in edge cases. Generally users will not be impacted, but this difference may be noticeable in tests and other situations producing records for a very short amount of time.
- The blocking KafkaConsumer#committed methods have been extended to allow a list of partitions as input parameters rather than a single partition. This enables fewer request/response iterations between clients and brokers when fetching the committed offsets for the consumer group. The old overloaded functions are deprecated and we recommend that users change their code to leverage the new methods (details can be found in KIP-520); a minimal sketch follows below.
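A minimal sketch of the new batched overload (assuming an existing KafkaConsumer named consumer; the topic name is hypothetical):
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// a single request/response round trip for several partitions
Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(Set.of(
    new TopicPartition("orders", 0),
    new TopicPartition("orders", 1)));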
- We've introduced a new INVALID_RECORD error in the produce response to distinguish it from the CORRUPT_MESSAGE error. To be more concrete, previously when a batch of records was sent as part of a single request to the broker and one or more of the records failed validation due to various causes (mismatched magic bytes, CRC checksum errors, null keys for log-compacted topics, etc.), the whole batch would be rejected with the same, misleading CORRUPT_MESSAGE error, and the caller of the producer client would see the corresponding exception either from the future object of RecordMetadata returned from the send call or in Callback#onCompletion(RecordMetadata metadata, Exception exception). Now, with the new error code and improved error messages in the exception, producer callers are better informed about the root cause of why their sent records failed.
- We are introducing incremental cooperative rebalancing to the clients' group protocol, which allows consumers to keep all of their assigned partitions during a rebalance and, at the end, revoke only those which must be migrated to another consumer for overall cluster balance. The ConsumerCoordinator will choose the latest RebalanceProtocol that is commonly supported by all of the consumer's supported assignors. You can use the new built-in CooperativeStickyAssignor or plug in your own custom cooperative assignor. To do so you must implement the ConsumerPartitionAssignor interface and include RebalanceProtocol.COOPERATIVE in the list returned by ConsumerPartitionAssignor#supportedProtocols. Your custom assignor can then leverage the ownedPartitions field in each consumer's Subscription to give partitions back to their previous owners whenever possible. Note that when a partition is to be reassigned to another consumer, it must be removed from the new assignment until it has been revoked from its original owner. Any consumer that has to revoke a partition will trigger a follow-up rebalance to allow the revoked partition to safely be assigned to its new owner. See the ConsumerPartitionAssignor RebalanceProtocol javadocs for more information.
To upgrade from the old (eager) protocol, which always revokes all partitions before rebalancing, to cooperative rebalancing, you must follow a specific upgrade path to get all clients on the same ConsumerPartitionAssignor that supports the cooperative protocol. This can be done with two rolling bounces, using the CooperativeStickyAssignor as an example: during the first one, add "cooperative-sticky" to the list of supported assignors for each member (without removing the previous assignor; note that if previously using the default, you must include that explicitly as well), then bounce and/or upgrade each member. Once the entire group is on 2.4+ and all members have "cooperative-sticky" among their supported assignors, remove the other assignor(s) and perform a second rolling bounce so that by the end all members support only the cooperative protocol (a config sketch for both bounces follows below). For further details on the cooperative rebalancing protocol and upgrade path, see KIP-429.
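As an illustration, the consumer configuration across the two bounces might look as follows (a minimal sketch, assuming the group previously used the default range assignor):
# first rolling bounce: support both the new and the old assignor
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
# second rolling bounce: cooperative only
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor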
- There are some behavioral changes to the ConsumerRebalanceListener, as well as a new API. Exceptions thrown during any of the listener's three callbacks will no longer be swallowed, and will instead be re-thrown all the way up to the Consumer.poll() call. The onPartitionsLost method has been added to allow users to react to abnormal circumstances where a consumer may have lost ownership of its partitions (such as a missed rebalance) and cannot commit offsets. By default, this will simply call the existing onPartitionsRevoked API to align with previous behavior. Note however that onPartitionsLost will not be called when the set of lost partitions is empty. This means that no callback will be invoked at the beginning of the first rebalance of a new consumer joining the group.
The semantics of the ConsumerRebalanceListener's callbacks are further changed when following the cooperative rebalancing protocol described above. In addition to onPartitionsLost, onPartitionsRevoked will also never be called when the set of revoked partitions is empty. The callback will generally be invoked only at the end of a rebalance, and only on the set of partitions that are being moved to another consumer. The onPartitionsAssigned callback will however always be called, even with an empty set of partitions, as a way to notify users of a rebalance event (this is true for both cooperative and eager). For details on the new callback semantics, see the ConsumerRebalanceListener javadocs; a minimal listener sketch follows below.
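A minimal listener sketch showing the new callback:
import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // always invoked at the end of a rebalance, possibly with an empty set
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // commit offsets here for partitions that are moving to another consumer
    }

    @Override
    public void onPartitionsLost(Collection<TopicPartition> partitions) {
        // ownership was lost abnormally (e.g. after a missed rebalance); do not commit offsets
    }
};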
- The Scala trait kafka.security.auth.Authorizer has been deprecated and replaced with a new Java API org.apache.kafka.server.authorizer.Authorizer. The authorizer implementation class kafka.security.auth.SimpleAclAuthorizer has also been deprecated and replaced with a new implementation, kafka.security.authorizer.AclAuthorizer. AclAuthorizer uses features supported by the new API to improve authorization logging and is compatible with SimpleAclAuthorizer. For more details, see KIP-504.
If you are upgrading from a version prior to 2.1.x, please see the note below about the change to the schema used to store consumer offsets.
Once you have changed the inter.broker.protocol.version to the latest version, it will not be possible to downgrade to a version prior to 2.1.
For a rolling upgrade:
- Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you
are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously
overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior
to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION.
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2, 0.9.0, 0.10.0, 0.10.1, 0.10.2, 0.11.0, 1.0, 1.1).
- log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION (See potential performance impact
following the upgrade for the details on what this configuration does.)
If you are upgrading from 0.11.0.x, 1.0.x, 1.1.x, 2.0.x, or 2.1.x, and you have not overridden the message format, then you only need to override
the inter-broker protocol version.
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION (0.11.0, 1.0, 1.1, 2.0, 2.1, 2.2).
- Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. Once you have done so, the
brokers will be running the latest version and you can verify that the cluster's behavior and performance meets expectations.
It is still possible to downgrade at this point if there are any problems.
- Once the cluster's behavior and performance have been verified, bump the protocol version by editing inter.broker.protocol.version and setting it to 2.3.
- Restart the brokers one by one for the new protocol version to take effect. Once the brokers begin using the latest
protocol version, it will no longer be possible to downgrade the cluster to an older version.
- If you have overridden the message format version as instructed above, then you need to do one more rolling restart to
upgrade it to its latest version. Once all (or most) consumers have been upgraded to 0.11.0 or later,
change log.message.format.version to 2.3 on each broker and restart them one by one. Note that the older Scala clients,
which are no longer maintained, do not support the message format introduced in 0.11, so to avoid conversion costs
(or to take advantage of exactly once semantics),
the newer Java clients must be used.
- We are introducing a new rebalancing protocol for Kafka Connect based on incremental cooperative rebalancing. The new protocol does not require stopping all the tasks during a rebalancing phase between Connect workers. Instead, only the tasks that need to be exchanged between workers are stopped, and they are started in a follow-up rebalance. The new Connect protocol is enabled by default beginning with 2.3.0. For more details on how it works and how to enable the old behavior of eager rebalancing, check out the incremental cooperative rebalancing design.
- We are introducing static membership for consumers. This feature reduces unnecessary rebalances during normal application upgrades or rolling bounces. For more details on how to use it, check out the static membership design.
- The Kafka Streams DSL switches the store types it uses. While this change is mainly transparent to users, there are some corner cases that may require code changes. See the Kafka Streams upgrade section for more details.
- Kafka Streams 2.3.0 requires message format 0.11 or higher and does not work with older message formats.
If you are upgrading from a version prior to 2.1.x, please see the note below about the change to the schema used to store consumer offsets.
Once you have changed the inter.broker.protocol.version to the latest version, it will not be possible to downgrade to a version prior to 2.1.
For a rolling upgrade:
- Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you
are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously
overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior
to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION.
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2, 0.9.0, 0.10.0, 0.10.1, 0.10.2, 0.11.0, 1.0, 1.1).
- log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION (See potential performance impact
following the upgrade for the details on what this configuration does.)
If you are upgrading from 0.11.0.x, 1.0.x, 1.1.x, or 2.0.x and you have not overridden the message format, then you only need to override
the inter-broker protocol version.
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION (0.11.0, 1.0, 1.1, 2.0).
- Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. Once you have done so, the
brokers will be running the latest version and you can verify that the cluster's behavior and performance meets expectations.
It is still possible to downgrade at this point if there are any problems.
- Once the cluster's behavior and performance have been verified, bump the protocol version by editing inter.broker.protocol.version and setting it to 2.2.
- Restart the brokers one by one for the new protocol version to take effect. Once the brokers begin using the latest
protocol version, it will no longer be possible to downgrade the cluster to an older version.
- If you have overridden the message format version as instructed above, then you need to do one more rolling restart to
upgrade it to its latest version. Once all (or most) consumers have been upgraded to 0.11.0 or later,
change log.message.format.version to 2.2 on each broker and restart them one by one. Note that the older Scala clients,
which are no longer maintained, do not support the message format introduced in 0.11, so to avoid conversion costs
(or to take advantage of exactly once semantics),
the newer Java clients must be used.
- Kafka Streams 2.2.1 requires message format 0.11 or higher and does not work with older message formats.
- The default consumer group id has been changed from the empty string ("") to null. Consumers who use the new default group id will not be able to subscribe to topics, or fetch or commit offsets. The empty string as consumer group id is deprecated but will be supported until a future major release. Old clients that rely on the empty string group id will now have to explicitly provide it as part of their consumer config. For more information see KIP-289.
- The bin/kafka-topics.sh command line tool is now able to connect directly to brokers with --bootstrap-server instead of ZooKeeper. The old --zookeeper option is still available for now. Please read KIP-377 for more information.
- Kafka Streams depends on a newer version of RocksDB that requires macOS 10.13 or higher.
Note that 2.1.x contains a change to the internal schema used to store consumer offsets. Once the upgrade is
complete, it will not be possible to downgrade to previous versions. See the rolling upgrade notes below for more detail.
For a rolling upgrade:
- Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you
are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously
overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior
to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION.
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2, 0.9.0, 0.10.0, 0.10.1, 0.10.2, 0.11.0, 1.0, 1.1).
- log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION (See potential performance impact
following the upgrade for the details on what this configuration does.)
If you are upgrading from 0.11.0.x, 1.0.x, 1.1.x, or 2.0.x and you have not overridden the message format, then you only need to override
the inter-broker protocol version.
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION (0.11.0, 1.0, 1.1, 2.0).
- Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. Once you have done so, the
brokers will be running the latest version and you can verify that the cluster's behavior and performance meets expectations.
It is still possible to downgrade at this point if there are any problems.
- Once the cluster's behavior and performance have been verified, bump the protocol version by editing inter.broker.protocol.version and setting it to 2.1.
- Restart the brokers one by one for the new protocol version to take effect. Once the brokers begin using the latest
protocol version, it will no longer be possible to downgrade the cluster to an older version.
- If you have overridden the message format version as instructed above, then you need to do one more rolling restart to
upgrade it to its latest version. Once all (or most) consumers have been upgraded to 0.11.0 or later,
change log.message.format.version to 2.1 on each broker and restart them one by one. Note that the older Scala clients,
which are no longer maintained, do not support the message format introduced in 0.11, so to avoid conversion costs
(or to take advantage of exactly once semantics),
the newer Java clients must be used.
Additional Upgrade Notes:
- Offset expiration semantics have slightly changed in this version. According to the new semantics, offsets of partitions in a group will not be removed while the group is subscribed to the corresponding topic and is still active (has active consumers). If a group becomes empty, all its offsets will be removed after the default offset retention period (or the one set by the broker) has passed (unless the group becomes active again). Offsets associated with standalone (simple) consumers, which do not use Kafka group management, will be removed after the default offset retention period (or the one set by the broker) has passed since their last commit.
- The default for the console consumer's enable.auto.commit property when no group.id is provided is now set to false. This is to avoid polluting the consumer coordinator cache, as the auto-generated group is not likely to be used by other consumers.
- The default value for the producer's retries config was changed to Integer.MAX_VALUE, as we introduced delivery.timeout.ms in KIP-91, which sets an upper bound on the total time between sending a record and receiving acknowledgement from the broker. By default, the delivery timeout is set to 2 minutes; a config sketch follows below.
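As an illustration, a producer that previously tuned retries to fail faster can now bound the overall delivery time instead (a minimal sketch; the value is a placeholder):
# upper bound on the total time to deliver a record, including retries (default: 120000)
delivery.timeout.ms=60000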
- By default, MirrorMaker now overrides delivery.timeout.ms to Integer.MAX_VALUE when configuring the producer. If you have overridden the value of retries in order to fail faster, you will instead need to override delivery.timeout.ms.
- The ListGroup API now expects, as a recommended alternative, Describe Group access to the groups a user should be able to list. Even though the old Describe Cluster access is still supported for backward compatibility, using it for this API is not advised.
- KIP-336 deprecates the ExtendedSerializer and ExtendedDeserializer interfaces and propagates the usage of Serializer and Deserializer. ExtendedSerializer and ExtendedDeserializer were introduced with KIP-82 to provide record headers for serializers and deserializers in a Java 7 compatible fashion. These interfaces have now been consolidated because Java 7 support has since been dropped.
- Jetty has been upgraded to 9.4.12, which excludes TLS_RSA_* ciphers by default because they do not support forward
secrecy, see https://github.com/eclipse/jetty.project/issues/2807 for more information.
- Unclean leader election is automatically enabled by the controller when the unclean.leader.election.enable config is dynamically updated via a per-topic config override.
- The AdminClient has added a method AdminClient#metrics(). Now any application using the AdminClient can gain more information and insight by viewing the metrics captured from the AdminClient. For more information see KIP-324.
- Kafka now supports Zstandard compression from KIP-110.
You must upgrade the broker as well as clients to make use of it. Consumers prior to 2.1.0 will not be able to read from topics which use
Zstandard compression, so you should not enable it for a topic until all downstream consumers are upgraded. See the KIP for more detail.
Kafka 2.0.0 introduces wire protocol changes. By following the recommended rolling upgrade plan below,
you guarantee no downtime during the upgrade. However, please review the notable changes in 2.0.0 before upgrading.
For a rolling upgrade:
- Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you
are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously
overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior
to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION.
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2, 0.9.0, 0.10.0, 0.10.1, 0.10.2, 0.11.0, 1.0, 1.1).
- log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION (See potential performance impact
following the upgrade for the details on what this configuration does.)
If you are upgrading from 0.11.0.x, 1.0.x, or 1.1.x and you have not overridden the message format, then you only need to override
the inter-broker protocol format.
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION (0.11.0, 1.0, 1.1).
- Upgrade the brokers one at a time: shut down the broker, update the code, and restart it.
- Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 2.0.
- Restart the brokers one by one for the new protocol version to take effect.
- If you have overridden the message format version as instructed above, then you need to do one more rolling restart to
upgrade it to its latest version. Once all (or most) consumers have been upgraded to 0.11.0 or later,
change log.message.format.version to 2.0 on each broker and restart them one by one. Note that the older Scala consumer
does not support the new message format introduced in 0.11, so to avoid the performance cost of down-conversion (or to
take advantage of exactly once semantics), the newer Java consumer must be used.
Additional Upgrade Notes:
- If you are willing to accept downtime, you can simply take all the brokers down, update the code and start them back up. They will start
with the new protocol by default.
- Bumping the protocol version and restarting can be done any time after the brokers are upgraded. It does not have to be immediately after.
Similarly for the message format version.
- If you are using Java 8 method references in your Kafka Streams code you might need to update your code to resolve method ambiguities. Hot-swapping only the jar file might not work.
- ACLs should not be added to prefixed resources (added in KIP-290) until all brokers in the cluster have been updated.
NOTE: any prefixed ACLs added to a cluster, even after the cluster is fully upgraded, will be ignored should the cluster be downgraded again.
- KIP-186 increases the default offset retention time from 1 day to 7 days. This makes it less likely to "lose" offsets in an application that commits infrequently. It also increases the active set of offsets and therefore can increase memory usage on the broker. Note that the console consumer currently enables offset commit by default and can be the source of a large number of offsets which this change will now preserve for 7 days instead of 1. You can preserve the existing behavior by setting the broker config offsets.retention.minutes to 1440.
- Support for Java 7 has been dropped; Java 8 is now the minimum version required.
- The default value for ssl.endpoint.identification.algorithm was changed to https, which performs hostname verification (man-in-the-middle attacks are possible otherwise). Set ssl.endpoint.identification.algorithm to an empty string to restore the previous behaviour; a sketch follows below.
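For example, to restore the previous behaviour explicitly (a minimal config sketch; note that this disables hostname verification):
ssl.endpoint.identification.algorithm=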
- KAFKA-5674 lowers the minimum of max.connections.per.ip to zero and therefore allows IP-based filtering of inbound connections.
- KIP-272 added an API version tag to the metric kafka.network:type=RequestMetrics,name=RequestsPerSec,request={Produce|FetchConsumer|FetchFollower|...}. This metric now becomes kafka.network:type=RequestMetrics,name=RequestsPerSec,request={Produce|FetchConsumer|FetchFollower|...},version={0|1|2|3|...}. This will impact JMX monitoring tools that do not automatically aggregate. To get the total count for a specific request type, the tool needs to be updated to aggregate across different versions.
- KIP-225 changed the metric "records.lag" to use tags for topic and partition. The original version with the name format "{topic}-{partition}.records-lag" has been removed.
- The Scala consumers, which have been deprecated since 0.11.0.0, have been removed. The Java consumer has been the recommended option
since 0.10.0.0. Note that the Scala consumers in 1.1.0 (and older) will continue to work even if the brokers are upgraded to 2.0.0.
- The Scala producers, which have been deprecated since 0.10.0.0, have been removed. The Java producer has been the recommended option
since 0.9.0.0. Note that the behaviour of the default partitioner in the Java producer differs from the default partitioner
in the Scala producers. Users migrating should consider configuring a custom partitioner that retains the previous behaviour.
Note that the Scala producers in 1.1.0 (and older) will continue to work even if the brokers are upgraded to 2.0.0.
- MirrorMaker and ConsoleConsumer no longer support the Scala consumer, they always use the Java consumer.
- The ConsoleProducer no longer supports the Scala producer, it always uses the Java producer.
- A number of deprecated tools that rely on the Scala clients have been removed: ReplayLogProducer, SimpleConsumerPerformance, SimpleConsumerShell, ExportZkOffsets, ImportZkOffsets, UpdateOffsetsInZK, VerifyConsumerRebalance.
- The deprecated kafka.tools.ProducerPerformance has been removed, please use org.apache.kafka.tools.ProducerPerformance.
- A new Kafka Streams configuration parameter upgrade.from was added that allows a rolling bounce upgrade from an older version.
- KIP-284 changed the retention time for Kafka Streams repartition topics by setting its default value to Long.MAX_VALUE.
- Updated ProcessorStateManager APIs in Kafka Streams for registering state stores to the processor topology. For more details please read the Streams Upgrade Guide.
- In earlier releases, Connect's worker configuration required the internal.key.converter and internal.value.converter properties. In 2.0, these are no longer required and default to the JSON converter. You may safely remove these properties from your Connect standalone and distributed worker configurations:
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter.schemas.enable=false
- KIP-266 adds a new consumer configuration default.api.timeout.ms to specify the default timeout to use for KafkaConsumer APIs that could block. The KIP also adds overloads for such blocking APIs to support specifying a specific timeout to use for each of them instead of using the default timeout set by default.api.timeout.ms. In particular, a new poll(Duration) API has been added which does not block for dynamic partition assignment. The old poll(long) API has been deprecated and will be removed in a future version. Overloads have also been added for other KafkaConsumer methods like partitionsFor, listTopics, offsetsForTimes, beginningOffsets, endOffsets and close that take in a Duration; a minimal sketch follows below.
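A minimal sketch of the Duration-based overloads (assuming an existing KafkaConsumer named consumer):
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecords;

// returns after at most 100 ms; unlike the deprecated poll(long), it does not
// block beyond the timeout waiting for dynamic partition assignment
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

// per-call timeouts that override default.api.timeout.ms
consumer.listTopics(Duration.ofSeconds(5));
consumer.close(Duration.ofSeconds(10));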
- Also as part of KIP-266, the default value of request.timeout.ms has been changed to 30 seconds. The previous value was a little higher than 5 minutes to account for the maximum time that a rebalance would take. Now we treat the JoinGroup request in the rebalance as a special case and use a value derived from max.poll.interval.ms for the request timeout. All other request types use the timeout defined by request.timeout.ms.
- The internal method kafka.admin.AdminClient.deleteRecordsBefore has been removed. Users are encouraged to migrate to org.apache.kafka.clients.admin.AdminClient.deleteRecords.
- The AclCommand tool's --producer convenience option uses the KIP-277 finer-grained ACL on the given topic.
- KIP-176 removes the --new-consumer option for all consumer-based tools. This option is redundant since the new consumer is automatically used if --bootstrap-server is defined.
- KIP-290 adds the ability
to define ACLs on prefixed resources, e.g. any topic starting with 'foo'.
- KIP-283 improves message down-conversion handling on the Kafka broker, which has typically been a memory-intensive operation. The KIP adds a mechanism by which the operation becomes less memory intensive by down-converting chunks of partition data at a time, which helps put an upper bound on memory consumption. With this improvement, there is a change in FetchResponse protocol behavior where the broker could send an oversized message batch towards the end of the response with an invalid offset. Such oversized messages must be ignored by consumer clients, as is done by KafkaConsumer. KIP-283 also adds new topic and broker configurations message.downconversion.enable and log.message.downconversion.enable respectively to control whether down-conversion is enabled. When disabled, the broker does not perform any down-conversion and instead sends an UNSUPPORTED_VERSION error to the client.
- Dynamic broker configuration options can be stored in ZooKeeper using kafka-configs.sh before brokers are started.
This option can be used to avoid storing clear passwords in server.properties as all password configs may be stored encrypted in ZooKeeper.
- ZooKeeper hosts are now re-resolved if a connection attempt fails. But if your ZooKeeper host names resolve to multiple addresses and some of them are not reachable, then you may need to increase the connection timeout zookeeper.connection.timeout.ms.
- KIP-279: OffsetsForLeaderEpochResponse v1 introduces a partition-level leader_epoch field.
- KIP-219: Bump up the protocol versions of non-cluster action requests and responses that are throttled on quota violation.
- KIP-290: Bump up the protocol versions of ACL create, describe and delete requests and responses.
- Upgrading your Streams application from 1.1 to 2.0 does not require a broker upgrade.
A Kafka Streams 2.0 application can connect to 2.0, 1.1, 1.0, 0.11.0, 0.10.2 and 0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though).
- Note that in 2.0 we have removed the public APIs that were deprecated prior to 1.0; users relying on those deprecated APIs need to make code changes accordingly.
See Streams API changes in 2.0.0 for more details.
Kafka 1.1.0 introduces wire protocol changes. By following the recommended rolling upgrade plan below,
you guarantee no downtime during the upgrade. However, please review the notable changes in 1.1.0 before upgrading.
For a rolling upgrade:
- Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you
are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously
overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior
to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION.
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2, 0.9.0, 0.10.0, 0.10.1, 0.10.2, 0.11.0, 1.0).
- log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION (See potential performance impact
following the upgrade for the details on what this configuration does.)
If you are upgrading from 0.11.0.x or 1.0.x and you have not overridden the message format, then you only need to override
the inter-broker protocol format.
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION (0.11.0 or 1.0).
- Upgrade the brokers one at a time: shut down the broker, update the code, and restart it.
- Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 1.1.
- Restart the brokers one by one for the new protocol version to take effect.
- If you have overridden the message format version as instructed above, then you need to do one more rolling restart to
upgrade it to its latest version. Once all (or most) consumers have been upgraded to 0.11.0 or later,
change log.message.format.version to 1.1 on each broker and restart them one by one. Note that the older Scala consumer
does not support the new message format introduced in 0.11, so to avoid the performance cost of down-conversion (or to
take advantage of exactly once semantics), the newer Java consumer must be used.
Additional Upgrade Notes:
- If you are willing to accept downtime, you can simply take all the brokers down, update the code and start them back up. They will start
with the new protocol by default.
- Bumping the protocol version and restarting can be done any time after the brokers are upgraded. It does not have to be immediately after.
Similarly for the message format version.
- If you are using Java 8 method references in your Kafka Streams code you might need to update your code to resolve method ambiguities. Hot-swapping only the jar file might not work.
- A new Kafka Streams configuration parameter upgrade.from was added that allows a rolling bounce upgrade from version 0.10.0.x.
- See the Kafka Streams upgrade guide for details about this new config.
- The kafka artifact in Maven no longer depends on log4j or slf4j-log4j12. Similarly to the kafka-clients artifact, users
can now choose the logging back-end by including the appropriate slf4j module (slf4j-log4j12, logback, etc.). The release
tarball still includes log4j and slf4j-log4j12.
- KIP-225 changed the metric "records.lag" to use tags for topic and partition. The original version with the name format "{topic}-{partition}.records-lag" is deprecated and will be removed in 2.0.0.
- Kafka Streams is more robust against broker communication errors. Instead of stopping the Kafka Streams client with a fatal exception, Kafka Streams tries to self-heal and reconnect to the cluster. Using the new AdminClient you have better control of how often Kafka Streams retries and can configure fine-grained timeouts (instead of hard-coded retries as in older versions).
- Kafka Streams rebalance time was reduced further, making Kafka Streams more responsive.
- Kafka Connect now supports message headers in both sink and source connectors, and allows manipulating them via simple message transforms. Connectors must be changed to explicitly use them. A new HeaderConverter is introduced to control how headers are (de)serialized, and the new "SimpleHeaderConverter", which uses string representations of values, is used by default.
- kafka.tools.DumpLogSegments now automatically sets the deep-iteration option if the print-data-log option is enabled explicitly or implicitly due to any of the other options like decoder.
- KIP-226 introduced DescribeConfigs Request/Response v1.
- KIP-227 introduced Fetch Request/Response v7.
- Upgrading your Streams application from 1.0 to 1.1 does not require a broker upgrade.
A Kafka Streams 1.1 application can connect to 1.0, 0.11.0, 0.10.2 and 0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though).
- See Streams API changes in 1.1.0 for more details.
Kafka 1.0.0 introduces wire protocol changes. By following the recommended rolling upgrade plan below,
you guarantee no downtime during the upgrade. However, please review the notable changes in 1.0.0 before upgrading.
For a rolling upgrade:
- Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you
are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously
overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior
to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION.
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2, 0.9.0, 0.10.0, 0.10.1, 0.10.2, 0.11.0).
- log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION (See potential performance impact
following the upgrade for the details on what this configuration does.)
If you are upgrading from 0.11.0.x and you have not overridden the message format, you must set
both the message format version and the inter-broker protocol version to 0.11.0.
- inter.broker.protocol.version=0.11.0
- log.message.format.version=0.11.0
- Upgrade the brokers one at a time: shut down the broker, update the code, and restart it.
- Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 1.0.
- Restart the brokers one by one for the new protocol version to take effect.
- If you have overridden the message format version as instructed above, then you need to do one more rolling restart to
upgrade it to its latest version. Once all (or most) consumers have been upgraded to 0.11.0 or later,
change log.message.format.version to 1.0 on each broker and restart them one by one. If you are upgrading from
0.11.0 and log.message.format.version is set to 0.11.0, you can update the config and skip the rolling restart.
Note that the older Scala consumer does not support the new message format introduced in 0.11, so to avoid the
performance cost of down-conversion (or to take advantage of exactly once semantics),
the newer Java consumer must be used.
Additional Upgrade Notes:
- If you are willing to accept downtime, you can simply take all the brokers down, update the code and start them back up. They will start
with the new protocol by default.
- Bumping the protocol version and restarting can be done any time after the brokers are upgraded. It does not have to be immediately after.
Similarly for the message format version.
- A new Kafka Streams configuration parameter upgrade.from was added that allows a rolling bounce upgrade from version 0.10.0.x.
- See the Kafka Streams upgrade guide for details about this new config.
- Restored binary compatibility of AdminClient's Options classes (e.g. CreateTopicsOptions, DeleteTopicsOptions, etc.) with
0.11.0.x. Binary (but not source) compatibility had been broken inadvertently in 1.0.0.
- Topic deletion is now enabled by default, since the functionality is now stable. Users who wish to retain the previous behavior should set the broker config delete.topic.enable to false. Keep in mind that topic deletion removes data and the operation is not reversible (i.e. there is no "undelete" operation).
- For topics that support timestamp search, if no offset can be found for a partition, that partition is now included in the search result with a null offset value. Previously, the partition was not included in the map.
This change was made to make the search behavior consistent with the case of topics not supporting timestamp search.
- If the inter.broker.protocol.version is 1.0 or later, a broker will now stay online to serve replicas on live log directories even if there are offline log directories. A log directory may become offline due to an IOException caused by a hardware failure. Users need to monitor the per-broker metric offlineLogDirectoryCount to check whether there are offline log directories.
- Added KafkaStorageException, which is a retriable exception. KafkaStorageException will be converted to NotLeaderForPartitionException in the response if the version of the client's FetchRequest or ProducerRequest does not support KafkaStorageException.
- -XX:+DisableExplicitGC was replaced by -XX:+ExplicitGCInvokesConcurrent in the default JVM settings. This helps
avoid out of memory exceptions during allocation of native memory by direct buffers in some cases.
- The overridden handleError method implementations have been removed from the following deprecated classes in the kafka.api package: FetchRequest, GroupCoordinatorRequest, OffsetCommitRequest, OffsetFetchRequest, OffsetRequest, ProducerRequest, and TopicMetadataRequest. This was only intended for use on the broker, but it is no longer in use and the implementations have not been maintained. A stub implementation has been retained for binary compatibility.
- The Java clients and tools now accept any string as a client-id.
- The deprecated tool kafka-consumer-offset-checker.sh has been removed. Use kafka-consumer-groups.sh to get consumer group details.
- SimpleAclAuthorizer now logs access denials to the authorizer log by default.
- Authentication failures are now reported to clients as one of the subclasses of AuthenticationException. No retries will be performed if a client connection fails authentication.
- Custom SaslServer implementations may throw SaslAuthenticationException to provide an error message to return to clients indicating the reason for authentication failure. Implementors should take care not to include any security-critical information in the exception message that should not be leaked to unauthenticated clients.
- The app-info mbean registered with JMX to provide version and commit id will be deprecated and replaced with metrics providing these attributes.
- Kafka metrics may now contain non-numeric values. org.apache.kafka.common.Metric#value() has been deprecated and will return 0.0 in such cases to minimise the probability of breaking users who read the value of every client metric (via a MetricsReporter implementation or by calling the metrics() method). org.apache.kafka.common.Metric#metricValue() can be used to retrieve numeric and non-numeric metric values.
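For illustration, a minimal sketch of reading client metrics via metricValue() (the helper name dumpMetrics is hypothetical):

    import java.util.Map;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;

    class MetricsDump {
        // Print every metric of a consumer using metricValue(), which
        // returns Object (possibly non-numeric), instead of the deprecated
        // value(), which returns 0.0 for non-numeric metrics.
        static void dumpMetrics(KafkaConsumer<?, ?> consumer) {
            for (Map.Entry<MetricName, ? extends Metric> e : consumer.metrics().entrySet()) {
                Object v = e.getValue().metricValue();
                System.out.println(e.getKey().group() + ":" + e.getKey().name() + " = " + v);
            }
        }
    }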
- Every Kafka rate metric now has a corresponding cumulative count metric with the suffix -total to simplify downstream processing. For example, records-consumed-rate has a corresponding metric named records-consumed-total.
- Mx4j will only be enabled if the system property kafka_mx4jenable is set to true. Due to a logic inversion bug, it was previously enabled by default and disabled if kafka_mx4jenable was set to true.
- The package org.apache.kafka.common.security.auth in the clients jar has been made public and added to the javadocs.
Internal classes which had previously been located in this package have been moved elsewhere.
- When using an Authorizer and a user doesn't have required permissions on a topic, the broker will return TOPIC_AUTHORIZATION_FAILED errors to requests irrespective of topic existence on the broker. If the user has the required permissions and the topic doesn't exist, then the UNKNOWN_TOPIC_OR_PARTITION error code will be returned.
- The config/consumer.properties file was updated to use new consumer config properties.
- KIP-112: LeaderAndIsrRequest v1 introduces a partition-level is_new field.
- KIP-112: UpdateMetadataRequest v4 introduces a partition-level offline_replicas field.
- KIP-112: MetadataResponse v5 introduces a partition-level offline_replicas field.
- KIP-112: ProduceResponse v4 introduces error code for KafkaStorageException.
- KIP-112: FetchResponse v6 introduces error code for KafkaStorageException.
- KIP-152: SaslAuthenticate request has been added to enable reporting of authentication failures. This request will be used if the SaslHandshake request version is greater than 0.
- Upgrading your Streams application from 0.11.0 to 1.0 does not require a broker upgrade.
A Kafka Streams 1.0 application can connect to 0.11.0, 0.10.2 and 0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though).
However, Kafka Streams 1.0 requires 0.10 message format or newer and does not work with older message formats.
- If you are monitoring streams metrics, you will need to make some changes to the metric names in your reporting and monitoring code, because the metrics sensor hierarchy was changed.
- A few public APIs, including ProcessorContext#schedule(), Processor#punctuate(), KStreamBuilder, and TopologyBuilder, have been deprecated in favor of new APIs. We recommend making the corresponding code changes when you upgrade, which should be very minor since the new APIs look quite similar; a sketch of the new schedule() API follows the next item.
- See Streams API changes in 1.0.0 for more details.
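As a rough sketch of the new APIs (the class MyProcessor is hypothetical), a punctuation that was previously implemented by overriding Processor#punctuate() can instead be registered through ProcessorContext#schedule() with a Punctuator callback:

    import org.apache.kafka.streams.processor.AbstractProcessor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.processor.PunctuationType;

    public class MyProcessor extends AbstractProcessor<String, String> {
        @Override
        public void init(ProcessorContext context) {
            super.init(context);
            // Wall-clock punctuation every 10 seconds, replacing the
            // deprecated punctuate() override.
            context.schedule(10_000L, PunctuationType.WALL_CLOCK_TIME,
                timestamp -> context.commit());
        }

        @Override
        public void process(String key, String value) {
            context().forward(key, value);
        }
    }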
- Upgrading your Streams application from 0.10.2 to 1.0 does not require a broker upgrade.
A Kafka Streams 1.0 application can connect to 1.0, 0.11.0, 0.10.2 and 0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though).
- If you are monitoring streams metrics, you will need to make some changes to the metric names in your reporting and monitoring code, because the metrics sensor hierarchy was changed.
- A few public APIs, including ProcessorContext#schedule(), Processor#punctuate(), KStreamBuilder, and TopologyBuilder, have been deprecated in favor of new APIs. We recommend making the corresponding code changes when you upgrade, which should be very minor since the new APIs look quite similar.
- If you specify customized key.serde, value.serde and timestamp.extractor in configs, it is recommended to use their replacement configuration parameters, as these configs are deprecated.
- See Streams API changes in 0.11.0 for more details.
- Upgrading your Streams application from 0.10.1 to 1.0 does not require a broker upgrade.
A Kafka Streams 1.0 application can connect to 1.0, 0.11.0, 0.10.2 and 0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though).
- You need to recompile your code. Just swapping the Kafka Streams library jar file will not work and will break your application.
- If you are monitoring streams metrics, you will need to make some changes to the metric names in your reporting and monitoring code, because the metrics sensor hierarchy was changed.
- A few public APIs, including ProcessorContext#schedule(), Processor#punctuate(), KStreamBuilder, and TopologyBuilder, have been deprecated in favor of new APIs. We recommend making the corresponding code changes when you upgrade, which should be very minor since the new APIs look quite similar.
- If you specify customized key.serde, value.serde and timestamp.extractor in configs, it is recommended to use their replacement configuration parameters, as these configs are deprecated.
- If you use a custom (i.e., user implemented) timestamp extractor, you will need to update this code, because the TimestampExtractor interface was changed.
- If you register custom metrics, you will need to update this code, because the StreamsMetric interface was changed.
- See Streams API changes in 1.0.0,
Streams API changes in 0.11.0 and
Streams API changes in 0.10.2 for more details.
- Upgrading your Streams application from 0.10.0 to 1.0 does require a broker upgrade because a Kafka Streams 1.0 application can only connect to 1.0, 0.11.0, 0.10.2, or 0.10.1 brokers.
- There are a couple of API changes that are not backward compatible (cf. Streams API changes in 1.0.0, Streams API changes in 0.11.0, Streams API changes in 0.10.2, and Streams API changes in 0.10.1 for more details). Thus, you need to update and recompile your code. Just swapping the Kafka Streams library jar file will not work and will break your application.
- Upgrading from 0.10.0.x to 1.0.2 requires two rolling bounces with config upgrade.from="0.10.0" set for the first upgrade phase (cf. KIP-268). As an alternative, an offline upgrade is also possible; a config sketch follows these steps.
- prepare your application instances for a rolling bounce and make sure that config upgrade.from is set to "0.10.0" for the new version 1.0.2
- bounce each instance of your application once
- prepare your newly deployed 1.0.2 application instances for a second round of rolling bounces; make sure to remove the value for config upgrade.from
- bounce each instance of your application once more to complete the upgrade
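A minimal sketch of the two bounce phases in the application's setup code (the application id and bootstrap address are hypothetical):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");  // hypothetical
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");  // hypothetical
    // First rolling bounce: deploy the new jars with the compatibility flag set.
    props.put("upgrade.from", "0.10.0");
    // Second rolling bounce: remove the "upgrade.from" entry above and
    // bounce each instance once more.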
- Upgrading from 0.10.0.x to 1.0.0 or 1.0.1 requires an offline upgrade (rolling bounce upgrade is not supported)
- stop all old (0.10.0.x) application instances
- update your code and swap old code and jar file with new code and new jar file
- restart all new (1.0.0 or 1.0.1) application instances
Kafka 0.11.0.0 introduces a new message format version as well as wire protocol changes. By following the recommended rolling upgrade plan below,
you guarantee no downtime during the upgrade. However, please review the notable changes in 0.11.0.0 before upgrading.
Starting with version 0.10.2, Java clients (producer and consumer) have acquired the ability to communicate with older brokers. Version 0.11.0
clients can talk to version 0.10.0 or newer brokers. However, if your brokers are older than 0.10.0, you must upgrade all the brokers in the
Kafka cluster before upgrading your clients. Version 0.11.0 brokers support 0.8.x and newer clients.
For a rolling upgrade:
- Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have not overridden the message format previously, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION.
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2, 0.9.0, 0.10.0, 0.10.1 or 0.10.2).
- log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION (See potential performance impact
following the upgrade for the details on what this configuration does.)
- Upgrade the brokers one at a time: shut down the broker, update the code, and restart it.
- Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.11.0, but do not change log.message.format.version yet.
- Restart the brokers one by one for the new protocol version to take effect.
- Once all (or most) consumers have been upgraded to 0.11.0 or later, then change log.message.format.version to 0.11.0 on each
broker and restart them one by one. Note that the older Scala consumer does not support the new message format, so to avoid
the performance cost of down-conversion (or to take advantage of exactly once semantics),
the new Java consumer must be used.
Additional Upgrade Notes:
- If you are willing to accept downtime, you can simply take all the brokers down, update the code and start them back up. They will start
with the new protocol by default.
- Bumping the protocol version and restarting can be done any time after the brokers are upgraded. It does not have to be immediately after.
Similarly for the message format version.
- It is also possible to enable the 0.11.0 message format on individual topics using the topic admin tool (bin/kafka-topics.sh) prior to updating the global setting log.message.format.version.
- If you are upgrading from a version prior to 0.10.0, it is NOT necessary to first update the message format to 0.10.0
before you switch to 0.11.0.
- Upgrading your Streams application from 0.10.2 to 0.11.0 does not require a broker upgrade.
A Kafka Streams 0.11.0 application can connect to 0.11.0, 0.10.2 and 0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though).
- If you specify customized key.serde, value.serde and timestamp.extractor in configs, it is recommended to use their replacement configuration parameters, as these configs are deprecated.
- See Streams API changes in 0.11.0 for more details.
- Upgrading your Streams application from 0.10.1 to 0.11.0 does not require a broker upgrade.
A Kafka Streams 0.11.0 application can connect to 0.11.0, 0.10.2 and 0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though).
- You need to recompile your code. Just swapping the Kafka Streams library jar file will not work and will break your application.
- If you specify customized key.serde, value.serde and timestamp.extractor in configs, it is recommended to use their replacement configuration parameters, as these configs are deprecated.
- If you use a custom (i.e., user implemented) timestamp extractor, you will need to update this code, because the TimestampExtractor interface was changed.
- If you register custom metrics, you will need to update this code, because the StreamsMetric interface was changed.
- See Streams API changes in 0.11.0 and
Streams API changes in 0.10.2 for more details.
- Upgrading your Streams application from 0.10.0 to 0.11.0 does require a broker upgrade because a Kafka Streams 0.11.0 application can only connect to 0.11.0, 0.10.2, or 0.10.1 brokers.
- There are a couple of API changes that are not backward compatible (cf. Streams API changes in 0.11.0, Streams API changes in 0.10.2, and Streams API changes in 0.10.1 for more details). Thus, you need to update and recompile your code. Just swapping the Kafka Streams library jar file will not work and will break your application.
- Upgrading from 0.10.0.x to 0.11.0.3 requires two rolling bounces with config upgrade.from="0.10.0" set for the first upgrade phase (cf. KIP-268). As an alternative, an offline upgrade is also possible.
- prepare your application instances for a rolling bounce and make sure that config upgrade.from is set to "0.10.0" for the new version 0.11.0.3
- bounce each instance of your application once
- prepare your newly deployed 0.11.0.3 application instances for a second round of rolling bounces; make sure to remove the value for config upgrade.from
- bounce each instance of your application once more to complete the upgrade
- Upgrading from 0.10.0.x to 0.11.0.0, 0.11.0.1, or 0.11.0.2 requires an offline upgrade (rolling bounce upgrade is not supported)
- stop all old (0.10.0.x) application instances
- update your code and swap old code and jar file with new code and new jar file
- restart all new (0.11.0.0, 0.11.0.1, or 0.11.0.2) application instances
- A new Kafka Streams configuration parameter upgrade.from was added to allow a rolling bounce upgrade from version 0.10.0.x.
- See the Kafka Streams upgrade guide for details about this new config.
- Unclean leader election is now disabled by default. The new default favors durability over availability. Users who wish to retain the previous behavior should set the broker config unclean.leader.election.enable to true.
- Producer configs block.on.buffer.full, metadata.fetch.timeout.ms and timeout.ms have been removed. They were initially deprecated in Kafka 0.9.0.0.
- The offsets.topic.replication.factor broker config is now enforced upon auto topic creation. Internal auto topic creation will fail with a GROUP_COORDINATOR_NOT_AVAILABLE error until the cluster size meets this replication factor requirement.
- When compressing data with snappy, the producer and broker will use the compression scheme's default block size (2 x 32 KB)
instead of 1 KB in order to improve the compression ratio. There have been reports of data compressed with the smaller
block size being 50% larger than when compressed with the larger block size. For the snappy case, a producer with 5000
partitions will require an additional 315 MB of JVM heap.
- Similarly, when compressing data with gzip, the producer and broker will use 8 KB instead of 1 KB as the buffer size. The default
for gzip is excessively low (512 bytes).
- The broker configuration max.message.bytes now applies to the total size of a batch of messages. Previously the setting applied to batches of compressed messages, or to non-compressed messages individually. A message batch may consist of only a single message, so in most cases, the limitation on the size of individual messages is only reduced by the overhead of the batch format. However, there are some subtle implications for message format conversion (see below for more detail). Note also that while previously the broker would ensure that at least one message is returned in each fetch request (regardless of the total and partition-level fetch sizes), the same behavior now applies to one message batch.
- GC log rotation is enabled by default, see KAFKA-3754 for details.
- Deprecated constructors of RecordMetadata, MetricName and Cluster classes have been removed.
- Added user headers support through a new Headers interface providing user headers read and write access.
- ProducerRecord and ConsumerRecord expose the new Headers API via the Headers headers() method call.
- ExtendedSerializer and ExtendedDeserializer interfaces are introduced to support serialization and deserialization for headers. Headers will be ignored if the configured serializer and deserializer are not the above classes.
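A brief sketch of the Headers API (the topic, key and header names are hypothetical):

    import java.nio.charset.StandardCharsets;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.header.Header;

    class HeaderExamples {
        // Attach a header before sending.
        static void sendWithHeader(KafkaProducer<String, String> producer) {
            ProducerRecord<String, String> record =
                new ProducerRecord<>("my-topic", "key", "value");
            record.headers().add("trace-id", "abc123".getBytes(StandardCharsets.UTF_8));
            producer.send(record);
        }

        // Read headers from a record returned by poll().
        static void printHeaders(ConsumerRecord<?, ?> record) {
            for (Header h : record.headers())
                System.out.println(h.key() + " = "
                    + new String(h.value(), StandardCharsets.UTF_8));
        }
    }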
- A new config, group.initial.rebalance.delay.ms, was introduced. This config specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance. The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms. The default value for this is 3 seconds. During development and testing it might be desirable to set this to 0 in order to not delay test execution time.
- org.apache.kafka.common.Cluster#partitionsForTopic, partitionsForNode and availablePartitionsForTopic methods will return an empty list instead of null (which is considered a bad practice) in case the metadata for the required topic does not exist.
- Streams API configuration parameters timestamp.extractor, key.serde, and value.serde were deprecated and replaced by default.timestamp.extractor, default.key.serde, and default.value.serde, respectively.
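A migration sketch for the application's setup code (the serde and extractor choices are illustrative):

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;

    Properties props = new Properties();
    // Deprecated names were key.serde, value.serde and timestamp.extractor.
    props.put("default.key.serde", Serdes.String().getClass().getName());
    props.put("default.value.serde", Serdes.String().getClass().getName());
    props.put("default.timestamp.extractor",
        "org.apache.kafka.streams.processor.WallclockTimestampExtractor");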
- For offset commit failures in the Java consumer's commitAsync APIs, we no longer expose the underlying cause when instances of RetriableCommitFailedException are passed to the commit callback. See KAFKA-5052 for more detail.
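A sketch of a commit callback under the new behavior (the logging policy is illustrative):

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.RetriableCommitFailedException;

    class CommitHelper {
        static void commitWithCallback(KafkaConsumer<?, ?> consumer) {
            consumer.commitAsync((offsets, exception) -> {
                if (exception instanceof RetriableCommitFailedException) {
                    // Transient failure; the underlying cause is no longer
                    // attached. A later commit will supersede this one.
                    System.err.println("Async commit failed transiently: " + exception);
                } else if (exception != null) {
                    System.err.println("Async commit failed fatally: " + exception);
                }
            });
        }
    }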
- KIP-107: FetchRequest v5 introduces a partition-level log_start_offset field.
- KIP-107: FetchResponse v5 introduces a partition-level log_start_offset field.
- KIP-82: ProduceRequest v3 introduces an array of header in the message protocol, containing key field and value field.
- KIP-82: FetchResponse v5 introduces an array of header in the message protocol, containing key field and value field.
Kafka 0.11.0 includes support for idempotent and transactional capabilities in the producer. Idempotent delivery
ensures that messages are delivered exactly once to a particular topic partition during the lifetime of a single producer.
Transactional delivery allows producers to send data to multiple partitions such that either all messages are successfully
delivered, or none of them are. Together, these capabilities enable "exactly once semantics" in Kafka. More details on these
features are available in the user guide, but below we add a few specific notes on enabling them in an upgraded cluster.
Note that enabling EoS is not required and there is no impact on the broker's behavior if unused.
- Only the new Java producer and consumer support exactly once semantics.
- These features depend crucially on the 0.11.0 message format. Attempting to use them
on an older format will result in unsupported version errors.
- Transaction state is stored in a new internal topic __transaction_state. This topic is not created until the first attempt to use a transactional request API. Similar to the consumer offsets topic, there are several settings to control the topic's configuration. For example, transaction.state.log.min.isr controls the minimum ISR for this topic. See the configuration section in the user guide for a full list of options.
- For secure clusters, the transactional APIs require new ACLs which can be turned on with the bin/kafka-acls.sh tool.
- EoS in Kafka introduces new request APIs and modifies several existing ones. See KIP-98 for the full details.
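A minimal sketch of enabling these capabilities in the Java producer (the broker address, transactional.id and topic names are hypothetical):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class TransactionalSend {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker:9092");   // hypothetical
            props.put("transactional.id", "my-txn-id");      // also enables idempotence
            props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            producer.initTransactions();
            try {
                producer.beginTransaction();
                producer.send(new ProducerRecord<>("topic-a", "k", "v1"));
                producer.send(new ProducerRecord<>("topic-b", "k", "v2"));
                producer.commitTransaction(); // both messages or neither become visible
            } catch (Exception e) {
                // Simplified handling: fatal errors such as ProducerFencedException
                // require closing the producer rather than aborting.
                producer.abortTransaction();
            } finally {
                producer.close();
            }
        }
    }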
The 0.11.0 message format includes several major enhancements in order to support better delivery semantics for the producer
(see KIP-98)
and improved replication fault tolerance
(see KIP-101).
Although the new format contains more information to make these improvements possible, we have made the batch format much
more efficient. As long as the number of messages per batch is more than 2, you can expect lower overall overhead. For smaller
batches, however, there may be a small performance impact. See here for the results of our
initial performance analysis of the new message format. You can also find more detail on the message format in the
KIP-98 proposal.
One of the notable differences in the new message format is that even uncompressed messages are stored together as a single batch. This has a few implications for the broker configuration max.message.bytes, which limits the size of a single batch. First, if an older client produces messages to a topic partition using the old format, and the messages are individually smaller than max.message.bytes, the broker may still reject them after they are merged into a single batch during the up-conversion process. Generally this can happen when the aggregate size of the individual messages is larger than max.message.bytes. There is a similar effect for older consumers reading messages down-converted from the new format: if the fetch size is not set at least as large as max.message.bytes, the consumer may not be able to make progress even if the individual uncompressed messages are smaller than the configured fetch size. This behavior does not impact the Java client for 0.10.1.0 and later since it uses an updated fetch protocol which ensures that at least one message can be returned even if it exceeds the fetch size. To get around these problems, you should ensure 1) that the producer's batch size is not set larger than max.message.bytes, and 2) that the consumer's fetch size is set at least as large as max.message.bytes.
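As a sketch of those two recommendations in client setup code (the values are illustrative and assume the default max.message.bytes of 1000012 bytes):

    import java.util.Properties;

    // (1) Keep the producer's batch size at or below max.message.bytes.
    Properties producerProps = new Properties();
    producerProps.put("batch.size", "1000000");

    // (2) Keep the older consumer's fetch size at or above max.message.bytes.
    Properties consumerProps = new Properties();
    consumerProps.put("max.partition.fetch.bytes", "1048576");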
Most of the discussion on the performance impact of upgrading to the 0.10.0 message format
remains pertinent to the 0.11.0 upgrade. This mainly affects clusters that are not secured with TLS since "zero-copy" transfer
is already not possible in that case. In order to avoid the cost of down-conversion, you should ensure that consumer applications
are upgraded to the latest 0.11.0 client. Significantly, since the old consumer has been deprecated in 0.11.0.0 and does not support the new message format, you must switch to the new consumer to use the new message format without the cost of down-conversion.
Note that 0.11.0 consumers support backwards compatibility with 0.10.0 brokers and upward, so it is possible to upgrade the
clients first before the brokers.
0.10.2.0 has wire protocol changes. By following the recommended rolling upgrade plan below, you guarantee no downtime during the upgrade.
However, please review the notable changes in 0.10.2.0 before upgrading.
Starting with version 0.10.2, Java clients (producer and consumer) have acquired the ability to communicate with older brokers. Version 0.10.2
clients can talk to version 0.10.0 or newer brokers. However, if your brokers are older than 0.10.0, you must upgrade all the brokers in the
Kafka cluster before upgrading your clients. Version 0.10.2 brokers support 0.8.x and newer clients.
For a rolling upgrade:
- Update server.properties file on all brokers and add the following properties:
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2, 0.9.0, 0.10.0 or 0.10.1).
- log.message.format.version=CURRENT_KAFKA_VERSION (See potential performance impact following the upgrade for the details on what this configuration does.)
- Upgrade the brokers one at a time: shut down the broker, update the code, and restart it.
- Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.10.2.
- If your previous message format is 0.10.0, change log.message.format.version to 0.10.2 (this is a no-op as the message format is the same for 0.10.0, 0.10.1 and 0.10.2).
If your previous message format version is lower than 0.10.0, do not change log.message.format.version yet - this parameter should only change once all consumers have been upgraded to 0.10.0.0 or later.
- Restart the brokers one by one for the new protocol version to take effect.
- If log.message.format.version is still lower than 0.10.0 at this point, wait until all consumers have been upgraded to 0.10.0 or later,
then change log.message.format.version to 0.10.2 on each broker and restart them one by one.
Note: If you are willing to accept downtime, you can simply take all the brokers down, update the code and start all of them. They will start with the new protocol by default.
Note: Bumping the protocol version and restarting can be done any time after the brokers were upgraded. It does not have to be immediately after.
- Upgrading your Streams application from 0.10.1 to 0.10.2 does not require a broker upgrade.
A Kafka Streams 0.10.2 application can connect to 0.10.2 and 0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though).
- You need to recompile your code. Just swapping the Kafka Streams library jar file will not work and will break your application.
- If you use a custom (i.e., user implemented) timestamp extractor, you will need to update this code, because the TimestampExtractor interface was changed.
- If you register custom metrics, you will need to update this code, because the StreamsMetric interface was changed.
- See Streams API changes in 0.10.2 for more details.
- Upgrading your Streams application from 0.10.0 to 0.10.2 does require a broker upgrade because a Kafka Streams 0.10.2 application can only connect to 0.10.2 or 0.10.1 brokers.
- There are a couple of API changes that are not backward compatible (cf. Streams API changes in 0.10.2 for more details). Thus, you need to update and recompile your code. Just swapping the Kafka Streams library jar file will not work and will break your application.
- Upgrading from 0.10.0.x to 0.10.2.2 requires two rolling bounces with config upgrade.from="0.10.0" set for the first upgrade phase (cf. KIP-268). As an alternative, an offline upgrade is also possible.
- prepare your application instances for a rolling bounce and make sure that config upgrade.from is set to "0.10.0" for the new version 0.10.2.2
- bounce each instance of your application once
- prepare your newly deployed 0.10.2.2 application instances for a second round of rolling bounces; make sure to remove the value for config upgrade.from
- bounce each instance of your application once more to complete the upgrade
- Upgrading from 0.10.0.x to 0.10.2.0 or 0.10.2.1 requires an offline upgrade (rolling bounce upgrade is not supported)
- stop all old (0.10.0.x) application instances
- update your code and swap old code and jar file with new code and new jar file
- restart all new (0.10.2.0 or 0.10.2.1) application instances
- A new configuration parameter upgrade.from was added to allow a rolling bounce upgrade from version 0.10.0.x.
- The default values for two configurations of the StreamsConfig class were changed to improve the resiliency of Kafka Streams applications. The internal Kafka Streams producer retries default value was changed from 0 to 10. The internal Kafka Streams consumer max.poll.interval.ms default value was changed from 300000 to Integer.MAX_VALUE.
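If different values are needed, a sketch of overriding the internal clients through the prefixes that StreamsConfig recognizes (the values are illustrative):

    import java.util.Properties;

    Properties props = new Properties();
    // The "producer." and "consumer." prefixes route these settings to the
    // internal Kafka Streams clients.
    props.put("producer.retries", "20");
    props.put("consumer.max.poll.interval.ms", "600000");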
- The Java clients (producer and consumer) have acquired the ability to communicate with older brokers. Version 0.10.2 clients
can talk to version 0.10.0 or newer brokers. Note that some features are not available or are limited when older brokers
are used.
- Several methods on the Java consumer may now throw InterruptException if the calling thread is interrupted. Please refer to the KafkaConsumer Javadoc for a more in-depth explanation of this change.
- The Java consumer now shuts down gracefully. By default, the consumer waits up to 30 seconds to complete pending requests. A new close API with timeout has been added to KafkaConsumer to control the maximum wait time.
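For example, a sketch bounding the shutdown wait to 5 seconds, where consumer is a KafkaConsumer instance (the timeout value is illustrative):

    import java.util.concurrent.TimeUnit;

    // Waits at most 5 seconds for pending requests instead of the
    // 30-second default used by close().
    consumer.close(5, TimeUnit.SECONDS);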
- Multiple regular expressions separated by commas can be passed to MirrorMaker with the new Java consumer via the --whitelist option. This makes the behaviour consistent with MirrorMaker when used with the old Scala consumer.
- Upgrading your Streams application from 0.10.1 to 0.10.2 does not require a broker upgrade.
A Kafka Streams 0.10.2 application can connect to 0.10.2 and 0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though).
- The Zookeeper dependency was removed from the Streams API. The Streams API now uses the Kafka protocol to manage internal topics instead of
modifying Zookeeper directly. This eliminates the need for privileges to access Zookeeper directly and "StreamsConfig.ZOOKEEPER_CONFIG"
should not be set in the Streams app any more. If the Kafka cluster is secured, Streams apps must have the required security privileges to create new topics.
- Several new fields including "security.protocol", "connections.max.idle.ms", "retry.backoff.ms", "reconnect.backoff.ms" and "request.timeout.ms" were added to the StreamsConfig class. Users should pay attention to the default values and set these if needed. For more details please refer to 3.5 Kafka Streams Configs.
- KIP-88: OffsetFetchRequest v2 supports retrieval of offsets for all topics if the topics array is set to null.
- KIP-88: OffsetFetchResponse v2 introduces a top-level error_code field.
- KIP-103: UpdateMetadataRequest v3 introduces a listener_name field to the elements of the end_points array.
- KIP-108: CreateTopicsRequest v1 introduces a validate_only field.
- KIP-108: CreateTopicsResponse v1 introduces an error_message field to the elements of the topic_errors array.
0.10.1.0 has wire protocol changes. By following the recommended rolling upgrade plan below, you guarantee no downtime during the upgrade.
However, please notice the Potential breaking changes in 0.10.1.0 before upgrade.
Note: Because new protocols are introduced, it is important to upgrade your Kafka clusters before upgrading your clients (i.e. 0.10.1.x clients
only support 0.10.1.x or later brokers while 0.10.1.x brokers also support older clients).
For a rolling upgrade:
- Update server.properties file on all brokers and add the following properties:
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2.0, 0.9.0.0 or 0.10.0.0).
- log.message.format.version=CURRENT_KAFKA_VERSION (See potential performance impact following the upgrade for the details on what this configuration does.)
- Upgrade the brokers one at a time: shut down the broker, update the code, and restart it.
- Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.10.1.0.
- If your previous message format is 0.10.0, change log.message.format.version to 0.10.1 (this is a no-op as the message format is the same for both 0.10.0 and 0.10.1).
If your previous message format version is lower than 0.10.0, do not change log.message.format.version yet - this parameter should only change once all consumers have been upgraded to 0.10.0.0 or later.
- Restart the brokers one by one for the new protocol version to take effect.
- If log.message.format.version is still lower than 0.10.0 at this point, wait until all consumers have been upgraded to 0.10.0 or later,
then change log.message.format.version to 0.10.1 on each broker and restart them one by one.
Note: If you are willing to accept downtime, you can simply take all the brokers down, update the code and start all of them. They will start with the new protocol by default.
Note: Bumping the protocol version and restarting can be done any time after the brokers were upgraded. It does not have to be immediately after.