################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from abc import ABC, abstractmethod
from enum import Enum
from typing import Dict, Union, Set, Callable, Any, Optional

from py4j.java_gateway import JavaObject, get_java_class
from pyflink.common import DeserializationSchema, SerializationSchema, \
    Types, Row
from pyflink.datastream.connectors import Source, Sink
from pyflink.datastream.connectors.base import DeliveryGuarantee, SupportsPreprocessing, \
    StreamTransformer
from pyflink.java_gateway import get_gateway
from pyflink.util.java_utils import to_jarray, get_field, get_field_value

__all__ = [
    'KafkaSource',
    'KafkaSourceBuilder',
    'KafkaSink',
    'KafkaSinkBuilder',
    'KafkaTopicPartition',
    'KafkaOffsetsInitializer',
    'KafkaOffsetResetStrategy',
    'KafkaRecordSerializationSchema',
    'KafkaRecordSerializationSchemaBuilder',
    'KafkaTopicSelector'
]


# ---- KafkaSource ----


class KafkaSource(Source):
    """
    The Source implementation of Kafka. Please use a :class:`KafkaSourceBuilder` to construct a
    :class:`KafkaSource`. The following example shows how to create a KafkaSource emitting records
    of String type.

    ::

        >>> source = KafkaSource \\
        ...     .builder() \\
        ...     .set_bootstrap_servers('MY_BOOTSTRAP_SERVERS') \\
        ...     .set_group_id('MY_GROUP') \\
        ...     .set_topics('TOPIC1', 'TOPIC2') \\
        ...     .set_value_only_deserializer(SimpleStringSchema()) \\
        ...     .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \\
        ...     .build()

    .. versionadded:: 1.16.0
    """

    def __init__(self, j_kafka_source: JavaObject):
        super().__init__(j_kafka_source)

    @staticmethod
    def builder() -> 'KafkaSourceBuilder':
        """
        Get a kafkaSourceBuilder to build a :class:`KafkaSource`.

        :return: a Kafka source builder.
        """
        return KafkaSourceBuilder()


