flink-python/pyflink/datastream/connectors/kafka.py (276 lines of code) (raw):
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from abc import ABC, abstractmethod
from enum import Enum
from typing import Dict, Union, Set, Callable, Any, Optional
from py4j.java_gateway import JavaObject, get_java_class
from pyflink.common import DeserializationSchema, SerializationSchema, \
Types, Row
from pyflink.datastream.connectors import Source, Sink
from pyflink.datastream.connectors.base import DeliveryGuarantee, SupportsPreprocessing, \
StreamTransformer
from pyflink.java_gateway import get_gateway
from pyflink.util.java_utils import to_jarray, get_field, get_field_value
# Names exported by ``from pyflink.datastream.connectors.kafka import *``; every entry is a class
# defined further down in this module.
__all__ = [
'KafkaSource',
'KafkaSourceBuilder',
'KafkaSink',
'KafkaSinkBuilder',
'KafkaTopicPartition',
'KafkaOffsetsInitializer',
'KafkaOffsetResetStrategy',
'KafkaRecordSerializationSchema',
'KafkaRecordSerializationSchemaBuilder',
'KafkaTopicSelector'
]
# ---- KafkaSource ----
class KafkaSource(Source):
    """
    Kafka-backed :class:`Source`. Instances are meant to be assembled through a
    :class:`KafkaSourceBuilder` obtained from :meth:`builder`. The snippet below creates a
    KafkaSource that emits records of String type.
    ::

        >>> source = KafkaSource \\
        ...     .builder() \\
        ...     .set_bootstrap_servers('MY_BOOTSTRAP_SERVERS') \\
        ...     .set_group_id('MY_GROUP') \\
        ...     .set_topics('TOPIC1', 'TOPIC2') \\
        ...     .set_value_only_deserializer(SimpleStringSchema()) \\
        ...     .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \\
        ...     .build()

    .. versionadded:: 1.16.0
    """

    def __init__(self, j_kafka_source: JavaObject):
        """
        Wrap an already built Java Kafka source.

        :param j_kafka_source: the Java KafkaSource object handed to the :class:`Source` base.
        """
        super(KafkaSource, self).__init__(j_kafka_source)

    @staticmethod
    def builder() -> 'KafkaSourceBuilder':
        """
        Create a fresh :class:`KafkaSourceBuilder` for configuring a :class:`KafkaSource`.

        :return: a new Kafka source builder.
        """
        new_builder = KafkaSourceBuilder()
        return new_builder
