in pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java [68:127]
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
Map<String, String> stringConfig = new HashMap<>();
config.forEach((key, value) -> {
if (value instanceof String) {
stringConfig.put(key, (String) value);
}
});
// get the source class name from config and create source task from reflection
sourceTask = ((Class<? extends SourceTask>) Class.forName(stringConfig.get(TaskConfig.TASK_CLASS_CONFIG)))
.asSubclass(SourceTask.class)
.getDeclaredConstructor()
.newInstance();
topicNamespace = stringConfig.get(TOPIC_NAMESPACE_CONFIG);
// initialize the key and value converter
keyConverter = ((Class<? extends Converter>) Class.forName(stringConfig.get(PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG)))
.asSubclass(Converter.class)
.getDeclaredConstructor()
.newInstance();
valueConverter = ((Class<? extends Converter>) Class.forName(stringConfig.get(PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG)))
.asSubclass(Converter.class)
.getDeclaredConstructor()
.newInstance();
if (keyConverter instanceof AvroConverter) {
keyConverter = new AvroConverter(new MockSchemaRegistryClient());
config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock");
}
if (valueConverter instanceof AvroConverter) {
valueConverter = new AvroConverter(new MockSchemaRegistryClient());
config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock");
}
keyConverter.configure(config, true);
valueConverter.configure(config, false);
offsetStore = new PulsarOffsetBackingStore();
PulsarKafkaWorkerConfig pulsarKafkaWorkerConfig = new PulsarKafkaWorkerConfig(stringConfig);
offsetStore.configure(pulsarKafkaWorkerConfig);
offsetStore.start();
offsetReader = new OffsetStorageReaderImpl(
offsetStore,
"pulsar-kafka-connect-adaptor",
keyConverter,
valueConverter
);
offsetWriter = new OffsetStorageWriter(
offsetStore,
"pulsar-kafka-connect-adaptor",
keyConverter,
valueConverter
);
sourceTaskContext = new PulsarIOSourceTaskContext(offsetReader, pulsarKafkaWorkerConfig);
sourceTask.initialize(sourceTaskContext);
sourceTask.start(stringConfig);
}