class KafkaSourceBuilder(object):
    """
    The builder class for :class:`KafkaSource` to make it easier for the users to construct a
    :class:`KafkaSource`.

    The following example shows the minimum setup to create a KafkaSource that reads the String
    values from a Kafka topic.

    ::

        >>> source = KafkaSource.builder() \\
        ...     .set_bootstrap_servers('MY_BOOTSTRAP_SERVERS') \\
        ...     .set_topics('TOPIC1', 'TOPIC2') \\
        ...     .set_value_only_deserializer(SimpleStringSchema()) \\
        ...     .build()

    The bootstrap servers, topics/partitions to consume, and the record deserializer are required
    fields that must be set.

    To specify the starting offsets of the KafkaSource, one can call :meth:`set_starting_offsets`.

    By default, the KafkaSource runs in an CONTINUOUS_UNBOUNDED mode and never stops until the Flink
    job is canceled or fails. To let the KafkaSource run in CONTINUOUS_UNBOUNDED but stops at some
    given offsets, one can call :meth:`set_stopping_offsets`. For example the following KafkaSource
    stops after it consumes up to the latest partition offsets at the point when the Flink started.

    ::

        >>> source = KafkaSource.builder() \\
        ...     .set_bootstrap_servers('MY_BOOTSTRAP_SERVERS') \\
        ...     .set_topics('TOPIC1', 'TOPIC2') \\
        ...     .set_value_only_deserializer(SimpleStringSchema()) \\
        ...     .set_unbounded(KafkaOffsetsInitializer.latest()) \\
        ...     .build()

    .. versionadded:: 1.16.0
    """

    def __init__(self):
        self._j_builder = get_gateway().jvm.org.apache.flink.connector.kafka.source \
            .KafkaSource.builder()

    def build(self) -> 'KafkaSource':
        return KafkaSource(self._j_builder.build())

    def set_bootstrap_servers(self, bootstrap_servers: str) -> 'KafkaSourceBuilder':
        """
        Sets the bootstrap servers for the KafkaConsumer of the KafkaSource.

        :param bootstrap_servers: the bootstrap servers of the Kafka cluster.
        :return: this KafkaSourceBuilder.
        """
        self._j_builder.setBootstrapServers(bootstrap_servers)
        return self

    def set_group_id(self, group_id: str) -> 'KafkaSourceBuilder':
        """
        Sets the consumer group id of the KafkaSource.

        :param group_id: the group id of the KafkaSource.
        :return: this KafkaSourceBuilder.
        """
        self._j_builder.setGroupId(group_id)
        return self

    def set_topics(self, *topics: str) -> 'KafkaSourceBuilder':
        """
        Set a list of topics the KafkaSource should consume from. All the topics in the list should
        have existed in the Kafka cluster. Otherwise, an exception will be thrown. To allow some
        topics to be created lazily, please use :meth:`set_topic_pattern` instead.

        :param topics: the list of topics to consume from.
        :return: this KafkaSourceBuilder.
        """
        self._j_builder.setTopics(to_jarray(get_gateway().jvm.java.lang.String, topics))
        return self

    def set_topic_pattern(self, topic_pattern: str) -> 'KafkaSourceBuilder':
        """
        Set a topic pattern to consume from use the java Pattern. For grammar, check out
        `JavaDoc <https://docs.oracle.com/javase/8/docs/api/java/util/regex/Pattern.html>`_ .

        :param topic_pattern: the pattern of the topic name to consume from.
        :return: this KafkaSourceBuilder.
        """
        self._j_builder.setTopicPattern(get_gateway().jvm.java.util.regex
                                        .Pattern.compile(topic_pattern))
        return self

    def set_partitions(self, partitions: Set['KafkaTopicPartition']) -> 'KafkaSourceBuilder':
        """
        Set a set of partitions to consume from.

        Example:
        ::

            >>> KafkaSource.builder().set_partitions({
            ...     KafkaTopicPartition('TOPIC1', 0),
            ...     KafkaTopicPartition('TOPIC1', 1),
            ... })

        :param partitions: the set of partitions to consume from.
        :return: this KafkaSourceBuilder.
        """
        j_set = get_gateway().jvm.java.util.HashSet()
        for tp in partitions:
            j_set.add(tp._to_j_topic_partition())
        self._j_builder.setPartitions(j_set)
        return self

    def set_starting_offsets(self, starting_offsets_initializer: 'KafkaOffsetsInitializer') \
            -> 'KafkaSourceBuilder':
        """
        Specify from which offsets the KafkaSource should start consume from by providing an
        :class:`KafkaOffsetsInitializer`.

        The following :class:`KafkaOffsetsInitializer` s are commonly used and provided out of the
        box. Currently, customized offset initializer is not supported in PyFlink.

        * :meth:`KafkaOffsetsInitializer.earliest` - starting from the earliest offsets. This is
          also the default offset initializer of the KafkaSource for starting offsets.
        * :meth:`KafkaOffsetsInitializer.latest` - starting from the latest offsets.
        * :meth:`KafkaOffsetsInitializer.committedOffsets` - starting from the committed offsets of
          the consumer group. If there is no committed offsets, starting from the offsets
          specified by the :class:`KafkaOffsetResetStrategy`.
        * :meth:`KafkaOffsetsInitializer.offsets` - starting from the specified offsets for each
          partition.
        * :meth:`KafkaOffsetsInitializer.timestamp` - starting from the specified timestamp for each
          partition. Note that the guarantee here is that all the records in Kafka whose timestamp
          is greater than the given starting timestamp will be consumed. However, it is possible
          that some consumer records whose timestamp is smaller than the given starting timestamp
          are also consumed.

        :param starting_offsets_initializer: the :class:`KafkaOffsetsInitializer` setting the
            starting offsets for the Source.
        :return: this KafkaSourceBuilder.
        """
        self._j_builder.setStartingOffsets(starting_offsets_initializer._j_initializer)
        return self

    def set_unbounded(self, stopping_offsets_initializer: 'KafkaOffsetsInitializer') \
            -> 'KafkaSourceBuilder':
        """
        By default, the KafkaSource is set to run in CONTINUOUS_UNBOUNDED manner and thus never
        stops until the Flink job fails or is canceled. To let the KafkaSource run as a streaming
        source but still stops at some point, one can set an :class:`KafkaOffsetsInitializer`
        to specify the stopping offsets for each partition. When all the partitions have reached
        their stopping offsets, the KafkaSource will then exit.

        This method is different from :meth:`set_bounded` that after setting the stopping offsets
        with this method, KafkaSource will still be CONTINUOUS_UNBOUNDED even though it will stop at
        the stopping offsets specified by the stopping offset initializer.

        The following :class:`KafkaOffsetsInitializer` s are commonly used and provided out of the
        box. Currently, customized offset initializer is not supported in PyFlink.

        * :meth:`KafkaOffsetsInitializer.latest` - starting from the latest offsets.
        * :meth:`KafkaOffsetsInitializer.committedOffsets` - starting from the committed offsets of
          the consumer group. If there is no committed offsets, starting from the offsets
          specified by the :class:`KafkaOffsetResetStrategy`.
        * :meth:`KafkaOffsetsInitializer.offsets` - starting from the specified offsets for each
          partition.
        * :meth:`KafkaOffsetsInitializer.timestamp` - starting from the specified timestamp for each
          partition. Note that the guarantee here is that all the records in Kafka whose timestamp
          is greater than the given starting timestamp will be consumed. However, it is possible
          that some consumer records whose timestamp is smaller than the given starting timestamp
          are also consumed.

        :param stopping_offsets_initializer: the :class:`KafkaOffsetsInitializer` to specify the
            stopping offsets.
        :return: this KafkaSourceBuilder
        """
        self._j_builder.setUnbounded(stopping_offsets_initializer._j_initializer)
        return self

    def set_bounded(self, stopping_offsets_initializer: 'KafkaOffsetsInitializer') \
            -> 'KafkaSourceBuilder':
        """
        By default, the KafkaSource is set to run in CONTINUOUS_UNBOUNDED manner and thus never
        stops until the Flink job fails or is canceled. To let the KafkaSource run in BOUNDED manner
        and stop at some point, one can set an :class:`KafkaOffsetsInitializer` to specify the
        stopping offsets for each partition. When all the partitions have reached their stopping
        offsets, the KafkaSource will then exit.

        This method is different from :meth:`set_unbounded` that after setting the stopping offsets
        with this method, :meth:`KafkaSource.get_boundedness` will return BOUNDED instead of
        CONTINUOUS_UNBOUNDED.

        The following :class:`KafkaOffsetsInitializer` s are commonly used and provided out of the
        box. Currently, customized offset initializer is not supported in PyFlink.

        * :meth:`KafkaOffsetsInitializer.latest` - starting from the latest offsets.
        * :meth:`KafkaOffsetsInitializer.committedOffsets` - starting from the committed offsets of
          the consumer group. If there is no committed offsets, starting from the offsets
          specified by the :class:`KafkaOffsetResetStrategy`.
        * :meth:`KafkaOffsetsInitializer.offsets` - starting from the specified offsets for each
          partition.
        * :meth:`KafkaOffsetsInitializer.timestamp` - starting from the specified timestamp for each
          partition. Note that the guarantee here is that all the records in Kafka whose timestamp
          is greater than the given starting timestamp will be consumed. However, it is possible
          that some consumer records whose timestamp is smaller than the given starting timestamp
          are also consumed.

        :param stopping_offsets_initializer: the :class:`KafkaOffsetsInitializer` to specify the
            stopping offsets.
        :return: this KafkaSourceBuilder
        """
        self._j_builder.setBounded(stopping_offsets_initializer._j_initializer)
        return self

    def set_value_only_deserializer(self, deserialization_schema: DeserializationSchema) \
            -> 'KafkaSourceBuilder':
        """
        Sets the :class:`~pyflink.common.serialization.DeserializationSchema` for deserializing the
        value of Kafka's ConsumerRecord. The other information (e.g. key) in a ConsumerRecord will
        be ignored.

        :param deserialization_schema: the :class:`DeserializationSchema` to use for
            deserialization.
        :return: this KafkaSourceBuilder.
        """
        self._j_builder.setValueOnlyDeserializer(deserialization_schema._j_deserialization_schema)
        return self

    def set_client_id_prefix(self, prefix: str) -> 'KafkaSourceBuilder':
        """
        Sets the client id prefix of this KafkaSource.

        :param prefix: the client id prefix to use for this KafkaSource.
        :return: this KafkaSourceBuilder.
        """
        self._j_builder.setClientIdPrefix(prefix)
        return self

    def set_property(self, key: str, value: str) -> 'KafkaSourceBuilder':
        """
        Set an arbitrary property for the KafkaSource and KafkaConsumer. The valid keys can be found
        in ConsumerConfig and KafkaSourceOptions.

        Note that the following keys will be overridden by the builder when the KafkaSource is
        created.

        * ``auto.offset.reset.strategy`` is overridden by AutoOffsetResetStrategy returned by
          :class:`KafkaOffsetsInitializer` for the starting offsets, which is by default
          :meth:`KafkaOffsetsInitializer.earliest`.
        * ``partition.discovery.interval.ms`` is overridden to -1 when :meth:`set_bounded` has been
          invoked.

        :param key: the key of the property.
        :param value: the value of the property.
        :return: this KafkaSourceBuilder.
        """
        self._j_builder.setProperty(key, value)
        return self

    def set_properties(self, props: Dict) -> 'KafkaSourceBuilder':
        """
        Set arbitrary properties for the KafkaSource and KafkaConsumer. The valid keys can be found
        in ConsumerConfig and KafkaSourceOptions.

        Note that the following keys will be overridden by the builder when the KafkaSource is
        created.

        * ``auto.offset.reset.strategy`` is overridden by AutoOffsetResetStrategy returned by
          :class:`KafkaOffsetsInitializer` for the starting offsets, which is by default
          :meth:`KafkaOffsetsInitializer.earliest`.
        * ``partition.discovery.interval.ms`` is overridden to -1 when :meth:`set_bounded` has been
          invoked.
        * ``client.id`` is overridden to "client.id.prefix-RANDOM_LONG", or "group.id-RANDOM_LONG"
          if the client id prefix is not set.

        :param props: the properties to set for the KafkaSource.
        :return: this KafkaSourceBuilder.
        """
        gateway = get_gateway()
        j_properties = gateway.jvm.java.util.Properties()
        for key, value in props.items():
            j_properties.setProperty(key, value)
        self._j_builder.setProperties(j_properties)
        return self


