in sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java [1569:1800]
/**
 * Converts one value read from a proto message into the value used for it in a JSON row.
 *
 * <p>The conversion is chosen from both the schema type ({@code schemaInformation.getType()})
 * and the proto field type ({@code fieldDescriptor.getType()}), because one schema type may be
 * carried by several proto encodings (plain scalars, strings, bytes, or wrapper messages).
 *
 * @param schemaInformation schema node describing the field; its type selects the conversion
 * @param fieldDescriptor proto descriptor of the field that holds {@code fieldValue}
 * @param fieldValue the value read from the proto; a {@code List} when the field is repeated and
 *     {@code expandRepeated} is true
 * @param expandRepeated when true and the field is repeated, every list element is converted
 *     individually (with {@code expandRepeated} set to false for the element)
 * @param includeField passed through unchanged to the nested STRUCT conversion
 * @param prefix passed through unchanged to the nested STRUCT conversion
 * @param useSetF selects {@code tableRowFromMessageUseSetF} over {@code tableRowFromMessageNoF}
 *     for STRUCT values; when false, an empty {@code Optional} produced by a repeated element
 *     makes this call return {@code Optional.empty()}
 * @return a {@code String} for scalar types, a {@code List<Object>} for expanded repeated
 *     fields, {@code Optional.empty()} as described above, or whatever the STRUCT helpers return
 * @throws RuntimeException if the field is a message whose type is not one of the recognised
 *     wrapper types for the schema type
 */
