public void invoke()

in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java [299:334]


    /**
     * Serializes the given element and hands it to the Kafka producer.
     *
     * <p>Steps, in order: rethrow any asynchronously recorded error, serialize key and value
     * through the schema, resolve the target topic (falling back to {@code defaultTopicId} when
     * the schema returns {@code null}), look up the topic's partitions (fetching and caching
     * them on first use), build the record, and send it with the shared callback.
     *
     * @param next the element to write
     * @param context the sink context; not used by this method
     * @throws Exception if {@code checkErroneous()} or any of the invoked collaborators throws
     */
    public void invoke(IN next, Context context) throws Exception {
        // Surface failures that were reported asynchronously before accepting more input.
        checkErroneous();

        final byte[] keyBytes = schema.serializeKey(next);
        final byte[] valueBytes = schema.serializeValue(next);

        // A null topic from the schema means "use the default topic".
        final String schemaTopic = schema.getTargetTopic(next);
        final String topic = (schemaTopic != null) ? schemaTopic : defaultTopicId;

        // Partitions are cached per topic; the first record for a topic triggers the fetch.
        int[] topicPartitions = topicPartitionsMap.get(topic);
        if (topicPartitions == null) {
            topicPartitions = getPartitionsByTopic(topic, producer);
            topicPartitionsMap.put(topic, topicPartitions);
        }

        final ProducerRecord<byte[], byte[]> outgoing;
        if (flinkKafkaPartitioner != null) {
            // A custom partitioner is configured: let it choose the partition explicitly.
            outgoing =
                    new ProducerRecord<>(
                            topic,
                            flinkKafkaPartitioner.partition(
                                    next, keyBytes, valueBytes, topic, topicPartitions),
                            keyBytes,
                            valueBytes);
        } else {
            // No custom partitioner: create the record without an explicit partition.
            outgoing = new ProducerRecord<>(topic, keyBytes, valueBytes);
        }

        // Count the record as in-flight before sending it.
        // NOTE(review): presumably the callback decrements this counter — confirm.
        if (flushOnCheckpoint) {
            synchronized (pendingRecordsLock) {
                pendingRecords++;
            }
        }

        producer.send(outgoing, callback);
    }