in processor/src/main/java/com/google/cloud/solutions/realtimedash/pipeline/MetricsCalculationPipeline.java [43:196]
/**
 * Builds the metrics pipeline and starts it.
 *
 * <p>The pipeline reads string messages from the Pub/Sub topic named in the options, parses each
 * message into a {@link LogEvent}, and fans the parsed events out into independent branches.
 * Every branch ends in a Redis write against the endpoint named in the options, using one of
 * {@code INCRBY}, {@code PFADD} or {@code SADD}:
 *
 * <ul>
 *   <li>a visit counter incremented by "1" per event,
 *   <li>user ids added per experiment/variant combination,
 *   <li>user ids added per variant and per experiment,
 *   <li>experiment ids and variants added under time-derived keys,
 *   <li>the output of {@code extractUsersForDateTime()} written under the "dthr" prefix.
 * </ul>
 *
 * <p>The exact key layout is decided by {@code timeBasedKeyBuilder}, {@code hllKeyGenerator} and
 * {@code setKeyGenerator}, which are defined elsewhere in this class.
 *
 * <p>Every top-level transform name is unique within the pipeline; keep it that way when adding
 * branches, since Beam reports duplicated names as not stable/unique.
 *
 * @param args command-line arguments, handed to {@code extractPipelineOptions}
 */
