in src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSink.java [78:107]
// Prepares this sink instance for use: validates the producer properties, builds and
// configures the RocketMQ producer, resets the batch buffer, starts the producer and
// registers the sink metrics.
//
// The "parameters" argument is not read by this method.
//
// Throws IllegalArgumentException/NullPointerException (presumably, via Validate.notEmpty —
// confirm against the Validate implementation in use) when "props" is null or empty, and
// RuntimeException wrapping the MQClientException when the producer fails to start.
public void open(Configuration parameters) throws Exception {
    Validate.notEmpty(props, "Producer properties can not be empty");

    // with authentication hook
    producer = new DefaultMQProducer(RocketMQConfig.buildAclRPCHook(props));
    // Instance name is "<subtask index>_<random UUID>", so it differs between subtasks
    // and between restarts of the same subtask.
    producer.setInstanceName(
            getRuntimeContext().getIndexOfThisSubtask() + "_" + UUID.randomUUID());
    RocketMQConfig.buildProducerConfigs(props, producer);

    batchList = new LinkedList<>();

    // Flush-on-checkpoint only makes sense when checkpointing is enabled for the job;
    // otherwise fall back to the non-batched path by clearing the flag.
    if (batchFlushOnCheckpoint
            && !((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled()) {
        LOG.info(
                "Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.");
        batchFlushOnCheckpoint = false;
    }

    try {
        producer.start();
    } catch (MQClientException e) {
        // Log the exception itself so the cause and stack trace are visible in the task
        // logs, and keep it as the cause of the rethrown exception.
        LOG.error("Flink sink init failed, due to the producer cannot be initialized.", e);
        throw new RuntimeException(
                "Flink sink init failed, due to the producer cannot be initialized.", e);
    }

    // Metrics are registered only after the producer has started successfully.
    sinkInTps = MetricUtils.registerSinkInTps(getRuntimeContext());
    outTps = MetricUtils.registerOutTps(getRuntimeContext());
    outBps = MetricUtils.registerOutBps(getRuntimeContext());
    latencyGauge = MetricUtils.registerOutLatency(getRuntimeContext());
}