public static PubsubMessage apply()

in ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/transform/StringToPubsubMessage.java [22:45]


  public static PubsubMessage apply(String line) {
    try {
      ObjectNode node = Json.readObjectNode(line);
      PubsubMessage.Builder messageBuilder = PubsubMessage.newBuilder();
      JsonNode dataNode = node.path(FieldName.PAYLOAD);
      if (!dataNode.isMissingNode() && !dataNode.isNull()) {
        final byte[] data;
        if (dataNode.isTextual()) {
          data = BASE64_DECODER.decode(dataNode.asText());
        } else {
          data = Json.asBytes(dataNode);
        }
        messageBuilder.setData(ByteString.copyFrom(data));
      }
      JsonNode attributeNode = node.path("attributeMap");
      if (attributeNode.isObject()) {
        attributeNode.fields().forEachRemaining(
            entry -> messageBuilder.putAttributes(entry.getKey(), entry.getValue().asText()));
      }
      return messageBuilder.build();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }