in src/main/java/com/aliyun/dts/subscribe/clients/common/Util.java [33:56]
public static void mergeSourceKafkaProperties(Properties originProperties, Properties mergeToProperties) {
originProperties.forEach((k, v) ->{
String key = (String)k;
if (key.startsWith("kafka.")) {
String toPutKey = key.substring(6);
mergeToProperties.setProperty(toPutKey, (String)v);
}
});
mergeToProperties.setProperty(SaslConfigs.SASL_JAAS_CONFIG,
buildJaasConfig(originProperties.getProperty(SID_NAME), originProperties.getProperty(USER_NAME), originProperties.getProperty(PASSWORD_NAME)));
mergeToProperties.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN");
mergeToProperties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
mergeToProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, originProperties.getProperty(KAFKA_BROKER_URL_NAME));
mergeToProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, originProperties.getProperty(GROUP_NAME));
// disable auto commit
mergeToProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
mergeToProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
mergeToProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
// to let the consumer feel the switch of cluster and reseek the offset by timestamp
mergeToProperties.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ClusterSwitchListener.class.getName());
mergeToProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
}