in src/main/java/com/aliyun/odps/kafka/KafkaWriter.java [25:41]
public KafkaWriter(MaxComputeSinkConnectorConfig config) {
props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getString(
MaxComputeSinkConnectorConfig.BaseParameter.RUNTIME_ERROR_TOPIC_BOOTSTRAP_SERVERS.getName()));
//Kafka消息的序列化方式,这里先默认 String
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
//请求的最长等待时间
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
topic =
config.getString(
MaxComputeSinkConnectorConfig.BaseParameter.RUNTIME_ERROR_TOPIC_NAME.getName());
producer = new KafkaProducer<String, String>(props);
}