in amazon-kinesis-connector-flink/src/main/java/software/amazon/kinesis/connectors/flink/FlinkKinesisProducer.java [215:262]
public void open(Configuration parameters) throws Exception {
    // Lifecycle hook that prepares this sink for writing. In order, it:
    //   1. delegates to super.open(parameters),
    //   2. opens the serialization schema with a supplier of the "user" metric group,
    //   3. validates configProps and creates the Kinesis producer from the result,
    //   4. registers the backpressure counter and the outstanding-records gauge,
    //   5. installs the callback that records the outcome of each asynchronous send,
    //   6. initializes the custom partitioner, if one is set.
    // Any exception raised by these steps propagates to the caller.
    super.open(parameters);

    schema.open(() -> getRuntimeContext().getMetricGroup().addGroup("user"));

    // check and pass the configuration properties
    KinesisProducerConfiguration producerConfig = KinesisConfigUtil.getValidatedProducerConfiguration(configProps);
    producer = getKinesisProducer(producerConfig);

    final MetricGroup kinesisMetricGroup = getRuntimeContext().getMetricGroup().addGroup(KINESIS_PRODUCER_METRIC_GROUP);
    this.backpressureCycles = kinesisMetricGroup.counter(METRIC_BACKPRESSURE_CYCLES);
    // The gauge reads the producer's outstanding record count each time it is sampled.
    kinesisMetricGroup.gauge(METRIC_OUTSTANDING_RECORDS_COUNT, producer::getOutstandingRecordsCount);

    backpressureLatch = new TimeoutLatch();
    callback = new FutureCallback<UserRecordResult>() {
        @Override
        public void onSuccess(UserRecordResult result) {
            // Trigger the latch on every completion, successful or not.
            backpressureLatch.trigger();
            if (!result.isSuccessful()) {
                if (failOnError) {
                    // only remember the first thrown exception
                    if (thrownException == null) {
                        thrownException = new RuntimeException("Record was not sent successful");
                    }
                } else {
                    LOG.warn("Record was not sent successful");
                }
            }
        }

        @Override
        public void onFailure(Throwable t) {
            backpressureLatch.trigger();
            if (failOnError) {
                // only remember the first thrown exception, consistent with onSuccess;
                // a later failure must not overwrite the one already recorded
                if (thrownException == null) {
                    thrownException = t;
                }
            } else {
                LOG.warn("An exception occurred while processing a record", t);
            }
        }
    };

    if (this.customPartitioner != null) {
        // Pass this subtask's index and the total parallelism to the partitioner.
        this.customPartitioner.initialize(getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
    }

    LOG.info("Started Kinesis producer instance for region '{}'", producerConfig.getRegion());
}