class KafkaSourceBuilder(object):
    """
    Fluent builder that assembles a :class:`KafkaSource`.

    The smallest useful configuration, reading String values from Kafka topics, is:
    ::

        >>> source = KafkaSource.builder() \\
        ...     .set_bootstrap_servers('MY_BOOTSTRAP_SERVERS') \\
        ...     .set_topics('TOPIC1', 'TOPIC2') \\
        ...     .set_value_only_deserializer(SimpleStringSchema()) \\
        ...     .build()

    Bootstrap servers, the topics/partitions to consume and the record deserializer are
    mandatory settings.

    Use :meth:`set_starting_offsets` to choose where consumption begins.

    Unless configured otherwise the KafkaSource runs in CONTINUOUS_UNBOUNDED mode and only stops
    when the Flink job is canceled or fails. Call :meth:`set_unbounded` to keep the
    CONTINUOUS_UNBOUNDED mode but still stop at given offsets. The source below, for instance,
    stops once it has consumed up to the latest partition offsets observed when the Flink job
    started.
    ::

        >>> source = KafkaSource.builder() \\
        ...     .set_bootstrap_servers('MY_BOOTSTRAP_SERVERS') \\
        ...     .set_topics('TOPIC1', 'TOPIC2') \\
        ...     .set_value_only_deserializer(SimpleStringSchema()) \\
        ...     .set_unbounded(KafkaOffsetsInitializer.latest()) \\
        ...     .build()

    .. versionadded:: 1.16.0
    """

    def __init__(self):
        jvm = get_gateway().jvm
        # Every setter below forwards to this Java builder.
        self._j_builder = jvm.org.apache.flink.connector.kafka.source.KafkaSource.builder()

    def build(self) -> 'KafkaSource':
        """
        Build the Java source from the current configuration and wrap it.

        :return: the configured :class:`KafkaSource`.
        """
        j_source = self._j_builder.build()
        return KafkaSource(j_source)

    def set_bootstrap_servers(self, bootstrap_servers: str) -> 'KafkaSourceBuilder':
        """
        Configure the bootstrap servers used by the KafkaConsumer of the KafkaSource.

        :param bootstrap_servers: the bootstrap servers of the Kafka cluster.
        :return: this KafkaSourceBuilder.
        """
        self._j_builder.setBootstrapServers(bootstrap_servers)
        return self

    def set_group_id(self, group_id: str) -> 'KafkaSourceBuilder':
        """
        Configure the consumer group id of the KafkaSource.

        :param group_id: the group id of the KafkaSource.
        :return: this KafkaSourceBuilder.
        """
        self._j_builder.setGroupId(group_id)
        return self

    def set_topics(self, *topics: str) -> 'KafkaSourceBuilder':
        """
        Configure the topics the KafkaSource consumes from. Every listed topic should already
        exist in the Kafka cluster, otherwise an exception will be thrown. Use
        :meth:`set_topic_pattern` instead when some topics are created lazily.

        :param topics: the topics to consume from.
        :return: this KafkaSourceBuilder.
        """
        j_string_class = get_gateway().jvm.java.lang.String
        # The Java builder receives the topic names as a Java String array.
        j_topics = to_jarray(j_string_class, topics)
        self._j_builder.setTopics(j_topics)
        return self

    def set_topic_pattern(self, topic_pattern: str) -> 'KafkaSourceBuilder':
        """
        Configure a pattern of topic names to consume from. The pattern is compiled with the Java
        Pattern class; for its grammar see the
        `JavaDoc <https://docs.oracle.com/javase/8/docs/api/java/util/regex/Pattern.html>`_ .

        :param topic_pattern: the pattern of the topic name to consume from.
        :return: this KafkaSourceBuilder.
        """
        j_pattern = get_gateway().jvm.java.util.regex.Pattern.compile(topic_pattern)
        self._j_builder.setTopicPattern(j_pattern)
        return self

    def set_partitions(self, partitions: Set['KafkaTopicPartition']) -> 'KafkaSourceBuilder':
        """
        Configure an explicit set of partitions to consume from.

        Example:
        ::

            >>> KafkaSource.builder().set_partitions({
            ...     KafkaTopicPartition('TOPIC1', 0),
            ...     KafkaTopicPartition('TOPIC1', 1),
            ... })

        :param partitions: the set of partitions to consume from.
        :return: this KafkaSourceBuilder.
        """
        j_partitions = get_gateway().jvm.java.util.HashSet()
        # Convert each Python partition descriptor into its Java counterpart.
        for partition in partitions:
            j_partitions.add(partition._to_j_topic_partition())
        self._j_builder.setPartitions(j_partitions)
        return self

    def set_starting_offsets(self, starting_offsets_initializer: 'KafkaOffsetsInitializer') \
            -> 'KafkaSourceBuilder':
        """
        Choose the offsets the KafkaSource starts consuming from by passing a
        :class:`KafkaOffsetsInitializer`.

        These :class:`KafkaOffsetsInitializer` s are commonly used and available out of the box.
        Customized offset initializers are currently not supported in PyFlink.

        * :meth:`KafkaOffsetsInitializer.earliest` - start from the earliest offsets. This is
          also the default offset initializer of the KafkaSource for starting offsets.
        * :meth:`KafkaOffsetsInitializer.latest` - start from the latest offsets.
        * :meth:`KafkaOffsetsInitializer.committed_offsets` - start from the committed offsets of
          the consumer group. If there are no committed offsets, start from the offsets
          specified by the :class:`KafkaOffsetResetStrategy`.
        * :meth:`KafkaOffsetsInitializer.offsets` - start from the specified offsets for each
          partition.
        * :meth:`KafkaOffsetsInitializer.timestamp` - start from the specified timestamp for each
          partition. The guarantee is that all records in Kafka whose timestamp is greater than
          the given starting timestamp will be consumed. It is however possible that some
          consumer records whose timestamp is smaller than the given starting timestamp are
          consumed as well.

        :param starting_offsets_initializer: the :class:`KafkaOffsetsInitializer` setting the
            starting offsets for the Source.
        :return: this KafkaSourceBuilder.
        """
        j_initializer = starting_offsets_initializer._j_initializer
        self._j_builder.setStartingOffsets(j_initializer)
        return self

    def set_unbounded(self, stopping_offsets_initializer: 'KafkaOffsetsInitializer') \
            -> 'KafkaSourceBuilder':
        """
        By default the KafkaSource runs in CONTINUOUS_UNBOUNDED manner and therefore never stops
        until the Flink job fails or is canceled. To run the KafkaSource as a streaming source
        that nevertheless stops at some point, pass a :class:`KafkaOffsetsInitializer` that
        specifies the stopping offsets for each partition. The KafkaSource exits once all
        partitions have reached their stopping offsets.

        In contrast to :meth:`set_bounded`, the KafkaSource remains CONTINUOUS_UNBOUNDED after
        calling this method even though it stops at the offsets given by the stopping offset
        initializer.

        These :class:`KafkaOffsetsInitializer` s are commonly used and available out of the box.
        Customized offset initializers are currently not supported in PyFlink.

        * :meth:`KafkaOffsetsInitializer.latest` - stop at the latest offsets.
        * :meth:`KafkaOffsetsInitializer.committed_offsets` - stop at the committed offsets of
          the consumer group. If there are no committed offsets, the offsets specified by the
          :class:`KafkaOffsetResetStrategy` are used.
        * :meth:`KafkaOffsetsInitializer.offsets` - stop at the specified offsets for each
          partition.
        * :meth:`KafkaOffsetsInitializer.timestamp` - stop at the offsets determined by the
          specified timestamp for each partition.

        :param stopping_offsets_initializer: the :class:`KafkaOffsetsInitializer` to specify the
            stopping offsets.
        :return: this KafkaSourceBuilder
        """
        j_initializer = stopping_offsets_initializer._j_initializer
        self._j_builder.setUnbounded(j_initializer)
        return self

    def set_bounded(self, stopping_offsets_initializer: 'KafkaOffsetsInitializer') \
            -> 'KafkaSourceBuilder':
        """
        By default the KafkaSource runs in CONTINUOUS_UNBOUNDED manner and therefore never stops
        until the Flink job fails or is canceled. To run the KafkaSource in BOUNDED manner and
        stop at some point, pass a :class:`KafkaOffsetsInitializer` that specifies the stopping
        offsets for each partition. The KafkaSource exits once all partitions have reached their
        stopping offsets.

        In contrast to :meth:`set_unbounded`, after setting the stopping offsets with this
        method :meth:`KafkaSource.get_boundedness` will return BOUNDED instead of
        CONTINUOUS_UNBOUNDED.

        These :class:`KafkaOffsetsInitializer` s are commonly used and available out of the box.
        Customized offset initializers are currently not supported in PyFlink.

        * :meth:`KafkaOffsetsInitializer.latest` - stop at the latest offsets.
        * :meth:`KafkaOffsetsInitializer.committed_offsets` - stop at the committed offsets of
          the consumer group. If there are no committed offsets, the offsets specified by the
          :class:`KafkaOffsetResetStrategy` are used.
        * :meth:`KafkaOffsetsInitializer.offsets` - stop at the specified offsets for each
          partition.
        * :meth:`KafkaOffsetsInitializer.timestamp` - stop at the offsets determined by the
          specified timestamp for each partition.

        :param stopping_offsets_initializer: the :class:`KafkaOffsetsInitializer` to specify the
            stopping offsets.
        :return: this KafkaSourceBuilder
        """
        j_initializer = stopping_offsets_initializer._j_initializer
        self._j_builder.setBounded(j_initializer)
        return self

    def set_value_only_deserializer(self, deserialization_schema: DeserializationSchema) \
            -> 'KafkaSourceBuilder':
        """
        Configure the :class:`~pyflink.common.serialization.DeserializationSchema` applied to the
        value of Kafka's ConsumerRecord. Any other information (e.g. the key) carried by a
        ConsumerRecord is ignored.

        :param deserialization_schema: the :class:`DeserializationSchema` to use for
            deserialization.
        :return: this KafkaSourceBuilder.
        """
        j_schema = deserialization_schema._j_deserialization_schema
        self._j_builder.setValueOnlyDeserializer(j_schema)
        return self

    def set_client_id_prefix(self, prefix: str) -> 'KafkaSourceBuilder':
        """
        Configure the client id prefix of this KafkaSource.

        :param prefix: the client id prefix to use for this KafkaSource.
        :return: this KafkaSourceBuilder.
        """
        self._j_builder.setClientIdPrefix(prefix)
        return self

    def set_property(self, key: str, value: str) -> 'KafkaSourceBuilder':
        """
        Set one arbitrary property for the KafkaSource and KafkaConsumer. Valid keys are listed
        in ConsumerConfig and KafkaSourceOptions.

        The builder overrides the following keys when the KafkaSource is created.

        * ``auto.offset.reset.strategy`` is overridden by the AutoOffsetResetStrategy returned by
          the :class:`KafkaOffsetsInitializer` for the starting offsets, which is by default
          :meth:`KafkaOffsetsInitializer.earliest`.
        * ``partition.discovery.interval.ms`` is overridden to -1 when :meth:`set_bounded` has
          been invoked.

        :param key: the key of the property.
        :param value: the value of the property.
        :return: this KafkaSourceBuilder.
        """
        self._j_builder.setProperty(key, value)
        return self

    def set_properties(self, props: Dict) -> 'KafkaSourceBuilder':
        """
        Set several arbitrary properties for the KafkaSource and KafkaConsumer at once. Valid
        keys are listed in ConsumerConfig and KafkaSourceOptions.

        The builder overrides the following keys when the KafkaSource is created.

        * ``auto.offset.reset.strategy`` is overridden by the AutoOffsetResetStrategy returned by
          the :class:`KafkaOffsetsInitializer` for the starting offsets, which is by default
          :meth:`KafkaOffsetsInitializer.earliest`.
        * ``partition.discovery.interval.ms`` is overridden to -1 when :meth:`set_bounded` has
          been invoked.
        * ``client.id`` is overridden to "client.id.prefix-RANDOM_LONG", or "group.id-RANDOM_LONG"
          if the client id prefix is not set.

        :param props: the properties to set for the KafkaSource.
        :return: this KafkaSourceBuilder.
        """
        j_props = get_gateway().jvm.java.util.Properties()
        # Copy the Python mapping entry by entry into a java.util.Properties object.
        for name, setting in props.items():
            j_props.setProperty(name, setting)
        self._j_builder.setProperties(j_props)
        return self
