public PulsarWriter()

in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java [96:138]


    /**
     * Builds the writer and eagerly opens the collaborators it is handed.
     *
     * <p>Initialization runs in a fixed sequence: the metadata listener is opened first, then the
     * topic router, then the serialization schema, and only afterwards is the producer register
     * created.
     *
     * @param sinkConfiguration configuration handed to every component opened here; must not be
     *     null.
     * @param serializationSchema schema that is opened with the sink context created by this
     *     constructor; must not be null.
     * @param metadataListener topic metadata listener, opened with the processing time service
     *     taken from {@code initContext}; must not be null.
     * @param topicRouter router that is opened with the sink configuration; must not be null.
     * @param messageDelayer delayer kept by the writer; must not be null.
     * @param pulsarCrypto forwarded as-is to the producer register. NOTE(review): unlike the other
     *     arguments it is not null-checked here - confirm whether null is acceptable.
     * @param initContext source of the processing time service, the schema initialization
     *     context, the metric group and the mailbox executor; must not be null.
     * @throws PulsarClientException declared on the signature; the step that raises it is not
     *     visible in this constructor.
     * @throws FlinkRuntimeException if opening the serialization schema fails for any reason.
     */
    public PulsarWriter(
            SinkConfiguration sinkConfiguration,
            PulsarSerializationSchema<IN> serializationSchema,
            MetadataListener metadataListener,
            TopicRouter<IN> topicRouter,
            MessageDelayer<IN> messageDelayer,
            PulsarCrypto pulsarCrypto,
            InitContext initContext)
            throws PulsarClientException {
        // Validate all mandatory arguments up front, in declaration order.
        checkNotNull(sinkConfiguration);
        checkNotNull(serializationSchema);
        checkNotNull(metadataListener);
        checkNotNull(topicRouter);
        checkNotNull(messageDelayer);
        checkNotNull(initContext);

        // Keep the validated collaborators.
        this.serializationSchema = serializationSchema;
        this.metadataListener = metadataListener;
        this.topicRouter = topicRouter;
        this.messageDelayer = messageDelayer;

        // Derive the remaining state from the configuration and the init context.
        this.deliveryGuarantee = sinkConfiguration.getDeliveryGuarantee();
        this.sinkContext =
                new PulsarSinkContextImpl(initContext, sinkConfiguration, metadataListener);

        // Step 1: open the topic metadata listener.
        LOG.debug("Initialize topic metadata after creating Pulsar writer.");
        final ProcessingTimeService processingTimeService = initContext.getProcessingTimeService();
        metadataListener.open(sinkConfiguration, processingTimeService);

        // Step 2: open the topic router.
        topicRouter.open(sinkConfiguration);

        // Step 3: open the serialization schema; any failure is rethrown unchecked.
        try {
            final InitializationContext schemaInitContext =
                    initContext.asSerializationSchemaInitializationContext();
            serializationSchema.open(schemaInitContext, this.sinkContext, sinkConfiguration);
        } catch (Exception cause) {
            throw new FlinkRuntimeException("Cannot initialize schema.", cause);
        }

        // Step 4: the producer register must only be created once the schema has been opened.
        final SinkWriterMetricGroup writerMetrics = initContext.metricGroup();
        this.producerRegister =
                new ProducerRegister(sinkConfiguration, pulsarCrypto, writerMetrics);
        this.mailboxExecutor = initContext.getMailboxExecutor();
        // The pending message counter starts at zero.
        this.pendingMessages = new AtomicLong(0L);
    }