public void open()

in amazon-kinesis-connector-flink/src/main/java/software/amazon/kinesis/connectors/flink/FlinkKinesisProducer.java [215:262]


	public void open(Configuration parameters) throws Exception {
		super.open(parameters);

		schema.open(() -> getRuntimeContext().getMetricGroup().addGroup("user"));

		// check and pass the configuration properties
		KinesisProducerConfiguration producerConfig = KinesisConfigUtil.getValidatedProducerConfiguration(configProps);

		producer = getKinesisProducer(producerConfig);

		final MetricGroup kinesisMectricGroup = getRuntimeContext().getMetricGroup().addGroup(KINESIS_PRODUCER_METRIC_GROUP);
		this.backpressureCycles = kinesisMectricGroup.counter(METRIC_BACKPRESSURE_CYCLES);
		kinesisMectricGroup.gauge(METRIC_OUTSTANDING_RECORDS_COUNT, producer::getOutstandingRecordsCount);

		backpressureLatch = new TimeoutLatch();
		callback = new FutureCallback<UserRecordResult>() {
			@Override
			public void onSuccess(UserRecordResult result) {
				backpressureLatch.trigger();
				if (!result.isSuccessful()) {
					if (failOnError) {
						// only remember the first thrown exception
						if (thrownException == null) {
							thrownException = new RuntimeException("Record was not sent successful");
						}
					} else {
						LOG.warn("Record was not sent successful");
					}
				}
			}

			@Override
			public void onFailure(Throwable t) {
				backpressureLatch.trigger();
				if (failOnError) {
					thrownException = t;
				} else {
					LOG.warn("An exception occurred while processing a record", t);
				}
			}
		};

		if (this.customPartitioner != null) {
			this.customPartitioner.initialize(getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
		}

		LOG.info("Started Kinesis producer instance for region '{}'", producerConfig.getRegion());
	}