class KafkaTopicPartition(object):
    """
    Corresponding to Java ``org.apache.kafka.common.TopicPartition`` class.

    Two instances are equal when both their topic and their partition are equal, and equal
    instances share the same hash, so instances can be used as set members and dictionary keys.

    Example:
    ::

        >>> topic_partition = KafkaTopicPartition('TOPIC1', 0)

    .. versionadded:: 1.16.0
    """

    def __init__(self, topic: str, partition: int):
        """
        :param topic: the topic name.
        :param partition: the partition number within the topic.
        """
        self._topic = topic
        self._partition = partition

    def _to_j_topic_partition(self):
        """
        Create the Java ``TopicPartition`` (looked up under the
        ``org.apache.flink.kafka.shaded`` package) for this topic and partition.
        """
        jvm = get_gateway().jvm
        return jvm.org.apache.flink.kafka.shaded.org.apache.kafka.common.TopicPartition(
            self._topic, self._partition)

    def __eq__(self, other):
        if not isinstance(other, KafkaTopicPartition):
            # Defer to the other operand instead of deciding for it; when neither side knows how
            # to compare, Python's fallback still evaluates ``==`` to False.
            return NotImplemented
        return self._topic == other._topic and self._partition == other._partition

    def __hash__(self):
        # Must stay consistent with __eq__: combines exactly the two compared fields.
        return 31 * (31 + self._partition) + hash(self._topic)

    def __repr__(self):
        return 'KafkaTopicPartition(topic={!r}, partition={!r})'.format(
            self._topic, self._partition)
