public static void main()

in src/main/java/com/aliyun/emr/example/spark/streaming/SparkMNSJavaDemo.java [38:86]


    /**
     * Runs a streaming word count over messages pulled from an MNS queue.
     *
     * <p>Each message body is received as raw bytes, decoded to text, split with the
     * {@code SPACE} pattern, and the occurrences of each resulting token are summed and
     * printed for every 2-second batch.
     *
     * @param args four positional arguments: {@code <queueName> <accessKeyId>
     *             <accessKeySecret> <endpoint>}; if fewer are given, a usage message is
     *             written to stderr and the JVM exits with status 1
     * @throws InterruptedException declared for the blocking wait on the streaming context
     */
    public static void main(String[] args) throws InterruptedException {
        if (args.length < 4) {
            System.err.println("Usage: bin/spark-submit --class SparkMNSJavaDemo examples-1.0-SNAPSHOT-shaded.jar <queueName> " +
                    "<accessKeyId> <accessKeySecret> <endpoint>");
            System.exit(1);
        }

        String queueName = args[0];
        String accessKeyId = args[1];
        String accessKeySecret = args[2];
        String endpoint = args[3];

        // NOTE(review): the master is hard-coded to local[4]; a master set in code takes
        // precedence over the one passed to spark-submit -- confirm this is intended for the demo.
        SparkConf sparkConf = new SparkConf().setAppName("E-MapReduce Demo 8-2: Spark MNS Demo (Java)").setMaster("local[4]");
        sparkConf.set("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem");
        sparkConf.set("spark.hadoop.mapreduce.job.run-local", "true");
        // Create the context with 2 seconds batch size
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

        JavaReceiverInputDStream<byte[]> lines = MnsUtils.createPullingStreamAsBytes(jssc, queueName, accessKeyId,
                accessKeySecret, endpoint, StorageLevel.MEMORY_AND_DISK());

        JavaDStream<String> words = lines.map(new Function<byte[], String>() {
            @Override
            public String call(byte[] v1) throws Exception {
                // Decode with an explicit charset: new String(byte[]) would use the JVM's
                // platform default, which can differ between hosts.
                // NOTE(review): assumes message bodies are UTF-8 text -- confirm against producers.
                return new String(v1, java.nio.charset.StandardCharsets.UTF_8);
            }
        }).flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String x) {
                // One output element per token produced by the SPACE pattern.
                return Lists.newArrayList(SPACE.split(x)).iterator();
            }
        });
        // Pair every token with 1, then sum the ones per distinct token.
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });

        wordCounts.print();
        // Nothing is consumed until start(); awaitTermination() then blocks the main thread.
        jssc.start();
        jssc.awaitTermination();
    }