class KafkaTopicPartition(object):
    """
    Corresponding to Java ``org.apache.kafka.common.TopicPartition`` class.

    Example:
    ::

        >>> topic_partition = KafkaTopicPartition('TOPIC1', 0)

    .. versionadded:: 1.16.0
    """

    def __init__(self, topic: str, partition: int):
        self._topic = topic
        self._partition = partition

    def _to_j_topic_partition(self):
        jvm = get_gateway().jvm
        return jvm.org.apache.flink.kafka.shaded.org.apache.kafka.common.TopicPartition(
            self._topic, self._partition)

    def __eq__(self, other):
        if not isinstance(other, KafkaTopicPartition):
            return False
        return self._topic == other._topic and self._partition == other._partition

    def __hash__(self):
        return 31 * (31 + self._partition) + hash(self._topic)


class KafkaOffsetResetStrategy(Enum):
    """
    Corresponding to Java ``org.apache.kafka.client.consumer.OffsetResetStrategy`` class.

    .. versionadded:: 1.16.0
    """

    LATEST = 0
    EARLIEST = 1
    NONE = 2

    def _to_j_offset_reset_strategy(self):
        JOffsetResetStrategy = get_gateway().jvm.org.apache.flink.kafka.shaded.org.apache.kafka.\
            clients.consumer.OffsetResetStrategy
        return getattr(JOffsetResetStrategy, self.name)