class KafkaOffsetResetStrategy(Enum):
    """
    Corresponding to Java ``org.apache.kafka.clients.consumer.OffsetResetStrategy`` class.

    .. versionadded:: 1.16.0
    """

    LATEST = 0
    EARLIEST = 1
    NONE = 2

    def _to_j_offset_reset_strategy(self):
        """
        Look up the Java ``OffsetResetStrategy`` constant whose name equals this member's name.
        """
        j_consumer_package = get_gateway().jvm.org.apache.flink.kafka.shaded.org.apache.kafka \
            .clients.consumer
        return getattr(j_consumer_package.OffsetResetStrategy, self.name)
class KafkaOffsetsInitializer(object):
    """
    An interface for users to specify the starting / stopping offset of a KafkaPartitionSplit.
    Instances are obtained from the static factory methods of this class.

    .. versionadded:: 1.16.0
    """

    def __init__(self, j_initializer: JavaObject):
        """
        :param j_initializer: the Java OffsetsInitializer wrapped by this object.
        """
        self._j_initializer = j_initializer

    @staticmethod
    def _j_offsets_initializer():
        """
        Resolve the Java ``OffsetsInitializer`` type whose static factories back the factory
        methods of this class.
        """
        return get_gateway().jvm.org.apache.flink.connector.kafka.source \
            .enumerator.initializer.OffsetsInitializer

    @staticmethod
    def committed_offsets(
            offset_reset_strategy: 'KafkaOffsetResetStrategy' = KafkaOffsetResetStrategy.NONE) \
            -> 'KafkaOffsetsInitializer':
        """
        Get a :class:`KafkaOffsetsInitializer` which initializes the offsets to the committed
        offsets. An exception will be thrown at runtime if there are no committed offsets.

        An optional :class:`KafkaOffsetResetStrategy` can be given to initialize the offsets when
        the committed offsets do not exist.

        :param offset_reset_strategy: the offset reset strategy to use when the committed offsets
            do not exist.
        :return: an offset initializer which initializes the offsets to the committed offsets.
        """
        j_factory = KafkaOffsetsInitializer._j_offsets_initializer()
        j_strategy = offset_reset_strategy._to_j_offset_reset_strategy()
        return KafkaOffsetsInitializer(j_factory.committedOffsets(j_strategy))

    @staticmethod
    def timestamp(timestamp: int) -> 'KafkaOffsetsInitializer':
        """
        Get a :class:`KafkaOffsetsInitializer` which initializes the offsets in each partition so
        that the initialized offset is the offset of the first record whose record timestamp is
        greater than or equal to the given timestamp.

        :param timestamp: the timestamp to start the consumption.
        :return: a :class:`KafkaOffsetsInitializer` which initializes the offsets based on the
            given timestamp.
        """
        j_factory = KafkaOffsetsInitializer._j_offsets_initializer()
        return KafkaOffsetsInitializer(j_factory.timestamp(timestamp))

    @staticmethod
    def earliest() -> 'KafkaOffsetsInitializer':
        """
        Get a :class:`KafkaOffsetsInitializer` which initializes the offsets to the earliest
        available offsets of each partition.

        :return: a :class:`KafkaOffsetsInitializer` which initializes the offsets to the earliest
            available offsets.
        """
        j_factory = KafkaOffsetsInitializer._j_offsets_initializer()
        return KafkaOffsetsInitializer(j_factory.earliest())

    @staticmethod
    def latest() -> 'KafkaOffsetsInitializer':
        """
        Get a :class:`KafkaOffsetsInitializer` which initializes the offsets to the latest offsets
        of each partition.

        :return: a :class:`KafkaOffsetsInitializer` which initializes the offsets to the latest
            offsets.
        """
        j_factory = KafkaOffsetsInitializer._j_offsets_initializer()
        return KafkaOffsetsInitializer(j_factory.latest())

    @staticmethod
    def offsets(offsets: Dict['KafkaTopicPartition', int],
                offset_reset_strategy: 'KafkaOffsetResetStrategy' =
                KafkaOffsetResetStrategy.EARLIEST) -> 'KafkaOffsetsInitializer':
        """
        Get a :class:`KafkaOffsetsInitializer` which initializes the offsets to the specified
        offsets.

        An optional :class:`KafkaOffsetResetStrategy` can be given to initialize the offsets in
        case a specified offset is out of range.

        Example:
        ::

            >>> KafkaOffsetsInitializer.offsets({
            ...     KafkaTopicPartition('TOPIC1', 0): 0,
            ...     KafkaTopicPartition('TOPIC1', 1): 10000
            ... }, KafkaOffsetResetStrategy.EARLIEST)

        :param offsets: the specified offsets for each partition.
        :param offset_reset_strategy: the :class:`KafkaOffsetResetStrategy` to use when the
            specified offset is out of range.
        :return: a :class:`KafkaOffsetsInitializer` which initializes the offsets to the
            specified offsets.
        """
        jvm = get_gateway().jvm
        # Collect the entries in a Java-side map wrapper declared with Long values.
        j_offsets = jvm.org.apache.flink.python.util.HashMapWrapper(
            None, get_java_class(jvm.Long))
        for topic_partition, position in offsets.items():
            j_offsets.put(topic_partition._to_j_topic_partition(), position)

        j_factory = KafkaOffsetsInitializer._j_offsets_initializer()
        j_strategy = offset_reset_strategy._to_j_offset_reset_strategy()
        return KafkaOffsetsInitializer(j_factory.offsets(j_offsets.asMap(), j_strategy))
