private IRichBolt getKafkaBolt()

in src/main/java/com/aliyun/emr/example/storm/benchmark/BasicTopology.java [72:102]


    private IRichBolt getKafkaBolt() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", configure.getProperty("result.broker.list"));
        properties.put("acks", "0");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // consume too much memory
        //properties.put("batch.size", "10485760");
        //properties.put("max.request", "10485760");
        //properties.put("send.buffer.bytes", "1000000");
        KafkaBolt bolt = new KafkaBolt<String, String>()
            .withProducerProperties(properties)
            .withTopicSelector(new DefaultTopicSelector(configure.getProperty("result.topic")))
            .withTupleToKafkaMapper(new TupleToKafkaMapper<String, String>() {
                @Override
                public String getKeyFromTuple(Tuple tuple) {
                    return null;
                }

                @Override
                public String getMessageFromTuple(Tuple tuple) {

                    ImmutableMap<String, String> kv = (ImmutableMap<String, String>)tuple.getValue(0);
                    return kv.keySet().iterator().next() + "," + System.currentTimeMillis();

                }
            });
        bolt.setFireAndForget(true);
        bolt.setAsync(true);
        return bolt;
    }