public static void main()

in src/main/java/com/aliyun/emr/example/storm/StormKafkaSample.java [46:98]


    /**
     * Wires a Kafka spout to an HDFS bolt and submits the resulting topology.
     *
     * <p>Expected arguments:
     * <ol>
     *   <li>{@code args[0]} - Kafka topic passed to the spout configuration</li>
     *   <li>{@code args[1]} - ZooKeeper host; port 2181 and the {@code /kafka-1.0.0}
     *       path are appended to it</li>
     *   <li>{@code args[2]} - file system URL handed to the HDFS bolt</li>
     *   <li>{@code args[3]} - optional topology name. When present the topology is
     *       submitted through {@code StormSubmitter}; otherwise it is submitted to a
     *       {@code LocalCluster} under the name {@code "mytopology"}</li>
     * </ol>
     *
     * @param args command-line arguments as described above
     * @throws AuthorizationException declared for the {@code StormSubmitter} submission path
     * @throws IllegalArgumentException if fewer than three arguments are supplied
     * @throws IllegalStateException if the cluster rejects the topology because a topology
     *         with the same name is already alive or because the topology is invalid
     */
    public static void main(String[] args) throws AuthorizationException {
        // Fail fast with a usable message instead of an ArrayIndexOutOfBoundsException.
        if (args == null || args.length < 3) {
            throw new IllegalArgumentException(
                    "Expected arguments: <topic> <zookeeper-host> <hdfs-url> [topology-name]");
        }

        final String topic = args[0];
        final String zk = args[1];
        final String hdfsUrl = args[2];

        // Single definitions for values that were previously repeated as literals.
        final int zkPort = 2181;
        // Used both as the path suffix of the ZkHosts connect string and as the
        // third SpoutConfig argument, exactly as before.
        final String kafkaZkPath = "/kafka-1.0.0";

        ZkHosts zkHosts = new ZkHosts(zk + ":" + zkPort + kafkaZkPath);
        SpoutConfig spoutConfig = new SpoutConfig(zkHosts, topic, kafkaZkPath, "MyTrack");
        List<String> zkServers = new ArrayList<String>();
        zkServers.add(zk);
        spoutConfig.zkServers = zkServers;
        spoutConfig.zkPort = zkPort;
        spoutConfig.socketTimeoutMs = 60 * 1000;
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        // use "|" instead of "," for field delimiter
        RecordFormat format = new DelimitedRecordFormat()
                .withFieldDelimiter("|");

        // sync the filesystem after every 1k tuples
        SyncPolicy syncPolicy = new CountSyncPolicy(1000);

        // rotate files when they reach 5MB
        FileRotationPolicy rotationPolicy =
                new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);

        FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath("/foo/");

        HdfsBolt bolt = new HdfsBolt()
                .withFsUrl(hdfsUrl)
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy);

        // Two spout executors feed one bolt executor via shuffle grouping.
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new KafkaSpout(spoutConfig), 2);
        builder.setBolt("bolt", bolt, 1).shuffleGrouping("spout");

        Config conf = new Config();
        conf.setDebug(false);

        if (args.length > 3) {
            final String topologyName = args[3];
            try {
                StormSubmitter.submitTopology(topologyName, conf, builder.createTopology());
            } catch (AlreadyAliveException e) {
                // Surface the failure (non-zero exit) instead of printing and returning normally.
                throw new IllegalStateException(
                        "Topology '" + topologyName + "' is already alive", e);
            } catch (InvalidTopologyException e) {
                throw new IllegalStateException(
                        "Topology '" + topologyName + "' is invalid", e);
            }
        } else {
            // No topology name given: submit to an in-process cluster.
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("mytopology", conf, builder.createTopology());
        }
    }