public PCollection&lt;PubsubMessage&gt; expand(PBegin input)

in ingestion-beam/src/main/java/com/mozilla/telemetry/io/Read.java [122:200]


    /**
     * Builds a BigQuery read that reconstructs a {@link PubsubMessage} from each row of the
     * configured table (or standard SQL query result) and applies it to the pipeline.
     *
     * <p>The row-parse function copies the {@code payload} BYTES column into the message body
     * and promotes the row's simple scalar columns — plus the routing fields of the nested
     * {@code metadata} struct, when present — into message attributes.
     *
     * @param input the pipeline begin node this root transform is applied to
     * @return a collection of messages decoded from BigQuery rows
     */
    public PCollection<PubsubMessage> expand(PBegin input) {
      BigQueryIO.TypedRead<PubsubMessage> read = BigQueryIO
          .read((SchemaAndRecord schemaAndRecord) -> {
            TableSchema tableSchema = schemaAndRecord.getTableSchema();
            GenericRecord record = schemaAndRecord.getRecord();

            // We have to take care not to read additional bytes; see
            // https://github.com/mozilla/gcp-ingestion/issues/1266
            // Copying exactly limit() bytes out of the buffer (instead of grabbing its backing
            // array) avoids picking up padding past the logical end of the Avro-decoded value.
            ByteBuffer byteBuffer = (ByteBuffer) record.get(FieldName.PAYLOAD);
            final byte[] payload;
            if (byteBuffer != null) {
              payload = new byte[byteBuffer.limit()];
              byteBuffer.get(payload);
            } else {
              // A NULL payload column becomes an empty message body rather than a null array.
              payload = new byte[0];
            }

            // We populate attributes for all simple string and timestamp fields, which is complete
            // for raw and error tables.
            // Decoded payload tables also have a top-level nested "metadata" struct; we can mostly
            // just drop this since the same metadata object is encoded in the payload, but we do
            // parse out the document namespace, type, and version since those are necessary in the
            // case of a Sink job that doesn't look at the payload but still may need those
            // attributes in order to route to the correct destination.
            Map<String, String> attributes = new HashMap<>();
            tableSchema.getFields().stream() //
                // REPEATED fields have no single-value attribute representation; skip them.
                .filter(f -> !"REPEATED".equals(f.getMode())) //
                .forEach(f -> {
                  Object value = record.get(f.getName());
                  if (value != null) {
                    switch (f.getType()) {
                      case "TIMESTAMP":
                        // Avro surfaces BigQuery TIMESTAMP values as epoch microseconds.
                        attributes.put(f.getName(), Time.epochMicrosToTimestamp((Long) value));
                        break;
                      case "STRING":
                      case "INTEGER":
                      case "INT64":
                        attributes.put(f.getName(), value.toString());
                        break;
                      case "RECORD":
                      case "STRUCT":
                        // The only struct we support is the top-level nested "metadata" and we
                        // extract only the attributes needed for destination routing.
                        // NOTE(review): assumes namespace/type/version are always non-null in
                        // decoded tables; metadata.get(v) would NPE otherwise — confirm schema.
                        GenericRecord metadata = (GenericRecord) value;
                        Arrays
                            .asList(Attribute.DOCUMENT_NAMESPACE, Attribute.DOCUMENT_TYPE,
                                Attribute.DOCUMENT_VERSION)
                            .forEach(v -> attributes.put(v, metadata.get(v).toString()));
                        break;
                      // Ignore any other types (only the payload BYTES field should hit this).
                      default:
                        break;
                    }
                  }
                });
            return new PubsubMessage(payload, attributes);
          }) //
          .withCoder(PubsubMessageWithAttributesCoder.of()) //
          .withTemplateCompatibility() //
          .withoutValidation() //
          .withMethod(method.method);
      // Read either a whole table or the result of a standard SQL query; an unrecognized
      // source value intentionally falls through to the QUERY behavior.
      switch (source) {
        case TABLE:
          read = read.from(tableSpec);
          break;
        default:
        case QUERY:
          read = read.fromQuery(tableSpec).usingStandardSql();
      }
      // Row restriction and column projection are server-side filters that only the BigQuery
      // Storage Read API supports, and only for direct table reads (not query results).
      if (source == Source.TABLE && method == BigQueryReadMethod.storageapi) {
        if (rowRestriction != null) {
          read = read.withRowRestriction(rowRestriction);
        }
        if (selectedFields != null) {
          read = read.withSelectedFields(selectedFields);
        }
      }
      return input.apply(read);
    }