public static PCollection fromBigTable()

in beam-collector/src/main/java/com/google/collector/loaders/LoadReports.java [48:84]


  /**
   * Loads {@link SecurityReport}s stored in a Bigtable table.
   *
   * <p>The table is identified by the project, instance and table ids taken from
   * {@code options}. No key range or row filter is configured on the read, so the entire table
   * is scanned. For each row, the first cell of the {@code data} column in the
   * {@code description} family is parsed as a serialized {@code SecurityReport}.
   *
   * <p>Rows are dropped silently (best effort) when the family, the column or the cell is
   * missing, or when the cell value is not a valid {@code SecurityReport} message.
   *
   * @param p the pipeline the Bigtable read is applied to
   * @param options supplies the Bigtable project, instance and table ids
   * @return a collection containing every report that could be parsed
   */
  public static PCollection<SecurityReport> fromBigTable(Pipeline p, CollectorOptions options) {
    // scans entire table
    return p.apply("LoadFromBigTable", BigtableIO.read()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId()))
        .apply("RowsToReports", ParDo.of(new DoFn<Row, SecurityReport>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            Optional<ByteString> payload = c.element().getFamiliesList().stream()
                // find target family
                .filter(f -> "description".equals(f.getName()))
                .findFirst()
                // find target column
                .map(Family::getColumnsList)
                .flatMap(columns -> columns.stream()
                    .filter(col -> "data".equals(col.getQualifier().toStringUtf8()))
                    .findFirst()
                    // getCells(0) throws on a column without cells; treat that row as
                    // malformed instead of failing the whole bundle
                    .filter(col -> col.getCellsCount() > 0)
                    // NOTE(review): presumably index 0 is the most recent version - confirm
                    .map(col -> col.getCells(0)))
                // extract the serialized report; kept as ByteString to avoid a byte[] copy
                .map(Cell::getValue);

            if (!payload.isPresent()) {
              // ignore malformed reports
              return;
            }

            try {
              Parser<SecurityReport> reportParser =
                  SecurityReport.getDefaultInstance().getParserForType();
              c.output(reportParser.parseFrom(payload.get()));
            } catch (InvalidProtocolBufferException ignored) {
              // ignore malformed reports
            }
          }
        }));
  }