in src/main/java/com/aliyun/emr/example/storm/StormKafkaSample.java [46:98]
/**
 * Sample topology: consumes messages from a Kafka topic with a Storm
 * {@code KafkaSpout} and writes them to HDFS via an {@code HdfsBolt}.
 *
 * <p>Usage: {@code StormKafkaSample <topic> <zkHost> <hdfsUrl> [topologyName]}
 * — with a fourth argument the topology is submitted to the remote cluster;
 * without it, it runs in an in-process {@code LocalCluster}.
 *
 * @param args topic name, ZooKeeper host, HDFS URL, optional topology name
 * @throws AuthorizationException if the cluster rejects the submission
 */
public static void main(String[] args) throws AuthorizationException {
    // Fail fast with a usage message instead of an ArrayIndexOutOfBoundsException.
    if (args.length < 3) {
        System.err.println("Usage: StormKafkaSample <topic> <zkHost> <hdfsUrl> [topologyName]");
        System.exit(1);
    }
    final String topic = args[0];
    final String zk = args[1];
    final String hdfsUrl = args[2];
    // Single source of truth for the ZooKeeper port (was duplicated as a
    // literal in both the ZkHosts URL and spoutConfig.zkPort).
    final int zkPort = 2181;

    // Kafka spout configured against ZooKeeper; "/kafka-1.0.0" is the zkRoot
    // and "MyTrack" the consumer id used for offset tracking.
    ZkHosts zkHosts = new ZkHosts(zk + ":" + zkPort + "/kafka-1.0.0");
    SpoutConfig spoutConfig = new SpoutConfig(zkHosts, topic, "/kafka-1.0.0", "MyTrack");
    List<String> zkServers = new ArrayList<String>();
    zkServers.add(zk);
    spoutConfig.zkServers = zkServers;
    spoutConfig.zkPort = zkPort;
    spoutConfig.socketTimeoutMs = 60 * 1000;
    // Emit raw Kafka message payloads as plain strings.
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

    // use "|" instead of "," for field delimiter
    RecordFormat format = new DelimitedRecordFormat()
            .withFieldDelimiter("|");
    // sync the filesystem after every 1k tuples
    SyncPolicy syncPolicy = new CountSyncPolicy(1000);
    // rotate files when they reach 5MB
    FileRotationPolicy rotationPolicy =
            new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);
    FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath("/foo/");
    HdfsBolt bolt = new HdfsBolt()
            .withFsUrl(hdfsUrl)
            .withFileNameFormat(fileNameFormat)
            .withRecordFormat(format)
            .withRotationPolicy(rotationPolicy)
            .withSyncPolicy(syncPolicy);

    // Wire spout -> bolt: 2 spout executors feeding 1 HDFS writer.
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", new KafkaSpout(spoutConfig), 2);
    builder.setBolt("bolt", bolt, 1).shuffleGrouping("spout");

    Config conf = new Config();
    conf.setDebug(false);
    if (args.length > 3) {
        try {
            StormSubmitter.submitTopology(args[3], conf, builder.createTopology());
        } catch (AlreadyAliveException | InvalidTopologyException e) {
            // Sample code: stderr stack trace is acceptable here; a real
            // application should use a logger and exit non-zero.
            e.printStackTrace();
        }
    } else {
        // In-process cluster for local testing; intentionally left running,
        // matching the original sample's behavior.
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("mytopology", conf, builder.createTopology());
    }
}