class KafkaSink(Sink, SupportsPreprocessing):
    """
    Flink Sink that produces data into a Kafka topic. All delivery guarantees described by
    :class:`DeliveryGuarantee` are supported.

    * :attr:`DeliveryGuarantee.NONE` gives no guarantees at all: messages may be lost in case of
      issues on the Kafka broker and messages may be duplicated in case of a Flink failure.
    * :attr:`DeliveryGuarantee.AT_LEAST_ONCE` makes the sink wait on a checkpoint until all
      outstanding records in the Kafka buffers have been acknowledged by the Kafka producer. No
      messages are lost in case of any issue with the Kafka brokers, but messages may be
      duplicated when Flink restarts.
    * :attr:`DeliveryGuarantee.EXACTLY_ONCE`: the KafkaSink writes all messages in a Kafka
      transaction that is committed to Kafka on a checkpoint. Consequently, a consumer that reads
      only committed data (see Kafka consumer config ``isolation.level``) sees no duplicates in
      case of a Flink restart. This effectively delays record writing until a checkpoint is
      written, so adjust the checkpoint duration accordingly. Make sure to use unique
      transactional id prefixes across the applications running on the same Kafka cluster so
      that multiple running jobs do not interfere in their transactions! In addition it is highly
      recommended to tweak the Kafka transaction timeout so that it is much larger than the
      maximum checkpoint duration plus the maximum restart duration, otherwise data loss may
      happen when Kafka expires an uncommitted transaction.

    .. versionadded:: 1.16.0
    """

    def __init__(self, j_kafka_sink, transformer: Optional[StreamTransformer] = None):
        """
        :param j_kafka_sink: the Java sink object handed to the :class:`Sink` base.
        :param transformer: optional :class:`StreamTransformer` reported by
            :meth:`get_transformer`.
        """
        super(KafkaSink, self).__init__(j_kafka_sink)
        self._transformer = transformer

    @staticmethod
    def builder() -> 'KafkaSinkBuilder':
        """
        Create a :class:`KafkaSinkBuilder` to construct :class:`KafkaSink`.
        """
        new_builder = KafkaSinkBuilder()
        return new_builder

    def get_transformer(self) -> Optional[StreamTransformer]:
        """
        :return: the :class:`StreamTransformer` given at construction time, or None.
        """
        return self._transformer
