in beam-collector/src/main/java/com/google/collector/loaders/LoadReports.java [48:84]
/**
 * Loads every {@code SecurityReport} stored in the configured Bigtable table.
 *
 * <p>The whole table is scanned (no row filter is applied). For each row, the value of the first
 * cell of the {@code description:data} column is parsed as a serialized {@code SecurityReport}.
 * Rows that lack that family or column, whose column holds no cells, or whose bytes fail to parse
 * are skipped without producing output.
 *
 * @param p the pipeline the Bigtable read is attached to
 * @param options supplies the Bigtable project, instance and table ids
 * @return a collection with one report per well-formed row
 */
public static PCollection<SecurityReport> fromBigTable(Pipeline p, CollectorOptions options) {
  // scans entire table
  return p.apply("LoadFromBigTable", BigtableIO.read()
          .withProjectId(options.getBigtableProjectId())
          .withInstanceId(options.getBigtableInstanceId())
          .withTableId(options.getBigtableTableId()))
      .apply("RowsToReports", ParDo.of(new DoFn<Row, SecurityReport>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          // malformed or missing reports yield an empty Optional and are ignored
          extractReportBytes(c.element())
              .flatMap(this::parseReport)
              .ifPresent(c::output);
        }

        /** Returns the value of the first cell in {@code description:data}, if there is one. */
        private Optional<ByteString> extractReportBytes(Row row) {
          return row.getFamiliesList().stream()
              // find target family
              .filter(f -> "description".equals(f.getName()))
              .findFirst()
              // find target column
              .map(Family::getColumnsList)
              .flatMap(columns -> columns.stream()
                  .filter(col -> "data".equals(col.getQualifier().toStringUtf8()))
                  .findFirst())
              // skip a column with no cells instead of throwing from getCells(0)
              .filter(col -> col.getCellsCount() > 0)
              .map(col -> col.getCells(0))
              .map(Cell::getValue);
        }

        /** Parses {@code bytes} as a report; returns empty when the bytes are not a valid message. */
        private Optional<SecurityReport> parseReport(ByteString bytes) {
          Parser<SecurityReport> reportParser =
              SecurityReport.getDefaultInstance().getParserForType();
          try {
            // parse straight from the ByteString to avoid copying the payload into a byte[]
            return Optional.of(reportParser.parseFrom(bytes));
          } catch (InvalidProtocolBufferException ignored) {
            // ignore malformed reports
            return Optional.empty();
          }
        }
      }));
}