in src/main/java/com/aliyun/emr/example/storm/benchmark/BasicTopology.java [72:102]
private IRichBolt getKafkaBolt() {
Properties properties = new Properties();
properties.put("bootstrap.servers", configure.getProperty("result.broker.list"));
properties.put("acks", "0");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// consume too much memory
//properties.put("batch.size", "10485760");
//properties.put("max.request", "10485760");
//properties.put("send.buffer.bytes", "1000000");
KafkaBolt bolt = new KafkaBolt<String, String>()
.withProducerProperties(properties)
.withTopicSelector(new DefaultTopicSelector(configure.getProperty("result.topic")))
.withTupleToKafkaMapper(new TupleToKafkaMapper<String, String>() {
@Override
public String getKeyFromTuple(Tuple tuple) {
return null;
}
@Override
public String getMessageFromTuple(Tuple tuple) {
ImmutableMap<String, String> kv = (ImmutableMap<String, String>)tuple.getValue(0);
return kv.keySet().iterator().next() + "," + System.currentTimeMillis();
}
});
bolt.setFireAndForget(true);
bolt.setAsync(true);
return bolt;
}