public void open()

in src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSink.java [78:107]


    public void open(Configuration parameters) throws Exception {
        Validate.notEmpty(props, "Producer properties can not be empty");

        // with authentication hook
        producer = new DefaultMQProducer(RocketMQConfig.buildAclRPCHook(props));
        producer.setInstanceName(
                getRuntimeContext().getIndexOfThisSubtask() + "_" + UUID.randomUUID());

        RocketMQConfig.buildProducerConfigs(props, producer);

        batchList = new LinkedList<>();

        if (batchFlushOnCheckpoint
                && !((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled()) {
            LOG.info(
                    "Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.");
            batchFlushOnCheckpoint = false;
        }

        try {
            producer.start();
        } catch (MQClientException e) {
            LOG.error("Flink sink init failed, due to the producer cannot be initialized.");
            throw new RuntimeException(e);
        }
        sinkInTps = MetricUtils.registerSinkInTps(getRuntimeContext());
        outTps = MetricUtils.registerOutTps(getRuntimeContext());
        outBps = MetricUtils.registerOutBps(getRuntimeContext());
        latencyGauge = MetricUtils.registerOutLatency(getRuntimeContext());
    }