public static Object jsonValueFromMessageValue(
    SchemaInformation schemaInformation,
    FieldDescriptor fieldDescriptor,
    Object fieldValue,
    boolean expandRepeated,
    Predicate<String> includeField,
    String prefix,
    boolean useSetF) {
  if (expandRepeated && fieldDescriptor.isRepeated()) {
    // The caller asked for expansion of a repeated field, so the value is the element list.
    @SuppressWarnings("unchecked")
    List<Object> valueList = (List<Object>) fieldValue;
    List<Object> expanded = Lists.newArrayListWithCapacity(valueList.size());
    for (Object value : valueList) {
      Object translatedValue =
          jsonValueFromMessageValue(
              schemaInformation, fieldDescriptor, value, false, includeField, prefix, useSetF);
      if (!useSetF && translatedValue instanceof Optional) {
        Optional<?> optional = (Optional<?>) translatedValue;
        if (!optional.isPresent()) {
          // A nested element contained an "f" column. Fail the call.
          return Optional.empty();
        }
        translatedValue = optional.get();
      }
      expanded.add(translatedValue);
    }
    return expanded;
  }

  // BigQueryIO supports direct proto writes - i.e. we allow the user to pass in their own proto
  // and skip our conversion layer, as long as the proto conforms to the types supported by the
  // BigQuery Storage Write API. For many schema types, the Storage Write API supports different
  // proto field types (often with different encodings), so the mapping of schema type -> proto
  // type is one to many. To read the data out of the proto, we need to examine both the schema
  // type and the proto field type.
  switch (schemaInformation.getType()) {
    case DOUBLE:
      switch (fieldDescriptor.getType()) {
        case FLOAT:
        case DOUBLE:
        case STRING:
          return DECIMAL_FORMAT.format(Double.parseDouble(fieldValue.toString()));
        case MESSAGE:
          {
            // Handle the various number wrapper types.
            Message doubleMessage = (Message) fieldValue;
            String wrapperName = fieldDescriptor.getMessageType().getName();
            if (FLOAT_VALUE_DESCRIPTOR_NAMES.contains(wrapperName)) {
              float floatValue = (float) wrapperValueOf(doubleMessage);
              // Go through Float.toString so a wrapped float renders exactly like a plain float
              // field (which is parsed from its toString above). Widening the float directly
              // would expose binary rounding digits, e.g. 0.10000000149011612 for 0.1f.
              return DECIMAL_FORMAT.format(Double.parseDouble(Float.toString(floatValue)));
            } else if (DOUBLE_VALUE_DESCRIPTOR_NAMES.contains(wrapperName)) {
              double doubleValue = (double) wrapperValueOf(doubleMessage);
              return DECIMAL_FORMAT.format(doubleValue);
            } else {
              throw new RuntimeException(
                  "Not implemented yet " + fieldDescriptor.getMessageType().getName());
            }
          }
        default:
          return fieldValue.toString();
      }
    case BOOL:
      // Wrapper type.
      if (fieldDescriptor.getType().equals(FieldDescriptor.Type.MESSAGE)) {
        Message boolMessage = (Message) fieldValue;
        if (BOOL_VALUE_DESCRIPTOR_NAMES.contains(fieldDescriptor.getMessageType().getName())) {
          return wrapperValueOf(boolMessage).toString();
        } else {
          throw new RuntimeException(
              "Not implemented yet " + fieldDescriptor.getMessageType().getName());
        }
      }
      return fieldValue.toString();
    case JSON:
    case GEOGRAPHY:
      // The above types have native representations in JSON for all their
      // possible values.
    case STRING:
      return fieldValue.toString();
    case INT64:
      switch (fieldDescriptor.getType()) {
        case MESSAGE:
          {
            // Wrapper types.
            Message message = (Message) fieldValue;
            String wrapperName = fieldDescriptor.getMessageType().getName();
            if (INT32_VALUE_DESCRIPTOR_NAMES.contains(wrapperName)
                || INT64_VALUE_DESCRIPTOR_NAMES.contains(wrapperName)
                || UINT64_VALUE_DESCRIPTOR_NAMES.contains(wrapperName)) {
              return wrapperValueOf(message).toString();
            } else if (UINT32_VALUE_DESCRIPTOR_NAMES.contains(wrapperName)) {
              return unsignedInt32ToString(wrapperValueOf(message));
            } else {
              throw new RuntimeException(
                  "Not implemented yet " + fieldDescriptor.getMessageType().getFullName());
            }
          }
        case UINT32:
        case FIXED32:
          return unsignedInt32ToString(fieldValue);
        default:
          // Unsigned 64-bit values are intentionally left as-is: anything above Long.MAX_VALUE
          // has no signed 64-bit representation to fall back on.
          return fieldValue.toString();
      }
    case BYTES:
      switch (fieldDescriptor.getType()) {
        case BYTES:
          return BaseEncoding.base64().encode(((ByteString) fieldValue).toByteArray());
        case STRING:
          return BaseEncoding.base64()
              .encode(((String) fieldValue).getBytes(StandardCharsets.UTF_8));
        case MESSAGE:
          {
            Message message = (Message) fieldValue;
            if (BYTES_VALUE_DESCRIPTOR_NAMES.contains(
                fieldDescriptor.getMessageType().getName())) {
              ByteString byteString = (ByteString) wrapperValueOf(message);
              return BaseEncoding.base64().encode(byteString.toByteArray());
            }
            throw new RuntimeException(
                "Not implemented " + fieldDescriptor.getMessageType().getFullName());
          }
        default:
          return fieldValue.toString();
      }
    case TIMESTAMP:
      if (isProtoFieldTypeInteger(fieldDescriptor.getType())) {
        // Integer encoding: the value is split into whole seconds and a nanosecond adjustment,
        // i.e. it is treated as microseconds since the epoch.
        long epochMicros = Long.parseLong(fieldValue.toString());
        long epochSeconds = epochMicros / 1_000_000L;
        long nanoAdjustment = (epochMicros % 1_000_000L) * 1_000L;
        Instant instant = Instant.ofEpochSecond(epochSeconds, nanoAdjustment);
        return LocalDateTime.ofInstant(instant, ZoneOffset.UTC).format(TIMESTAMP_FORMATTER);
      } else if (fieldDescriptor.getType().equals(FieldDescriptor.Type.MESSAGE)) {
        Message message = (Message) fieldValue;
        String messageName = fieldDescriptor.getMessageType().getName();
        if (TIMESTAMP_VALUE_DESCRIPTOR_NAMES.contains(messageName)) {
          Descriptor descriptor = message.getDescriptorForType();
          long seconds = (long) message.getField(descriptor.findFieldByName("seconds"));
          int nanos = (int) message.getField(descriptor.findFieldByName("nanos"));
          Instant instant = Instant.ofEpochSecond(seconds, nanos);
          return LocalDateTime.ofInstant(instant, ZoneOffset.UTC).format(TIMESTAMP_FORMATTER);
        } else if (messageName.equals("TimestampPicos")) {
          Descriptor descriptor = message.getDescriptorForType();
          long seconds = (long) message.getField(descriptor.findFieldByName("seconds"));
          long picoseconds = (long) message.getField(descriptor.findFieldByName("picoseconds"));
          // Convert to ISO timestamp string with picoseconds
          Instant instant = Instant.ofEpochSecond(seconds);
          String baseTimestamp = instant.toString(); // "2024-01-15T10:30:45Z"
          // Format picoseconds as 12-digit string. Locale.ROOT keeps the digits ASCII; the
          // default-locale overload may emit localized digits and corrupt the timestamp.
          String picosPart = String.format(java.util.Locale.ROOT, "%012d", picoseconds);
          // Insert before 'Z': "2024-01-15T10:30:45Z" → "2024-01-15T10:30:45.123456789012Z"
          return baseTimestamp.replace("Z", "." + picosPart + "Z");
        } else {
          throw new RuntimeException(
              "Not implemented yet " + fieldDescriptor.getMessageType().getFullName());
        }
      } else {
        return fieldValue.toString();
      }
    case DATE:
      if (isProtoFieldTypeInteger(fieldDescriptor.getType())) {
        // Integer encoding is interpreted as a day count since the epoch.
        int intDate = Integer.parseInt(fieldValue.toString());
        return LocalDate.ofEpochDay(intDate).toString();
      } else {
        return fieldValue.toString();
      }
    case NUMERIC:
      switch (fieldDescriptor.getType()) {
        case BYTES:
          {
            ByteString numericByteString = (ByteString) fieldValue;
            return BigDecimalByteStringEncoder.decodeNumericByteString(numericByteString)
                .stripTrailingZeros()
                .toString();
          }
        default:
          return fieldValue.toString();
      }
    case BIGNUMERIC:
      switch (fieldDescriptor.getType()) {
        case BYTES:
          {
            ByteString bigNumericByteString = (ByteString) fieldValue;
            return BigDecimalByteStringEncoder.decodeBigNumericByteString(bigNumericByteString)
                .stripTrailingZeros()
                .toString();
          }
        default:
          return fieldValue.toString();
      }
    case DATETIME:
      if (isProtoFieldTypeInteger(fieldDescriptor.getType())) {
        long packedDateTime = Long.parseLong(fieldValue.toString());
        return CivilTimeEncoder.decodePacked64DatetimeMicrosAsJavaTime(packedDateTime)
            .format(BigQueryUtils.BIGQUERY_DATETIME_FORMATTER);
      } else {
        return fieldValue.toString();
      }
    case TIME:
      if (isProtoFieldTypeInteger(fieldDescriptor.getType())) {
        long packedTime = Long.parseLong(fieldValue.toString());
        return CivilTimeEncoder.decodePacked64TimeMicrosAsJavaTime(packedTime).toString();
      } else {
        return fieldValue.toString();
      }
    case STRUCT:
      return useSetF
          ? tableRowFromMessageUseSetF(
              schemaInformation, (Message) fieldValue, false, includeField, prefix)
          : tableRowFromMessageNoF(
              schemaInformation, (Message) fieldValue, false, includeField, prefix);
    default:
      return fieldValue.toString();
  }
}

/** Reads the field named {@code "value"} from a wrapper message. */
private static Object wrapperValueOf(Message wrapper) {
  return wrapper.getField(wrapper.getDescriptorForType().findFieldByName("value"));
}

/**
 * Renders an unsigned 32-bit proto value. Such values arrive in a signed {@code Integer}, so the
 * bits are reinterpreted as unsigned; any other runtime type falls back to {@code toString()}.
 */
private static String unsignedInt32ToString(Object value) {
  return value instanceof Integer
      ? Integer.toUnsignedString((Integer) value)
      : value.toString();
}