public void open()

in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java [239:300]


    /**
     * Prepares this producer for use. In order, this opens the serialization schema, builds the
     * Kinesis producer from the configuration derived from {@code configProps}, registers the
     * backpressure counter and outstanding-records gauge, installs the callback that reacts to
     * the outcome of each asynchronous send, initializes the custom partitioner when one is set,
     * and registers a user-code class loader release hook.
     *
     * @param parameters configuration handed through to {@code super.open}
     * @throws Exception if {@code super.open}, {@code schema.open} or any later initialization
     *     step throws
     */
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        // Fetch the runtime context once; every step below works against the same instance.
        final RuntimeContext runtimeContext = getRuntimeContext();

        schema.open(
                RuntimeContextInitializationContextAdapters.serializationAdapter(
                        runtimeContext, group -> group.addGroup("user")));

        // Derive the producer configuration from configProps through the validating helper.
        final KinesisProducerConfiguration validatedConfig =
                KinesisConfigUtil.getValidatedProducerConfiguration(configProps);
        producer = getKinesisProducer(validatedConfig);

        // Metrics live in a dedicated sub-group of the operator's metric group.
        final MetricGroup producerMetrics =
                runtimeContext.getMetricGroup().addGroup(KINESIS_PRODUCER_METRIC_GROUP);
        backpressureCycles = producerMetrics.counter(METRIC_BACKPRESSURE_CYCLES);
        producerMetrics.gauge(
                METRIC_OUTSTANDING_RECORDS_COUNT, producer::getOutstandingRecordsCount);

        backpressureLatch = new TimeoutLatch();
        callback =
                new FutureCallback<UserRecordResult>() {
                    @Override
                    public void onSuccess(UserRecordResult result) {
                        // Trigger the latch first, regardless of how the record fared.
                        backpressureLatch.trigger();
                        if (result.isSuccessful()) {
                            return;
                        }
                        if (!failOnError) {
                            LOG.warn("Record was not sent successful");
                            return;
                        }
                        // Keep the earliest stored failure; a later one must not replace it.
                        if (thrownException == null) {
                            thrownException =
                                    new RuntimeException("Record was not sent successful");
                        }
                    }

                    @Override
                    public void onFailure(Throwable t) {
                        backpressureLatch.trigger();
                        if (!failOnError) {
                            LOG.warn("An exception occurred while processing a record", t);
                            return;
                        }
                        // NOTE(review): unlike onSuccess, this replaces any exception that
                        // was stored earlier -- confirm that overwriting is intended.
                        thrownException = t;
                    }
                };

        // The partitioner is optional; when present it is told this subtask's index and the
        // total number of parallel subtasks.
        if (customPartitioner != null) {
            customPartitioner.initialize(
                    runtimeContext.getIndexOfThisSubtask(),
                    runtimeContext.getNumberOfParallelSubtasks());
        }

        runtimeContext.registerUserCodeClassLoaderReleaseHookIfAbsent(
                KINESIS_PRODUCER_RELEASE_HOOK_NAME,
                () -> this.runClassLoaderReleaseHook(runtimeContext.getUserCodeClassLoader()));

        LOG.info("Started Kinesis producer instance for region '{}'", validatedConfig.getRegion());
    }