public KafkaWriter()

in src/main/java/com/aliyun/odps/kafka/KafkaWriter.java [25:41]


  public KafkaWriter(MaxComputeSinkConnectorConfig config) {
    props = new Properties();

    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getString(
        MaxComputeSinkConnectorConfig.BaseParameter.RUNTIME_ERROR_TOPIC_BOOTSTRAP_SERVERS.getName()));
    //Kafka消息的序列化方式,这里先默认 String
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringSerializer");
    //请求的最长等待时间
    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
    topic =
        config.getString(
            MaxComputeSinkConnectorConfig.BaseParameter.RUNTIME_ERROR_TOPIC_NAME.getName());
    producer = new KafkaProducer<String, String>(props);
  }