public void invoke()

in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java [858:927]


    public void invoke(
            FlinkKafkaProducer.KafkaTransactionState transaction, IN next, Context context)
            throws FlinkKafkaException {
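        // Rethrow any error that an earlier asynchronous send reported through the callback.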
        checkErroneous();

        ProducerRecord<byte[], byte[]> record;
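        // Legacy KeyedSerializationSchema path: key, value and target topic are serialized
        // separately and the ProducerRecord is assembled here.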
        if (keyedSchema != null) {
            byte[] serializedKey = keyedSchema.serializeKey(next);
            byte[] serializedValue = keyedSchema.serializeValue(next);
            String targetTopic = keyedSchema.getTargetTopic(next);
            if (targetTopic == null) {
                targetTopic = defaultTopicId;
            }

            Long timestamp = null;
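            // The element's timestamp is attached to the record only if
            // setWriteTimestampToKafka(true) was configured on the producer.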
            if (this.writeTimestampToKafka) {
                timestamp = context.timestamp();
            }

            int[] partitions = topicPartitionsMap.get(targetTopic);
            if (null == partitions) {
                partitions = getPartitionsByTopic(targetTopic, transaction.producer);
                topicPartitionsMap.put(targetTopic, partitions);
            }
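            // A user-supplied FlinkKafkaPartitioner chooses the partition explicitly; otherwise
            // the partition is left null and Kafka's own partitioner decides.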
            if (flinkKafkaPartitioner != null) {
                record =
                        new ProducerRecord<>(
                                targetTopic,
                                flinkKafkaPartitioner.partition(
                                        next,
                                        serializedKey,
                                        serializedValue,
                                        targetTopic,
                                        partitions),
                                timestamp,
                                serializedKey,
                                serializedValue);
            } else {
                record =
                        new ProducerRecord<>(
                                targetTopic, null, timestamp, serializedKey, serializedValue);
            }
        } else if (kafkaSchema != null) {
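            // KafkaSerializationSchema path: the schema builds the complete ProducerRecord itself.
            // A KafkaContextAware schema is first handed the target topic's cached partition list
            // so that it can pick a partition inside serialize().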
            if (kafkaSchema instanceof KafkaContextAware) {
                @SuppressWarnings("unchecked")
                KafkaContextAware<IN> contextAwareSchema = (KafkaContextAware<IN>) kafkaSchema;

                String targetTopic = contextAwareSchema.getTargetTopic(next);
                if (targetTopic == null) {
                    targetTopic = defaultTopicId;
                }
                int[] partitions = topicPartitionsMap.get(targetTopic);

                if (null == partitions) {
                    partitions = getPartitionsByTopic(targetTopic, transaction.producer);
                    topicPartitionsMap.put(targetTopic, partitions);
                }

                contextAwareSchema.setPartitions(partitions);
            }
            record = kafkaSchema.serialize(next, context.timestamp());
        } else {
            throw new RuntimeException(
                    "We have neither KafkaSerializationSchema nor KeyedSerializationSchema, this"
                            + "is a bug.");
        }

        pendingRecords.incrementAndGet();
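        // The send is asynchronous; the shared callback decrements pendingRecords and stores any
        // failure for the next checkErroneous() call (or only logs it when logFailuresOnly is set).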
        transaction.producer.send(record, callback);
    }
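
For context, here is a minimal sketch of how the kafkaSchema branch above is typically exercised. The schema class, topic name, broker address, and pipeline wiring are illustrative assumptions and are not taken from this file; FlinkKafkaProducer's constructor, the Semantic enum, KafkaSerializationSchema, and KafkaContextAware are the connector's public API. A KafkaContextAware schema is handed the cached partition array via setPartitions() before serialize() is called, which is exactly what invoke() does above.

    import java.nio.charset.StandardCharsets;
    import java.util.Properties;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.apache.flink.streaming.connectors.kafka.KafkaContextAware;
    import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
    import org.apache.kafka.clients.producer.ProducerRecord;

    /** Illustrative schema: builds the whole ProducerRecord and picks a partition itself. */
    public class EventSchema implements KafkaSerializationSchema<String>, KafkaContextAware<String> {

        private int[] partitions; // filled in by invoke() through setPartitions() before serialize() runs

        @Override
        public String getTargetTopic(String element) {
            return "events"; // returning null would make invoke() fall back to defaultTopicId
        }

        @Override
        public void setPartitions(int[] partitions) {
            this.partitions = partitions;
        }

        @Override
        public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
            // Simple hash-based routing over the partition array handed in by invoke().
            int partition = (element.hashCode() & Integer.MAX_VALUE) % partitions.length;
            return new ProducerRecord<>(
                    "events", partition, timestamp, null, element.getBytes(StandardCharsets.UTF_8));
        }

        /** Wires the sink; invoke() then runs once per element inside the currently open Kafka transaction. */
        public static void buildPipeline(StreamExecutionEnvironment env) {
            DataStream<String> stream = env.fromElements("a", "b", "c"); // illustrative source

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092");    // illustrative broker address

            FlinkKafkaProducer<String> producer =
                    new FlinkKafkaProducer<>(
                            "events",                                    // defaultTopicId
                            new EventSchema(),
                            props,
                            FlinkKafkaProducer.Semantic.EXACTLY_ONCE);   // transactional delivery

            stream.addSink(producer);
        }
    }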