public static void mergeSourceKafkaProperties()

in src/main/java/com/aliyun/dts/subscribe/clients/common/Util.java [33:56]


    public static void mergeSourceKafkaProperties(Properties originProperties, Properties mergeToProperties) {
        originProperties.forEach((k, v) ->{
            String key = (String)k;
            if (key.startsWith("kafka.")) {
                String toPutKey = key.substring(6);
                mergeToProperties.setProperty(toPutKey, (String)v);
            }
        });
        mergeToProperties.setProperty(SaslConfigs.SASL_JAAS_CONFIG,
                buildJaasConfig(originProperties.getProperty(SID_NAME), originProperties.getProperty(USER_NAME), originProperties.getProperty(PASSWORD_NAME)));
        mergeToProperties.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN");
        mergeToProperties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        mergeToProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, originProperties.getProperty(KAFKA_BROKER_URL_NAME));
        mergeToProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, originProperties.getProperty(GROUP_NAME));
        // disable auto commit
        mergeToProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        mergeToProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        mergeToProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // to let the consumer feel the switch of cluster and reseek the offset by timestamp
        mergeToProperties.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ClusterSwitchListener.class.getName());

        mergeToProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
    }