public static void main()

in processor/src/main/java/com/google/cloud/solutions/realtimedash/pipeline/MetricsCalculationPipeline.java [43:196]


  /**
   * Builds and runs the metrics calculation pipeline.
   *
   * <p>The pipeline reads string messages from the configured Pub/Sub input topic, parses each
   * message into a {@code LogEvent}, and fans the parsed events out into several branches. Each
   * branch derives key/value pairs and writes them to the configured Redis endpoint using one of
   * the {@code INCRBY}, {@code PFADD} or {@code SADD} write methods.
   *
   * <p>Every transform is applied with a distinct step name, so that no two steps in the
   * pipeline graph share the same name.
   *
   * @param args command-line arguments, handed to {@code extractPipelineOptions}
   */
  public static void main(String[] args) {
    MetricsPipelineOptions options = extractPipelineOptions(args);
    Pipeline pipeline = Pipeline.create(options);

    // Source: Pub/Sub string messages parsed into LogEvent objects.
    PCollection<LogEvent> parsedLoggedEvents =
        pipeline
            .apply("Read PubSub Events",
                PubsubIO.readStrings().fromTopic(options.getInputTopic()))
            .apply("Parse Message JSON",
                ParDo.of(new ParseMessageAsLogElement()));

    // Base Redis writer; every sink below derives from it by choosing a write method.
    RedisIO.Write redisWriter =
        RedisIO.write()
            .withEndpoint(
                options.getRedisHost(), options.getRedisPort());

    // Visit counter: every event contributes "1" to a key derived from its timestamp.
    parsedLoggedEvents
        .apply("Count visits per minute", ParDo.of(
            new DoFn<LogEvent, KV<String, String>>() {
              @ProcessElement
              public void countSession(ProcessContext context) {
                LogEvent event = context.element();
                context.output(
                    KV.of(event.getTimestamp().toString(timeBasedKeyBuilder("visitCounter")), "1"));
              }
            }
        ))
        .apply("Update Visit counter", redisWriter.withMethod(Method.INCRBY));

    // Build user - experiment/variant metric: the user id is added under a key that combines
    // the event timestamp with the experiment id and the variant.
    parsedLoggedEvents
        .apply("extract user for experiment-variant per minute metric",
            ParDo.of(new DoFn<LogEvent, KV<String, String>>() {

              @ProcessElement
              public void extractExperimentVariantPerTime(ProcessContext context) {
                LogEvent event = context.element();

                String key = event.getTimestamp().toString(timeBasedKeyBuilder(
                    "evcounter_e_" + event.getExperimentId() + "_v_" + event.getVariant()));
                context.output(KV.of(key, event.getUid()));
              }
            }))
        .apply("Update EV Counters", redisWriter.withMethod(Method.PFADD));

    // Variant based metrics: (variant, user id) pairs written through both the HLL and the
    // set key generators.
    PCollection<Pair<String, String>> variants =
        parsedLoggedEvents
            .apply("Extract Users per Variant",
                ParDo.of(
                    new DoFn<LogEvent, Pair<String, String>>() {
                      @ProcessElement
                      public void extractVariantAndUser(ProcessContext context) {
                        LogEvent elem = context.element();
                        context.output(Pair.of("" + elem.getVariant(), elem.getUid()));
                      }
                    }));
    variants
        .apply("Build HLL Keys (Variant)", hllKeyGenerator("var"))
        .apply("Add user to Variant HLL", redisWriter.withMethod(Method.PFADD));

    variants
        .apply("Build Set Keys (Variant)", setKeyGenerator("var"))
        .apply("Add user to Variant set", redisWriter.withMethod(Method.SADD));

    // Experiment based metrics: (experiment id, user id) pairs.
    PCollection<Pair<String, String>> experiments =
        parsedLoggedEvents
            .apply("Extract Users per Experiment",
                ParDo.of(
                    new DoFn<LogEvent, Pair<String, String>>() {
                      @ProcessElement
                      public void extractExperimentKvPair(ProcessContext context) {
                        LogEvent elem = context.element();
                        context.output(Pair.of(elem.getExperimentId(), elem.getUid()));
                      }
                    }
                ));

    experiments
        .apply("Build HLL Keys (Experiment)", hllKeyGenerator("exp"))
        .apply("Add User to Experiment HLL", redisWriter.withMethod(Method.PFADD));

    experiments
        .apply("Build Set Keys (Experiment)", setKeyGenerator("exp"))
        .apply("Add user to Experiment set", redisWriter.withMethod(Method.SADD));

    // Active experiments: (time-based "experiments" key, experiment id) pairs.
    PCollection<Pair<String, String>> activeExperiments =
        parsedLoggedEvents
            .apply("Build Time bound experiments",
                ParDo.of(new DoFn<LogEvent, Pair<String, String>>() {

                  @ProcessElement
                  public void extractExperimentForTime(ProcessContext context) {
                    LogEvent event = context.element();

                    context.output(Pair.of(
                        event.getTimestamp().toString(
                            timeBasedKeyBuilder("experiments")),
                        event.getExperimentId()));

                  }
                }));

    activeExperiments
        .apply("build experiments hll key", hllKeyGenerator("experiments"))
        .apply("Write Active experiments data",
            redisWriter.withMethod(Method.PFADD));

    activeExperiments
        .apply("build experiments set key", setKeyGenerator("experiments"))
        .apply("Write Active experiments count", redisWriter.withMethod(Method.SADD));

    // Active variants: (time-based "variants" key, variant) pairs.
    PCollection<Pair<String, String>> activeVariants =
        parsedLoggedEvents
            .apply("Build Time bound Variants",
                ParDo.of(new DoFn<LogEvent, Pair<String, String>>() {

                  @ProcessElement
                  public void extractVariantForTime(ProcessContext context) {
                    LogEvent event = context.element();

                    context.output(Pair.of(
                        event.getTimestamp().toString(
                            timeBasedKeyBuilder("variants")),
                        event.getVariant()));
                  }
                }));

    // These two sinks carry their own "variants" step names; reusing the "experiments" names
    // from the branch above would give two steps in the graph the same name.
    activeVariants
        .apply("build variants hll key", hllKeyGenerator("variants"))
        .apply("Write Active variants data",
            redisWriter.withMethod(Method.PFADD));

    activeVariants
        .apply("build variants set key", setKeyGenerator("variants"))
        .apply("Write Active variants count", redisWriter.withMethod(Method.SADD));

    // Date_Hour_Minute based metrics
    PCollection<Pair<String, String>> datesHoursBasedEvents =
        parsedLoggedEvents
            .apply("Extract Users per minute", extractUsersForDateTime());

    datesHoursBasedEvents
        .apply("Build HLL Keys (Date_Time)", hllKeyGenerator("dthr"))
        .apply("Add User to DateTime HLL", redisWriter.withMethod(Method.PFADD));

    datesHoursBasedEvents
        .apply("Build Set Keys (Date_Time)", setKeyGenerator("dthr"))
        .apply("Add User to DateTime set", redisWriter.withMethod(Method.SADD));

    pipeline.run();
  }