in beam-collector/src/main/java/com/google/collector/persistence/ToBigTable.java [44:65]
public PDone expand(PCollection<Constellation> input) {
return input.apply(ParDo.of(new DoFn<Constellation, Mutation>() {
@ProcessElement
public void processElement(ProcessContext ctx) {
Constellation element = ctx.element();
long timestamp = System.currentTimeMillis();
Put row = new Put(Bytes.toBytes(element.getId()));
String json = new Gson().toJson(element);
row.addColumn(
Bytes.toBytes("processed_reports"),
Bytes.toBytes("report"),
timestamp,
Bytes.toBytes(json)
);
ctx.output(row);
}
}))
.apply(CloudBigtableIO.writeToTable(bigtableTableConfig));
}