class KafkaOffsetsInitializer(object):
    """
    An interface for users to specify the starting / stopping offset of a KafkaPartitionSplit.

    .. versionadded:: 1.16.0
    """

    def __init__(self, j_initializer: JavaObject):
        self._j_initializer = j_initializer

    @staticmethod
    def committed_offsets(
            offset_reset_strategy: 'KafkaOffsetResetStrategy' = KafkaOffsetResetStrategy.NONE) -> \
            'KafkaOffsetsInitializer':
        """
        Get an :class:`KafkaOffsetsInitializer` which initializes the offsets to the committed
        offsets. An exception will be thrown at runtime if there is no committed offsets.

        An optional :class:`KafkaOffsetResetStrategy` can be specified to initialize the offsets if
        the committed offsets does not exist.

        :param offset_reset_strategy: the offset reset strategy to use when the committed offsets do
            not exist.
        :return: an offset initializer which initialize the offsets to the committed offsets.
        """
        JOffsetsInitializer = get_gateway().jvm.org.apache.flink.connector.kafka.source.\
            enumerator.initializer.OffsetsInitializer
        return KafkaOffsetsInitializer(JOffsetsInitializer.committedOffsets(
            offset_reset_strategy._to_j_offset_reset_strategy()))

    @staticmethod
    def timestamp(timestamp: int) -> 'KafkaOffsetsInitializer':
        """
        Get an :class:`KafkaOffsetsInitializer` which initializes the offsets in each partition so
        that the initialized offset is the offset of the first record whose record timestamp is
        greater than or equals the give timestamp.

        :param timestamp: the timestamp to start the consumption.
        :return: an :class:`OffsetsInitializer` which initializes the offsets based on the given
            timestamp.
        """
        JOffsetsInitializer = get_gateway().jvm.org.apache.flink.connector.kafka.source. \
            enumerator.initializer.OffsetsInitializer
        return KafkaOffsetsInitializer(JOffsetsInitializer.timestamp(timestamp))

    @staticmethod
    def earliest() -> 'KafkaOffsetsInitializer':
        """
        Get an :class:`KafkaOffsetsInitializer` which initializes the offsets to the earliest
        available offsets of each partition.

        :return: an :class:`KafkaOffsetsInitializer` which initializes the offsets to the earliest
            available offsets.
        """
        JOffsetsInitializer = get_gateway().jvm.org.apache.flink.connector.kafka.source. \
            enumerator.initializer.OffsetsInitializer
        return KafkaOffsetsInitializer(JOffsetsInitializer.earliest())

    @staticmethod
    def latest() -> 'KafkaOffsetsInitializer':
        """
        Get an :class:`KafkaOffsetsInitializer` which initializes the offsets to the latest offsets
        of each partition.

        :return: an :class:`KafkaOffsetsInitializer` which initializes the offsets to the latest
            offsets.
        """
        JOffsetsInitializer = get_gateway().jvm.org.apache.flink.connector.kafka.source. \
            enumerator.initializer.OffsetsInitializer
        return KafkaOffsetsInitializer(JOffsetsInitializer.latest())

    @staticmethod
    def offsets(offsets: Dict['KafkaTopicPartition', int],
                offset_reset_strategy: 'KafkaOffsetResetStrategy' =
                KafkaOffsetResetStrategy.EARLIEST) -> 'KafkaOffsetsInitializer':
        """
        Get an :class:`KafkaOffsetsInitializer` which initializes the offsets to the specified
        offsets.

        An optional :class:`KafkaOffsetResetStrategy` can be specified to initialize the offsets in
        case the specified offset is out of range.

        Example:
        ::

            >>> KafkaOffsetsInitializer.offsets({
            ...     KafkaTopicPartition('TOPIC1', 0): 0,
            ...     KafkaTopicPartition('TOPIC1', 1): 10000
            ... }, KafkaOffsetResetStrategy.EARLIEST)

        :param offsets: the specified offsets for each partition.
        :param offset_reset_strategy: the :class:`KafkaOffsetResetStrategy` to use when the
            specified offset is out of range.
        :return: an :class:`KafkaOffsetsInitializer` which initializes the offsets to the specified
            offsets.
        """
        jvm = get_gateway().jvm
        j_map_wrapper = jvm.org.apache.flink.python.util.HashMapWrapper(
            None, get_java_class(jvm.Long))
        for tp, offset in offsets.items():
            j_map_wrapper.put(tp._to_j_topic_partition(), offset)

        JOffsetsInitializer = get_gateway().jvm.org.apache.flink.connector.kafka.source. \
            enumerator.initializer.OffsetsInitializer
        return KafkaOffsetsInitializer(JOffsetsInitializer.offsets(
            j_map_wrapper.asMap(), offset_reset_strategy._to_j_offset_reset_strategy()))