class KafkaSinkBuilder(object):
    """
    Builder to construct :class:`KafkaSink`.

    The following example shows the minimum setup to create a KafkaSink that writes String values
    to a Kafka topic.
    ::

        >>> record_serializer = KafkaRecordSerializationSchema.builder() \\
        ...     .set_topic(MY_SINK_TOPIC) \\
        ...     .set_value_serialization_schema(SimpleStringSchema()) \\
        ...     .build()
        >>> sink = KafkaSink.builder() \\
        ...     .set_bootstrap_servers(MY_BOOTSTRAP_SERVERS) \\
        ...     .set_record_serializer(record_serializer) \\
        ...     .build()

    One can also configure different :class:`DeliveryGuarantee` by using
    :meth:`set_delivery_guarantee` but keep in mind when using
    :attr:`DeliveryGuarantee.EXACTLY_ONCE`, one must set the transactional id prefix
    :meth:`set_transactional_id_prefix`.

    .. versionadded:: 1.16.0
    """

    def __init__(self):
        jvm = get_gateway().jvm
        self._j_builder = jvm.org.apache.flink.connector.kafka.sink.KafkaSink.builder()
        # Transformer handed to the built KafkaSink; only set when the configured record
        # serializer selects the topic per record (see set_record_serializer).
        self._preprocessing = None

    def build(self) -> 'KafkaSink':
        """
        Constructs the :class:`KafkaSink` with the configured properties.

        :return: the :class:`KafkaSink`, carrying the preprocessing transformer if one was
            derived from the record serializer.
        """
        return KafkaSink(self._j_builder.build(), self._preprocessing)

    def set_bootstrap_servers(self, bootstrap_servers: str) -> 'KafkaSinkBuilder':
        """
        Sets the Kafka bootstrap servers.

        :param bootstrap_servers: A comma separated list of valid URIs to reach the Kafka broker.
        :return: this KafkaSinkBuilder.
        """
        self._j_builder.setBootstrapServers(bootstrap_servers)
        return self

    def set_delivery_guarantee(self, delivery_guarantee: DeliveryGuarantee) -> 'KafkaSinkBuilder':
        """
        Sets the wanted :class:`DeliveryGuarantee`. The default delivery guarantee is
        :attr:`DeliveryGuarantee.NONE`.

        :param delivery_guarantee: The wanted :class:`DeliveryGuarantee`.
        :return: this KafkaSinkBuilder.
        """
        self._j_builder.setDeliveryGuarantee(delivery_guarantee._to_j_delivery_guarantee())
        return self

    def set_transactional_id_prefix(self, transactional_id_prefix: str) -> 'KafkaSinkBuilder':
        """
        Sets the prefix for all created transactionalIds if :attr:`DeliveryGuarantee.EXACTLY_ONCE`
        is configured.

        It is mandatory to always set this value with :attr:`DeliveryGuarantee.EXACTLY_ONCE` to
        prevent corrupted transactions if multiple jobs using the KafkaSink run against the same
        Kafka Cluster. The default prefix is ``"kafka-sink"``.

        The size of the prefix is capped by MAXIMUM_PREFIX_BYTES (6400) formatted with UTF-8.

        It is important to keep the prefix stable across application restarts. If the prefix
        changes it might happen that lingering transactions are not correctly aborted and newly
        written messages are not immediately consumable until transactions timeout.

        :param transactional_id_prefix: The transactional id prefix.
        :return: this KafkaSinkBuilder.
        """
        self._j_builder.setTransactionalIdPrefix(transactional_id_prefix)
        return self

    @staticmethod
    def _has_generated_topic_selector(record_serializer: 'KafkaRecordSerializationSchema') -> bool:
        """
        Tell whether the serializer's topic selector is a generated first-column selector, i.e. a
        ``CachingTopicSelector`` whose own ``topicSelector`` field holds an object of a class
        named ``com.sun.proxy...`` or ``jdk.proxy...``.
        """
        j_topic_selector = get_field_value(record_serializer._j_serialization_schema,
                                           'topicSelector')
        caching_selector_name = (
            'org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchemaBuilder.'
            'CachingTopicSelector'
        )
        if j_topic_selector.getClass().getCanonicalName() != caching_selector_name:
            return False
        # Read the inner selector's class name once; every lookup is a reflective call.
        j_inner_selector = get_field_value(j_topic_selector, 'topicSelector')
        inner_name = j_inner_selector.getClass().getCanonicalName()
        return inner_name is not None and inner_name.startswith(('com.sun.proxy', 'jdk.proxy'))

    def set_record_serializer(self, record_serializer: 'KafkaRecordSerializationSchema') \
            -> 'KafkaSinkBuilder':
        """
        Sets the :class:`KafkaRecordSerializationSchema` that transforms incoming records to kafka
        producer records.

        If the serializer uses a generated first-column topic selector, its serialization schemas
        are wrapped and a preprocessing transformer is recorded for the built sink. Otherwise any
        preprocessing recorded by an earlier call is discarded.

        :param record_serializer: The :class:`KafkaRecordSerializationSchema`.
        :return: this KafkaSinkBuilder.
        """
        # NOTE: If topic selector is a generated first-column selector, do extra preprocessing
        if self._has_generated_topic_selector(record_serializer):
            record_serializer._wrap_serialization_schema()
            self._preprocessing = record_serializer._build_preprocessing()
        else:
            # A previously configured serializer may have left a transformer behind; it must not
            # be applied together with this serializer.
            self._preprocessing = None
        self._j_builder.setRecordSerializer(record_serializer._j_serialization_schema)
        return self

    def set_property(self, key: str, value: str) -> 'KafkaSinkBuilder':
        """
        Sets kafka producer config.

        :param key: Kafka producer config key.
        :param value: Kafka producer config value.
        :return: this KafkaSinkBuilder.
        """
        self._j_builder.setProperty(key, value)
        return self
