in ingestion-core/src/main/java/com/mozilla/telemetry/ingestion/core/transform/PubsubMessageToObjectNode.java [667:724]
/**
 * Coerces one JSON value so that it fits the BigQuery type declared by {@code field}.
 *
 * <p>Handling per declared type:
 * <ul>
 * <li>STRING: textual nodes pass through; objects carrying a {@code histogram_type} key are
 * handed to {@code compactHistogramEncoding}; anything else is serialized with
 * {@code Json.asString} and wrapped in a {@link TextNode}.</li>
 * <li>INTEGER / INT64: int and long nodes pass through; booleans become 1 or 0; other values are
 * rejected.</li>
 * <li>FLOAT / FLOAT64: any numeric node passes through; booleans become an integer node holding
 * 1 or 0; other values are rejected.</li>
 * <li>BOOLEAN / BOOL: boolean nodes pass through; other values are rejected.</li>
 * <li>Any other type: the value is returned untouched.</li>
 * </ul>
 *
 * <p>Each boolean-to-number coercion and each rejection calls the matching
 * {@code increment...()} method before returning.
 *
 * @param o the JSON value to coerce
 * @param field the BigQuery field whose declared type drives the coercion
 * @return the (possibly converted) value, or {@link Optional#empty()} when the value was rejected
 */
private Optional<JsonNode> coerceSingleValueToBqType(JsonNode o, Field field) {
  final LegacySQLTypeName declaredType = field.getType();

  if (declaredType == LegacySQLTypeName.STRING) {
    if (o.isTextual()) {
      return Optional.of(o);
    }
    if (o.isObject() && o.has("histogram_type")) {
      return Optional.of(compactHistogramEncoding(o, field.getName()));
    }
    // Non-string values are JSON-ified. Many fields (histograms, userPrefs, etc.) are expected
    // to take this path, so no counter is kept here; it would quickly reach many billions.
    final String serialized = Json.asString(o);
    return Optional.of(TextNode.valueOf(serialized));
  }

  // The schemas are written with Standard SQL type names while the BQ API expects legacy SQL
  // names, so the technically invalid INT64 / FLOAT64 / BOOL spellings can show up here and
  // have to be matched by name alongside their legacy counterparts.
  final boolean expectsInteger = declaredType == LegacySQLTypeName.INTEGER
      || StandardSQLTypeName.INT64.name().equals(declaredType.name());
  if (expectsInteger) {
    if (o.isInt() || o.isLong()) {
      return Optional.of(o);
    }
    if (o.isBoolean()) {
      incrementCoercedToInt();
      // false maps to 0 and true maps to 1.
      final int numeric = o.asBoolean() ? 1 : 0;
      return Optional.of(IntNode.valueOf(numeric));
    }
    incrementNotCoercedToInt();
    return Optional.empty();
  }

  final boolean expectsFloat = declaredType == LegacySQLTypeName.FLOAT
      || StandardSQLTypeName.FLOAT64.name().equals(declaredType.name());
  if (expectsFloat) {
    if (o.isNumber()) {
      return Optional.of(o);
    }
    if (o.isBoolean()) {
      incrementCoercedToFloat();
      // false maps to 0 and true maps to 1; the result is deliberately still an integer node.
      final int numeric = o.asBoolean() ? 1 : 0;
      return Optional.of(IntNode.valueOf(numeric));
    }
    incrementNotCoercedToFloat();
    return Optional.empty();
  }

  final boolean expectsBoolean = declaredType == LegacySQLTypeName.BOOLEAN
      || StandardSQLTypeName.BOOL.name().equals(declaredType.name());
  if (expectsBoolean) {
    if (o.isBoolean()) {
      return Optional.of(o);
    }
    incrementNotCoercedToBool();
    return Optional.empty();
  }

  // No coercion rule exists for any other declared type; hand the value back unchanged.
  return Optional.of(o);
}