class KafkaSink(Sink, SupportsPreprocessing):
    """
    Flink Sink to produce data into a Kafka topic. The sink supports all delivery guarantees
    described by :class:`DeliveryGuarantee`.

    * :attr:`DeliveryGuarantee.NONE` does not provide any guarantees: messages may be lost in case
      of issues on the Kafka broker and messages may be duplicated in case of a Flink failure.
    * :attr:`DeliveryGuarantee.AT_LEAST_ONCE` the sink will wait for all outstanding records in the
      Kafka buffers to be acknowledged by the Kafka producer on a checkpoint. No messages will be
      lost in case of any issue with the Kafka brokers but messages may be duplicated when Flink
      restarts.
    * :attr:`DeliveryGuarantee.EXACTLY_ONCE`: In this mode the KafkaSink will write all messages in
      a Kafka transaction that will be committed to Kafka on a checkpoint. Thus, if the consumer
      reads only committed data (see Kafka consumer config ``isolation.level``), no duplicates
      will be seen in case of a Flink restart. However, this delays record writing effectively
      until a checkpoint is written, so adjust the checkpoint duration accordingly. Please ensure
      that you use unique transactional id prefixes across your applications running on the same
      Kafka cluster such that multiple running jobs do not interfere in their transactions!
      Additionally, it is highly recommended to tweak Kafka transaction timeout (link) >> maximum
      checkpoint duration + maximum restart duration or data loss may happen when Kafka expires an
      uncommitted transaction.

    .. versionadded:: 1.16.0
    """

    def __init__(self, j_kafka_sink, transformer: Optional[StreamTransformer] = None):
        super().__init__(j_kafka_sink)
        self._transformer = transformer

    @staticmethod
    def builder() -> 'KafkaSinkBuilder':
        """
        Create a :class:`KafkaSinkBuilder` to construct :class:`KafkaSink`.
        """
        return KafkaSinkBuilder()

    def get_transformer(self) -> Optional[StreamTransformer]:
        return self._transformer


