in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java [218:291]
public void open(Configuration configuration) throws Exception {
    // Prepares this producer for use. In order, this method:
    //   1. opens the wrapped serialization schema (only for KeyedSerializationSchemaWrapper),
    //   2. obtains the Kafka producer via getKafkaProducer(producerConfig),
    //   3. opens the custom partitioner, if one is set,
    //   4. registers the producer's metrics as gauges unless KEY_DISABLE_METRICS is "true",
    //   5. turns off flush-on-checkpoint when checkpointing is not enabled,
    //   6. installs the send-completion callback selected by logFailuresOnly.
    // The 'configuration' argument is not read by this method.

    // The wrapped schema is opened with an adapter built from the runtime context and a
    // mapping that adds a "user" sub-group to the metric group it is given.
    if (schema instanceof KeyedSerializationSchemaWrapper) {
        final KeyedSerializationSchemaWrapper<IN> keyedWrapper =
                (KeyedSerializationSchemaWrapper<IN>) schema;
        keyedWrapper
                .getSerializationSchema()
                .open(
                        RuntimeContextInitializationContextAdapters.serializationAdapter(
                                getRuntimeContext(), parent -> parent.addGroup("user")));
    }

    producer = getKafkaProducer(this.producerConfig);

    final RuntimeContext ctx = getRuntimeContext();
    final int subtaskIndex = ctx.getIndexOfThisSubtask();
    final int parallelism = ctx.getNumberOfParallelSubtasks();

    if (flinkKafkaPartitioner != null) {
        flinkKafkaPartitioner.open(subtaskIndex, parallelism);
    }

    // The subtask index is zero-based; it is logged one-based.
    LOG.info(
            "Starting FlinkKafkaProducer ({}/{}) to produce into default topic {}",
            subtaskIndex + 1,
            parallelism,
            defaultTopicId);

    // Expose each metric reported by the producer as a gauge in the "KafkaProducer"
    // metric group, unless metrics were switched off through the producer properties.
    final boolean metricsDisabled =
            Boolean.parseBoolean(producerConfig.getProperty(KEY_DISABLE_METRICS, "false"));
    if (!metricsDisabled) {
        final Map<MetricName, ? extends Metric> kafkaMetrics = this.producer.metrics();
        if (kafkaMetrics != null) {
            final MetricGroup producerGroup = ctx.getMetricGroup().addGroup("KafkaProducer");
            for (Map.Entry<MetricName, ? extends Metric> entry : kafkaMetrics.entrySet()) {
                final String gaugeName = entry.getKey().name();
                producerGroup.gauge(gaugeName, new KafkaMetricWrapper(entry.getValue()));
            }
        } else {
            // MapR's Kafka implementation returns null here.
            LOG.info("Producer implementation does not support metrics");
        }
    }

    // Flush-on-checkpoint is switched off (with a warning) when checkpointing is disabled.
    // The runtime context is only cast when the flag is actually set.
    if (flushOnCheckpoint) {
        final StreamingRuntimeContext streamingCtx = (StreamingRuntimeContext) ctx;
        if (!streamingCtx.isCheckpointingEnabled()) {
            LOG.warn(
                    "Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.");
            flushOnCheckpoint = false;
        }
    }

    // Both callback variants acknowledge the message on every completion; they differ
    // only in what happens to a reported failure.
    final Callback completionCallback;
    if (logFailuresOnly) {
        // Failures are logged and otherwise ignored.
        completionCallback =
                (metadata, exception) -> {
                    if (exception != null) {
                        LOG.error(
                                "Error while sending record to Kafka: " + exception.getMessage(),
                                exception);
                    }
                    acknowledgeMessage();
                };
    } else {
        // Only the first failure is kept in asyncException; later ones are dropped.
        completionCallback =
                (metadata, exception) -> {
                    if (exception != null && asyncException == null) {
                        asyncException = exception;
                    }
                    acknowledgeMessage();
                };
    }
    callback = completionCallback;
}