class KafkaRecordSerializationSchema(SerializationSchema):
    """
    A serialization schema which defines how to convert the stream record to kafka producer record.

    .. versionadded:: 1.16.0
    """

    def __init__(self, j_serialization_schema,
                 topic_selector: Optional['KafkaTopicSelector'] = None):
        """
        :param j_serialization_schema: the Java serialization schema handed to the
            :class:`SerializationSchema` base.
        :param topic_selector: optional :class:`KafkaTopicSelector` used by the preprocessing
            step to compute the topic of each record.
        """
        super().__init__(j_serialization_schema)
        self._topic_selector = topic_selector
        # Set once the key/value schemas have been wrapped, so they are never wrapped twice.
        self._wrapped = False

    @staticmethod
    def builder() -> 'KafkaRecordSerializationSchemaBuilder':
        """
        Creates a default schema builder to provide common building blocks i.e. key serialization,
        value serialization, topic selection.
        """
        return KafkaRecordSerializationSchemaBuilder()

    def _wrap_serialization_schema(self):
        """
        Replace the non-null key and value serialization schemas of the Java object with
        ``SecondColumnSerializationSchema`` adapters around them.

        The call is idempotent: when the same schema object is handed to a sink builder more than
        once, later calls do nothing instead of nesting an adapter inside another adapter.
        """
        if getattr(self, '_wrapped', False):
            return

        jvm = get_gateway().jvm
        j_schema = self._j_serialization_schema

        for field_name in ('keySerializationSchema', 'valueSerializationSchema'):
            j_schema_field = get_field(j_schema.getClass(), field_name)
            # Read the current value once; a schema that was never set stays null.
            j_current = j_schema_field.get(j_schema)
            if j_current is not None:
                j_adapter = jvm.org.apache.flink.python.util.PythonConnectorUtils \
                    .SecondColumnSerializationSchema(j_current)
                j_schema_field.set(j_schema, j_adapter)

        self._wrapped = True

    def _build_preprocessing(self) -> StreamTransformer:
        """
        Build a :class:`StreamTransformer` that maps every record ``v`` of a stream to
        ``Row(topic, v)``, where ``topic`` is computed by this schema's topic selector.
        """
        class SelectTopicTransformer(StreamTransformer):

            def __init__(self, topic_selector: KafkaTopicSelector):
                self._topic_selector = topic_selector

            def apply(self, ds):
                # First column: selected topic; second column: the original record.
                output_type = Types.ROW([Types.STRING(), ds.get_type()])
                return ds.map(lambda v: Row(self._topic_selector.apply(v), v),
                              output_type=output_type)

        return SelectTopicTransformer(self._topic_selector)
