in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java [239:300]
public void open(Configuration parameters) throws Exception {
    // Prepares this sink instance for use. In order, it: opens the serialization schema,
    // builds the Kinesis producer from the validated configuration properties, registers
    // the producer metrics, installs the callback applied to asynchronous send results,
    // initializes the optional custom partitioner, and registers a user-code class loader
    // release hook.
    super.open(parameters);

    schema.open(
            RuntimeContextInitializationContextAdapters.serializationAdapter(
                    getRuntimeContext(), metricGroup -> metricGroup.addGroup("user")));

    // check and pass the configuration properties
    KinesisProducerConfiguration producerConfig =
            KinesisConfigUtil.getValidatedProducerConfiguration(configProps);

    producer = getKinesisProducer(producerConfig);

    // Metrics: a counter for backpressure cycles and a gauge that reads the producer's
    // outstanding record count on demand.
    final MetricGroup kinesisMetricGroup =
            getRuntimeContext().getMetricGroup().addGroup(KINESIS_PRODUCER_METRIC_GROUP);
    this.backpressureCycles = kinesisMetricGroup.counter(METRIC_BACKPRESSURE_CYCLES);
    kinesisMetricGroup.gauge(
            METRIC_OUTSTANDING_RECORDS_COUNT, producer::getOutstandingRecordsCount);

    backpressureLatch = new TimeoutLatch();
    callback =
            new FutureCallback<UserRecordResult>() {
                @Override
                public void onSuccess(UserRecordResult result) {
                    // Every completion triggers the latch, whether the record succeeded
                    // or not. NOTE(review): presumably this releases a writer waiting on
                    // the latch for queue capacity - confirm against the latch's users.
                    backpressureLatch.trigger();
                    if (!result.isSuccessful()) {
                        if (failOnError) {
                            // only remember the first thrown exception
                            if (thrownException == null) {
                                thrownException =
                                        new RuntimeException("Record was not sent successful");
                            }
                        } else {
                            LOG.warn("Record was not sent successful");
                        }
                    }
                }

                @Override
                public void onFailure(Throwable t) {
                    backpressureLatch.trigger();
                    if (failOnError) {
                        // only remember the first thrown exception, exactly as onSuccess
                        // does; a later failure must not replace the one already recorded
                        if (thrownException == null) {
                            thrownException = t;
                        }
                    } else {
                        LOG.warn("An exception occurred while processing a record", t);
                    }
                }
            };

    final RuntimeContext ctx = getRuntimeContext();

    // The custom partitioner is optional; when present it is told this subtask's index
    // and the total number of parallel subtasks.
    if (this.customPartitioner != null) {
        this.customPartitioner.initialize(
                ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
    }

    // Registered under a fixed name with the "IfAbsent" variant of the registration call.
    ctx.registerUserCodeClassLoaderReleaseHookIfAbsent(
            KINESIS_PRODUCER_RELEASE_HOOK_NAME,
            () -> this.runClassLoaderReleaseHook(ctx.getUserCodeClassLoader()));

    LOG.info("Started Kinesis producer instance for region '{}'", producerConfig.getRegion());
}