class KafkaSinkBuilder(object):
    """
    Builder to construct :class:`KafkaSink`.

    The following example shows the minimum setup to create a KafkaSink that writes String values
    to a Kafka topic.

    ::

        >>> record_serializer = KafkaRecordSerializationSchema.builder() \\
        ...     .set_topic(MY_SINK_TOPIC) \\
        ...     .set_value_serialization_schema(SimpleStringSchema()) \\
        ...     .build()
        >>> sink = KafkaSink.builder() \\
        ...     .set_bootstrap_servers(MY_BOOTSTRAP_SERVERS) \\
        ...     .set_record_serializer(record_serializer) \\
        ...     .build()

    One can also configure different :class:`DeliveryGuarantee` by using
    :meth:`set_delivery_guarantee` but keep in mind when using
    :attr:`DeliveryGuarantee.EXACTLY_ONCE`, one must set the transactional id prefix
    :meth:`set_transactional_id_prefix`.

    .. versionadded:: 1.16.0
    """

    def __init__(self):
        jvm = get_gateway().jvm
        self._j_builder = jvm.org.apache.flink.connector.kafka.sink.KafkaSink.builder()
        self._preprocessing = None

    def build(self) -> 'KafkaSink':
        """
        Constructs the :class:`KafkaSink` with the configured properties.
        """
        return KafkaSink(self._j_builder.build(), self._preprocessing)

    def set_bootstrap_servers(self, bootstrap_servers: str) -> 'KafkaSinkBuilder':
        """
        Sets the Kafka bootstrap servers.

        :param bootstrap_servers: A comma separated list of valid URIs to reach the Kafka broker.
        """
        self._j_builder.setBootstrapServers(bootstrap_servers)
        return self

    def set_delivery_guarantee(self, delivery_guarantee: DeliveryGuarantee) -> 'KafkaSinkBuilder':
        """
        Sets the wanted :class:`DeliveryGuarantee`. The default delivery guarantee is
        :attr:`DeliveryGuarantee.NONE`.

        :param delivery_guarantee: The wanted :class:`DeliveryGuarantee`.
        """
        self._j_builder.setDeliveryGuarantee(delivery_guarantee._to_j_delivery_guarantee())
        return self

    def set_transactional_id_prefix(self, transactional_id_prefix: str) -> 'KafkaSinkBuilder':
        """
        Sets the prefix for all created transactionalIds if :attr:`DeliveryGuarantee.EXACTLY_ONCE`
        is configured.

        It is mandatory to always set this value with :attr:`DeliveryGuarantee.EXACTLY_ONCE` to
        prevent corrupted transactions if multiple jobs using the KafkaSink run against the same
        Kafka Cluster. The default prefix is ``"kafka-sink"``.

        The size of the prefix is capped by MAXIMUM_PREFIX_BYTES (6400) formatted with UTF-8.

        It is important to keep the prefix stable across application restarts. If the prefix changes
        it might happen that lingering transactions are not correctly aborted and newly written
        messages are not immediately consumable until transactions timeout.

        :param transactional_id_prefix: The transactional id prefix.
        """
        self._j_builder.setTransactionalIdPrefix(transactional_id_prefix)
        return self

    def set_record_serializer(self, record_serializer: 'KafkaRecordSerializationSchema') \
            -> 'KafkaSinkBuilder':
        """
        Sets the :class:`KafkaRecordSerializationSchema` that transforms incoming records to kafka
        producer records.

        :param record_serializer: The :class:`KafkaRecordSerializationSchema`.
        """
        # NOTE: If topic selector is a generated first-column selector, do extra preprocessing
        j_topic_selector = get_field_value(record_serializer._j_serialization_schema,
                                           'topicSelector')
        if (
            j_topic_selector.getClass().getCanonicalName() ==
            'org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchemaBuilder.'
            'CachingTopicSelector'
        ) and (
            get_field_value(j_topic_selector, 'topicSelector').getClass().getCanonicalName()
            is not None and
            (get_field_value(j_topic_selector, 'topicSelector').getClass().getCanonicalName()
             .startswith('com.sun.proxy') or
             get_field_value(j_topic_selector, 'topicSelector').getClass().getCanonicalName()
             .startswith('jdk.proxy'))
        ):
            record_serializer._wrap_serialization_schema()
            self._preprocessing = record_serializer._build_preprocessing()

        self._j_builder.setRecordSerializer(record_serializer._j_serialization_schema)
        return self

    def set_property(self, key: str, value: str) -> 'KafkaSinkBuilder':
        """
        Sets kafka producer config.

        :param key: Kafka producer config key.
        :param value: Kafka producer config value.
        """
        self._j_builder.setProperty(key, value)
        return self


