in ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/transform/StringToPubsubMessage.java [22:45]
/**
 * Build a {@link PubsubMessage} from a single JSON-encoded line.
 *
 * <p>The line is parsed with {@code Json.readObjectNode}. Two fields of the resulting object
 * are consulted:
 * <ul>
 *   <li>{@code FieldName.PAYLOAD}: when present and not JSON null, it becomes the message data.
 *       A textual value is decoded with {@code BASE64_DECODER}; any other node is converted
 *       to bytes via {@code Json.asBytes}.</li>
 *   <li>{@code "attributeMap"}: when it is a JSON object, every field is copied into the
 *       message attributes, using {@code asText()} of the field value as the attribute value.</li>
 * </ul>
 *
 * @param line the JSON text to convert
 * @return the assembled message; data and attributes are left unset when the corresponding
 *     fields are absent
 * @throws UncheckedIOException wrapping any {@link IOException} raised while handling the JSON
 */
public static PubsubMessage apply(String line) {
  try {
    final ObjectNode root = Json.readObjectNode(line);
    final PubsubMessage.Builder builder = PubsubMessage.newBuilder();

    // Payload: skipped entirely when the field is missing or explicitly null.
    final JsonNode payload = root.path(FieldName.PAYLOAD);
    final boolean hasPayload = !(payload.isMissingNode() || payload.isNull());
    if (hasPayload) {
      final byte[] bytes = payload.isTextual()
          ? BASE64_DECODER.decode(payload.asText())
          : Json.asBytes(payload);
      builder.setData(ByteString.copyFrom(bytes));
    }

    // Attributes: only an object-valued "attributeMap" contributes entries.
    final JsonNode attributes = root.path("attributeMap");
    if (attributes.isObject()) {
      attributes.fields().forEachRemaining(
          attribute -> builder.putAttributes(attribute.getKey(), attribute.getValue().asText()));
    }

    return builder.build();
  } catch (IOException e) {
    // Surface parse/serialization failures as unchecked while keeping the cause.
    throw new UncheckedIOException(e);
  }
}