in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java [808:855]
public void open(Configuration configuration) throws Exception {
if (logFailuresOnly) {
callback =
new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
LOG.error(
"Error while sending record to Kafka: " + e.getMessage(),
e);
}
acknowledgeMessage();
}
};
} else {
callback =
new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null && asyncException == null) {
asyncException = exception;
}
acknowledgeMessage();
}
};
}
RuntimeContext ctx = getRuntimeContext();
if (flinkKafkaPartitioner != null) {
flinkKafkaPartitioner.open(
ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
}
if (kafkaSchema instanceof KafkaContextAware) {
KafkaContextAware<IN> contextAwareSchema = (KafkaContextAware<IN>) kafkaSchema;
contextAwareSchema.setParallelInstanceId(ctx.getIndexOfThisSubtask());
contextAwareSchema.setNumParallelInstances(ctx.getNumberOfParallelSubtasks());
}
if (kafkaSchema != null) {
kafkaSchema.open(
RuntimeContextInitializationContextAdapters.serializationAdapter(
getRuntimeContext(), metricGroup -> metricGroup.addGroup("user")));
}
super.open(configuration);
}