class KafkaRecordSerializationSchemaBuilder(object):
    """
    Builder that assembles a :class:`KafkaRecordSerializationSchema`.

    Example:
    ::

        >>> KafkaRecordSerializationSchema.builder() \\
        ...     .set_topic('topic') \\
        ...     .set_key_serialization_schema(SimpleStringSchema()) \\
        ...     .set_value_serialization_schema(SimpleStringSchema()) \\
        ...     .build()

    The sink topic can also be computed dynamically from each record:
    ::

        >>> KafkaRecordSerializationSchema.builder() \\
        ...     .set_topic_selector(lambda row: 'topic-' + row['category']) \\
        ...     .set_value_serialization_schema(
        ...         JsonRowSerializationSchema.builder().with_type_info(ROW_TYPE).build()) \\
        ...     .build()

    It is necessary to configure exactly one serialization method for the value and a topic.

    .. versionadded:: 1.16.0
    """

    def __init__(self):
        j_sink_package = get_gateway().jvm.org.apache.flink.connector.kafka.sink
        self._j_builder = j_sink_package.KafkaRecordSerializationSchemaBuilder()
        self._fixed_topic = True  # type: bool
        self._topic_selector = None  # type: Optional[KafkaTopicSelector]
        self._key_serialization_schema = None  # type: Optional[SerializationSchema]
        self._value_serialization_schema = None  # type: Optional[SerializationSchema]

    def build(self) -> 'KafkaRecordSerializationSchema':
        """
        Constructs the :class:`KafkaRecordSerializationSchema` with the configured properties.
        The Python topic selector is attached only when a topic selector, not a fixed topic, was
        configured last.
        """
        j_schema = self._j_builder.build()
        if not self._fixed_topic:
            return KafkaRecordSerializationSchema(j_schema, self._topic_selector)
        return KafkaRecordSerializationSchema(j_schema)

    def set_topic(self, topic: str) -> 'KafkaRecordSerializationSchemaBuilder':
        """
        Sets a fixed topic which is used as destination for all records.

        :param topic: The fixed topic.
        """
        self._j_builder.setTopic(topic)
        self._fixed_topic = True
        return self

    def set_topic_selector(self, topic_selector: Union[Callable[[Any], str], 'KafkaTopicSelector'])\
            -> 'KafkaRecordSerializationSchemaBuilder':
        """
        Sets a topic selector which computes the target topic for every incoming record.

        :param topic_selector: A :class:`KafkaTopicSelector` implementation or a function that
            consumes each incoming record and returns the topic string.
        :raises TypeError: if the argument is neither a :class:`KafkaTopicSelector` nor callable.
        """
        if isinstance(topic_selector, KafkaTopicSelector):
            selector = topic_selector
        elif callable(topic_selector):

            class TopicSelectorFunctionAdapter(KafkaTopicSelector):
                """Adapts a plain function to the :class:`KafkaTopicSelector` interface."""

                def __init__(self, f: Callable[[Any], str]):
                    self._f = f

                def apply(self, data) -> str:
                    return self._f(data)

            selector = TopicSelectorFunctionAdapter(topic_selector)
        else:
            raise TypeError('topic_selector must be KafkaTopicSelector or a callable')

        jvm = get_gateway().jvm
        # The Java builder is given a generated first-column selector; the Python selector is
        # only remembered here and passed on by build().
        j_topic_selector_class = get_java_class(
            jvm.org.apache.flink.connector.kafka.sink.TopicSelector)
        j_first_column_selector = jvm.org.apache.flink.python.util.PythonConnectorUtils \
            .createFirstColumnTopicSelector(j_topic_selector_class)
        self._j_builder.setTopicSelector(j_first_column_selector)
        self._fixed_topic = False
        self._topic_selector = selector
        return self

    def set_key_serialization_schema(self, key_serialization_schema: SerializationSchema) \
            -> 'KafkaRecordSerializationSchemaBuilder':
        """
        Sets a :class:`SerializationSchema` which is used to serialize the incoming element to the
        key of the producer record. The key serialization is optional; if it is not set, the key
        of the producer record will be null.

        :param key_serialization_schema: The :class:`SerializationSchema` to serialize each
            incoming record as the key of producer record.
        """
        self._key_serialization_schema = key_serialization_schema
        j_key_schema = key_serialization_schema._j_serialization_schema
        self._j_builder.setKeySerializationSchema(j_key_schema)
        return self

    def set_value_serialization_schema(self, value_serialization_schema: SerializationSchema) \
            -> 'KafkaRecordSerializationSchemaBuilder':
        """
        Sets a :class:`SerializationSchema` which is used to serialize the incoming element to the
        value of the producer record. The value serialization is required.

        :param value_serialization_schema: The :class:`SerializationSchema` to serialize each
            data record as the value of producer record.
        """
        self._value_serialization_schema = value_serialization_schema
        j_value_schema = value_serialization_schema._j_serialization_schema
        self._j_builder.setValueSerializationSchema(j_value_schema)
        return self
class KafkaTopicSelector(ABC):
    """
    Selects the topic for an incoming record. Subclasses must implement :meth:`apply`.

    .. versionadded:: 1.16.0
    """

    @abstractmethod
    def apply(self, data) -> str:
        """
        Compute the topic for one record.

        :param data: the incoming record.
        :return: the topic string.
        """