class KafkaRecordSerializationSchema(SerializationSchema):
    """
    A serialization schema which defines how to convert the stream record to kafka producer record.

    .. versionadded:: 1.16.0
    """

    def __init__(self, j_serialization_schema,
                 topic_selector: Optional['KafkaTopicSelector'] = None):
        super().__init__(j_serialization_schema)
        self._topic_selector = topic_selector

    @staticmethod
    def builder() -> 'KafkaRecordSerializationSchemaBuilder':
        """
        Creates a default schema builder to provide common building blocks i.e. key serialization,
        value serialization, topic selection.
        """
        return KafkaRecordSerializationSchemaBuilder()

    def _wrap_serialization_schema(self):
        jvm = get_gateway().jvm

        def _wrap_schema(field_name):
            j_schema_field = get_field(self._j_serialization_schema.getClass(), field_name)
            if j_schema_field.get(self._j_serialization_schema) is not None:
                j_schema_field.set(
                    self._j_serialization_schema,
                    jvm.org.apache.flink.python.util.PythonConnectorUtils
                    .SecondColumnSerializationSchema(
                        j_schema_field.get(self._j_serialization_schema)
                    )
                )

        _wrap_schema('keySerializationSchema')
        _wrap_schema('valueSerializationSchema')

    def _build_preprocessing(self) -> StreamTransformer:
        class SelectTopicTransformer(StreamTransformer):

            def __init__(self, topic_selector: KafkaTopicSelector):
                self._topic_selector = topic_selector

            def apply(self, ds):
                output_type = Types.ROW([Types.STRING(), ds.get_type()])
                return ds.map(lambda v: Row(self._topic_selector.apply(v), v),
                              output_type=output_type)

        return SelectTopicTransformer(self._topic_selector)


