public static void main()

in src/main/java/com/aliyun/emr/example/spark/streaming/JavaLoghubWordCount.java [40:95]


  public static void main(String[] args) throws InterruptedException {
    if (args.length < 6) {
      System.err.println("Usage: bin/spark-submit --class JavaLoghubWordCount " +
          "examples-1.0-SNAPSHOT-shaded.jar <sls project> <sls logstore> <loghub group name> " +
          "<sls endpoint> <access key id> <access key secret>");
      System.exit(1);
    }

    String loghubProject = args[0];
    String logStore = args[1];
    String loghubGroupName = args[2];
    String endpoint = args[3];
    String accessKeyId = args[4];
    String accessKeySecret = args[5];

    SparkConf conf = new SparkConf().setAppName("Loghub Sample");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(2000));
    JavaDStream<byte[]> lines = LoghubUtils.createStream(
      jssc,
      loghubProject,
      logStore,
      loghubGroupName,
      endpoint,
      1,
      accessKeyId,
      accessKeySecret,
      StorageLevel.MEMORY_AND_DISK());

    JavaDStream<String> words = lines.map(new Function<byte[], String>() {
      @Override
      public String call(byte[] v1) throws Exception {
        return new String(v1);
      }
    }).flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterator<String> call(String s) {
        return Arrays.asList(SPACE.split(s)).iterator();
      }
    });
    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
      new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) {
          return new Tuple2<String, Integer>(s, 1);
        }
      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
      @Override
      public Integer call(Integer i1, Integer i2) {
        return i1 + i2;
      }
    });

    wordCounts.print();
    jssc.start();
    jssc.awaitTermination();
  }