in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java [96:138]
/**
 * Creates the writer and initializes its collaborators in a fixed order: the topic metadata
 * listener, then the topic router, then the serialization schema, and finally the producer
 * register.
 *
 * <p>All arguments except {@code pulsarCrypto} are validated with {@code checkNotNull} before
 * any component is opened.
 *
 * @param sinkConfiguration sink settings; supplies the delivery guarantee and is handed to every
 *     component that is opened or created here.
 * @param serializationSchema schema that is opened with the initialization context, the sink
 *     context and the sink configuration.
 * @param metadataListener topic metadata listener; opened with the sink configuration and the
 *     processing time service taken from {@code initContext}.
 * @param topicRouter topic router; opened with the sink configuration.
 * @param messageDelayer message delayer; only stored by this constructor.
 * @param pulsarCrypto crypto settings forwarded to the {@code ProducerRegister}.
 *     NOTE(review): this is the only reference argument that is not null-checked here - confirm
 *     whether {@code null} is an accepted value.
 * @param initContext runtime context that provides the processing time service, the
 *     serialization schema initialization context, the metric group and the mailbox executor.
 * @throws PulsarClientException declared by the signature; which of the initialization calls
 *     raises it is not visible from this constructor.
 * @throws FlinkRuntimeException if opening the serialization schema throws any exception; the
 *     original exception is kept as the cause.
 */
public PulsarWriter(
SinkConfiguration sinkConfiguration,
PulsarSerializationSchema<IN> serializationSchema,
MetadataListener metadataListener,
TopicRouter<IN> topicRouter,
MessageDelayer<IN> messageDelayer,
PulsarCrypto pulsarCrypto,
InitContext initContext)
throws PulsarClientException {
// Validate the arguments up front so nothing is opened when one of them is missing.
// sinkConfiguration and initContext are checked but not stored as fields here.
checkNotNull(sinkConfiguration);
this.serializationSchema = checkNotNull(serializationSchema);
this.metadataListener = checkNotNull(metadataListener);
this.topicRouter = checkNotNull(topicRouter);
this.messageDelayer = checkNotNull(messageDelayer);
checkNotNull(initContext);
this.deliveryGuarantee = sinkConfiguration.getDeliveryGuarantee();
// The sink context is built before the schema is opened because it is passed to
// serializationSchema.open(...) below.
this.sinkContext =
new PulsarSinkContextImpl(initContext, sinkConfiguration, metadataListener);
// Initialize topic metadata listener.
LOG.debug("Initialize topic metadata after creating Pulsar writer.");
ProcessingTimeService timeService = initContext.getProcessingTimeService();
this.metadataListener.open(sinkConfiguration, timeService);
// Initialize topic router.
this.topicRouter.open(sinkConfiguration);
// Initialize the serialization schema.
try {
InitializationContext initializationContext =
initContext.asSerializationSchemaInitializationContext();
this.serializationSchema.open(initializationContext, sinkContext, sinkConfiguration);
} catch (Exception e) {
// Any failure while opening the schema is rethrown unchecked with the cause preserved.
throw new FlinkRuntimeException("Cannot initialize schema.", e);
}
// Create this producer register after opening serialization schema!
// (Ordering requirement stated by the original author; the reason is not visible here.)
SinkWriterMetricGroup metricGroup = initContext.metricGroup();
this.producerRegister = new ProducerRegister(sinkConfiguration, pulsarCrypto, metricGroup);
this.mailboxExecutor = initContext.getMailboxExecutor();
// Counter starts at zero; how it is updated is outside this constructor.
this.pendingMessages = new AtomicLong(0);
}