class KafkaRecordSerializationSchemaBuilder(object):
    """
    Builder to construct :class:`KafkaRecordSerializationSchema`.

    Example:
    ::

        >>> KafkaRecordSerializationSchema.builder() \\
        ...     .set_topic('topic') \\
        ...     .set_key_serialization_schema(SimpleStringSchema()) \\
        ...     .set_value_serialization_schema(SimpleStringSchema()) \\
        ...     .build()

    And the sink topic can be calculated dynamically from each record:
    ::

        >>> KafkaRecordSerializationSchema.builder() \\
        ...     .set_topic_selector(lambda row: 'topic-' + row['category']) \\
        ...     .set_value_serialization_schema(
        ...         JsonRowSerializationSchema.builder().with_type_info(ROW_TYPE).build()) \\
        ...     .build()

    It is necessary to configure exactly one serialization method for the value and a topic.

    .. versionadded:: 1.16.0
    """

    def __init__(self):
        jvm = get_gateway().jvm
        self._j_builder = jvm.org.apache.flink.connector.kafka.sink \
            .KafkaRecordSerializationSchemaBuilder()
        self._fixed_topic = True  # type: bool
        self._topic_selector = None  # type: Optional[KafkaTopicSelector]
        self._key_serialization_schema = None  # type: Optional[SerializationSchema]
        self._value_serialization_schema = None  # type: Optional[SerializationSchema]

    def build(self) -> 'KafkaRecordSerializationSchema':
        """
        Constructs the :class:`KafkaRecordSerializationSchemaBuilder` with the configured
        properties.
        """
        if self._fixed_topic:
            return KafkaRecordSerializationSchema(self._j_builder.build())
        else:
            return KafkaRecordSerializationSchema(self._j_builder.build(), self._topic_selector)

    def set_topic(self, topic: str) -> 'KafkaRecordSerializationSchemaBuilder':
        """
        Sets a fixed topic which used as destination for all records.

        :param topic: The fixed topic.
        """
        self._j_builder.setTopic(topic)
        self._fixed_topic = True
        return self

    def set_topic_selector(self, topic_selector: Union[Callable[[Any], str], 'KafkaTopicSelector'])\
            -> 'KafkaRecordSerializationSchemaBuilder':
        """
        Sets a topic selector which computes the target topic for every incoming record.

        :param topic_selector: A :class:`KafkaTopicSelector` implementation or a function that
            consumes each incoming record and return the topic string.
        """
        if not isinstance(topic_selector, KafkaTopicSelector) and not callable(topic_selector):
            raise TypeError('topic_selector must be KafkaTopicSelector or a callable')
        if not isinstance(topic_selector, KafkaTopicSelector):
            class TopicSelectorFunctionAdapter(KafkaTopicSelector):

                def __init__(self, f: Callable[[Any], str]):
                    self._f = f

                def apply(self, data) -> str:
                    return self._f(data)

            topic_selector = TopicSelectorFunctionAdapter(topic_selector)

        jvm = get_gateway().jvm
        self._j_builder.setTopicSelector(
            jvm.org.apache.flink.python.util.PythonConnectorUtils.createFirstColumnTopicSelector(
                get_java_class(jvm.org.apache.flink.connector.kafka.sink.TopicSelector)
            )
        )
        self._fixed_topic = False
        self._topic_selector = topic_selector
        return self

    def set_key_serialization_schema(self, key_serialization_schema: SerializationSchema) \
            -> 'KafkaRecordSerializationSchemaBuilder':
        """
        Sets a :class:`SerializationSchema` which is used to serialize the incoming element to the
        key of the producer record. The key serialization is optional, if not set, the key of the
        producer record will be null.

        :param key_serialization_schema: The :class:`SerializationSchema` to serialize each incoming
            record as the key of producer record.
        """
        self._key_serialization_schema = key_serialization_schema
        self._j_builder.setKeySerializationSchema(key_serialization_schema._j_serialization_schema)
        return self

    def set_value_serialization_schema(self, value_serialization_schema: SerializationSchema) \
            -> 'KafkaRecordSerializationSchemaBuilder':
        """
        Sets a :class:`SerializationSchema` which is used to serialize the incoming element to the
        value of the producer record. The value serialization is required.

        :param value_serialization_schema: The :class:`SerializationSchema` to serialize each data
            record as the key of producer record.
        """
        self._value_serialization_schema = value_serialization_schema
        self._j_builder.setValueSerializationSchema(
            value_serialization_schema._j_serialization_schema)
        return self


class KafkaTopicSelector(ABC):
    """
    Select topic for an incoming record

    .. versionadded:: 1.16.0
    """

    @abstractmethod
    def apply(self, data) -> str:
        pass