public static void main(String[] args) {
  MetricsPipelineOptions options = extractPipelineOptions(args);
  Pipeline pipeline = Pipeline.create(options);

  // Source: raw Pub/Sub strings parsed into LogEvent objects. All branches below share this.
  PCollection<LogEvent> parsedLoggedEvents =
      pipeline
          .apply("Read PubSub Events",
              PubsubIO.readStrings().fromTopic(options.getInputTopic()))
          .apply("Parse Message JSON",
              ParDo.of(new ParseMessageAsLogElement()));

  // Base writer carrying only the endpoint; each branch picks its Redis command via withMethod.
  RedisIO.Write redisWriter =
      RedisIO.write()
          .withEndpoint(
              options.getRedisHost(), options.getRedisPort());

  // Visit counter: one INCRBY of "1" per event, keyed by the event timestamp formatted with
  // the "visitCounter" key builder.
  parsedLoggedEvents
      .apply("Count visits per minute", ParDo.of(
          new DoFn<LogEvent, KV<String, String>>() {
            @ProcessElement
            public void countSession(ProcessContext context) {
              LogEvent event = context.element();
              context.output(
                  KV.of(event.getTimestamp().toString(timeBasedKeyBuilder("visitCounter")), "1"));
            }
          }
      ))
      .apply("Update Visit counter", redisWriter.withMethod(Method.INCRBY));

  // User per experiment/variant metric: the user id is PFADD-ed under a time-formatted key that
  // embeds both the experiment id and the variant.
  parsedLoggedEvents
      .apply("extract user for experiment-variant per minute metric",
          ParDo.of(new DoFn<LogEvent, KV<String, String>>() {
            @ProcessElement
            public void extractExperimentVariantPerTime(ProcessContext context) {
              LogEvent event = context.element();
              String key = event.getTimestamp().toString(timeBasedKeyBuilder(
                  "evcounter_e_" + event.getExperimentId() + "_v_" + event.getVariant()));
              context.output(KV.of(key, event.getUid()));
            }
          }))
      .apply("Update EV Counters", redisWriter.withMethod(Method.PFADD));

  // Variant based metrics: (variant, user id) pairs written once via PFADD and once via SADD.
  PCollection<Pair<String, String>> variants =
      parsedLoggedEvents
          .apply("Extract Users per Variant",
              ParDo.of(
                  new DoFn<LogEvent, Pair<String, String>>() {
                    @ProcessElement
                    public void extractVariantAndUser(ProcessContext context) {
                      LogEvent elem = context.element();
                      // String concatenation renders a null variant as "null" instead of
                      // passing null through.
                      context.output(Pair.of("" + elem.getVariant(), elem.getUid()));
                    }
                  }));
  variants
      .apply("Build HLL Keys (Variant)", hllKeyGenerator("var"))
      .apply("Add user to Variant HLL", redisWriter.withMethod(Method.PFADD));
  variants
      .apply("Build Set Keys (Variant)", setKeyGenerator("var"))
      .apply("Add user to Variant set", redisWriter.withMethod(Method.SADD));

  // Experiment based metrics: (experiment id, user id) pairs, same two writes as above.
  PCollection<Pair<String, String>> experiments =
      parsedLoggedEvents
          .apply("Extract Users per Experiment",
              ParDo.of(
                  new DoFn<LogEvent, Pair<String, String>>() {
                    @ProcessElement
                    public void extractExperimentKvPair(ProcessContext context) {
                      LogEvent elem = context.element();
                      context.output(Pair.of(elem.getExperimentId(), elem.getUid()));
                    }
                  }
              ));
  experiments
      .apply("Build HLL Keys (Experiment)", hllKeyGenerator("exp"))
      .apply("Add User to Experiment HLL", redisWriter.withMethod(Method.PFADD));
  experiments
      .apply("Build Set Keys (Experiment)", setKeyGenerator("exp"))
      .apply("Add user to Experiment set", redisWriter.withMethod(Method.SADD));

  // Active experiments: experiment ids grouped under a time-formatted "experiments" key.
  PCollection<Pair<String, String>> activeExperiments =
      parsedLoggedEvents
          .apply("Build Time bound experiments",
              ParDo.of(new DoFn<LogEvent, Pair<String, String>>() {
                @ProcessElement
                public void extractExperimentForTime(ProcessContext context) {
                  LogEvent event = context.element();
                  context.output(Pair.of(
                      event.getTimestamp().toString(
                          timeBasedKeyBuilder("experiments")),
                      event.getExperimentId()));
                }
              }));
  activeExperiments
      .apply("build experiments hll key", hllKeyGenerator("experiments"))
      .apply("Write Active experiments data",
          redisWriter.withMethod(Method.PFADD));
  activeExperiments
      .apply("build experiments set key", setKeyGenerator("experiments"))
      .apply("Write Active experiments count", redisWriter.withMethod(Method.SADD));

  // Active variants: variants grouped under a time-formatted "variants" key. The write steps
  // carry their own "variants" names so they do not collide with the experiments branch above.
  PCollection<Pair<String, String>> activeVariants =
      parsedLoggedEvents
          .apply("Build Time bound Variants",
              ParDo.of(new DoFn<LogEvent, Pair<String, String>>() {
                @ProcessElement
                public void extractVariantForTime(ProcessContext context) {
                  LogEvent event = context.element();
                  context.output(Pair.of(
                      event.getTimestamp().toString(
                          timeBasedKeyBuilder("variants")),
                      event.getVariant()));
                }
              }));
  activeVariants
      .apply("build variants hll key", hllKeyGenerator("variants"))
      .apply("Write Active variants data",
          redisWriter.withMethod(Method.PFADD));
  activeVariants
      .apply("build variants set key", setKeyGenerator("variants"))
      .apply("Write Active variants count", redisWriter.withMethod(Method.SADD));

  // Date_Hour_Minute based metrics: pairs produced by extractUsersForDateTime().
  PCollection<Pair<String, String>> datesHoursBasedEvents =
      parsedLoggedEvents
          .apply("Extract Users per minute", extractUsersForDateTime());
  datesHoursBasedEvents
      .apply("Build HLL Keys (Date_Time)", hllKeyGenerator("dthr"))
      .apply("Add User to DateTime HLL", redisWriter.withMethod(Method.PFADD));
  datesHoursBasedEvents
      .apply("Build Set Keys (Date_Time)", setKeyGenerator("dthr"))
      .apply("Add User to DateTime set", redisWriter.withMethod(Method.SADD));

  // Start the pipeline; the returned result is not awaited here.
  pipeline.run();
}