public void open()

in pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java [68:127]


    /**
     * Opens the source: instantiates the configured Kafka Connect {@link SourceTask} and the
     * key/value {@link Converter}s by reflection, starts the offset backing store, wires up the
     * offset reader and writer, and finally initializes and starts the task.
     *
     * <p>Only entries of {@code config} whose value is a {@code String} are forwarded to the task
     * and to {@link PulsarKafkaWorkerConfig}; the converters are configured with the full map.
     * The caller's {@code config} map is never modified.
     *
     * @param config        connector configuration; must contain String values for
     *                      {@code TaskConfig.TASK_CLASS_CONFIG} and for the key and value
     *                      converter class settings of {@link PulsarKafkaWorkerConfig}
     * @param sourceContext Pulsar source context (not used by this method)
     * @throws IllegalArgumentException if one of the required class-name settings is missing or
     *                                  is not a String
     * @throws Exception                if a configured class cannot be loaded or instantiated, or
     *                                  if the offset store, a converter or the task fails to start
     */
    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
        // The Kafka Connect task and worker config only accept String values,
        // so every non-String entry is dropped here.
        Map<String, String> stringConfig = new HashMap<>();
        config.forEach((key, value) -> {
            if (value instanceof String) {
                stringConfig.put(key, (String) value);
            }
        });

        // get the source class name from config and create source task from reflection
        sourceTask = instantiateFromConfig(stringConfig, TaskConfig.TASK_CLASS_CONFIG, SourceTask.class);

        topicNamespace = stringConfig.get(TOPIC_NAMESPACE_CONFIG);

        // initialize the key and value converter
        keyConverter = instantiateFromConfig(
                stringConfig, PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class);
        valueConverter = instantiateFromConfig(
                stringConfig, PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class);

        // Work on a private copy so the caller's map is left untouched (and so an
        // unmodifiable map passed by the caller does not make the put() below fail).
        Map<String, Object> converterConfig = new HashMap<>(config);

        // Avro converters are swapped for instances backed by a MockSchemaRegistryClient and the
        // registry URL is set to the placeholder "mock".
        // NOTE(review): presumably so that no external schema registry is required - confirm.
        if (keyConverter instanceof AvroConverter) {
            keyConverter = new AvroConverter(new MockSchemaRegistryClient());
            converterConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock");
        }
        if (valueConverter instanceof AvroConverter) {
            valueConverter = new AvroConverter(new MockSchemaRegistryClient());
            converterConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock");
        }
        keyConverter.configure(converterConfig, true);
        valueConverter.configure(converterConfig, false);

        // NOTE(review): the store is started before the task; if a later step throws, nothing in
        // this method stops it again - confirm that the caller closes the source on open() failure.
        offsetStore = new PulsarOffsetBackingStore();
        PulsarKafkaWorkerConfig pulsarKafkaWorkerConfig = new PulsarKafkaWorkerConfig(stringConfig);
        offsetStore.configure(pulsarKafkaWorkerConfig);
        offsetStore.start();

        // Reader and writer must share the same namespace and converters so that offsets
        // written by one can be read back by the other.
        String offsetNamespace = "pulsar-kafka-connect-adaptor";
        offsetReader = new OffsetStorageReaderImpl(
                offsetStore,
                offsetNamespace,
                keyConverter,
                valueConverter
        );
        offsetWriter = new OffsetStorageWriter(
                offsetStore,
                offsetNamespace,
                keyConverter,
                valueConverter
        );

        sourceTaskContext = new PulsarIOSourceTaskContext(offsetReader, pulsarKafkaWorkerConfig);

        sourceTask.initialize(sourceTaskContext);
        sourceTask.start(stringConfig);
    }

    /**
     * Loads the class named by {@code stringConfig.get(configKey)} and creates an instance of it
     * through its no-argument constructor.
     *
     * @param stringConfig String-valued configuration to look the class name up in
     * @param configKey    key whose value is the fully qualified class name
     * @param expectedType type the loaded class must be assignable to
     * @param <C>          static type of the returned instance
     * @return a new instance of the configured class
     * @throws IllegalArgumentException     if {@code configKey} has no String value
     * @throws ClassCastException           if the configured class is not a subclass of
     *                                      {@code expectedType}
     * @throws ReflectiveOperationException if the class cannot be found or instantiated
     */
    private static <C> C instantiateFromConfig(Map<String, String> stringConfig,
                                               String configKey,
                                               Class<C> expectedType)
            throws ReflectiveOperationException {
        String className = stringConfig.get(configKey);
        if (className == null) {
            // Fail with the offending key instead of an anonymous NPE from Class.forName(null).
            throw new IllegalArgumentException(
                    "Missing required config '" + configKey + "': expected the class name of a "
                            + expectedType.getName() + " as a String value");
        }
        // asSubclass performs the checked narrowing, so no unchecked cast is needed.
        return Class.forName(className)
                .asSubclass(expectedType)
                .getDeclaredConstructor()
                .newInstance();
    }