public static Object jsonValueFromMessageValue()

in sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java [1569:1800]


  /**
   * Converts a single value read from a proto message back into the JSON-compatible value used in a
   * {@code TableRow}.
   *
   * <p>BigQueryIO supports direct proto writes, i.e. users may pass their own proto and skip the
   * conversion layer as long as the proto conforms to the types supported by the BigQuery Storage
   * Write API. For many schema types the Storage Write API accepts several proto field types (often
   * with different encodings), so the schema type -&gt; proto type mapping is one to many. Reading
   * the data back therefore dispatches on both the BigQuery schema type and the proto field type.
   *
   * @param schemaInformation schema information for the field; its type selects the conversion.
   * @param fieldDescriptor descriptor of the proto field that {@code fieldValue} was read from.
   * @param fieldValue the value to convert; treated as a {@code List} when {@code expandRepeated}
   *     is true and the field is repeated.
   * @param expandRepeated if true and the field is repeated, each list element is converted
   *     separately and a {@code List} of converted values is returned.
   * @param includeField forwarded unchanged to recursive calls and to nested STRUCT conversion.
   * @param prefix forwarded unchanged to recursive calls and to nested STRUCT conversion.
   * @param useSetF selects {@code tableRowFromMessageUseSetF} (true) or {@code
   *     tableRowFromMessageNoF} (false) for STRUCT values.
   * @return the converted value: usually a {@code String}; a {@code List} for expanded repeated
   *     fields; the nested conversion result for STRUCT fields; or {@code Optional.empty()} when
   *     {@code useSetF} is false and some repeated element converted to an empty {@code Optional}.
   * @throws RuntimeException if the proto field is a message type that is not a supported wrapper
   *     for the field's schema type.
   */
  public static Object jsonValueFromMessageValue(
      SchemaInformation schemaInformation,
      FieldDescriptor fieldDescriptor,
      Object fieldValue,
      boolean expandRepeated,
      Predicate<String> includeField,
      String prefix,
      boolean useSetF) {
    if (expandRepeated && fieldDescriptor.isRepeated()) {
      List<Object> valueList = (List<Object>) fieldValue;
      List<Object> expanded = Lists.newArrayListWithCapacity(valueList.size());
      for (Object value : valueList) {
        Object translatedValue =
            jsonValueFromMessageValue(
                schemaInformation, fieldDescriptor, value, false, includeField, prefix, useSetF);
        if (!useSetF && translatedValue instanceof Optional) {
          Optional<?> optional = (Optional<?>) translatedValue;
          if (!optional.isPresent()) {
            // A nested element contained an "f" column. Fail the call.
            return Optional.empty();
          }
          translatedValue = optional.get();
        }
        expanded.add(translatedValue);
      }
      return expanded;
    }

    // Dispatch on the BigQuery schema type first, then on the proto field type (see Javadoc).
    switch (schemaInformation.getType()) {
      case DOUBLE:
        switch (fieldDescriptor.getType()) {
          case FLOAT:
          case DOUBLE:
          case STRING:
            return DECIMAL_FORMAT.format(Double.parseDouble(fieldValue.toString()));
          case MESSAGE:
            // Handle the various number wrapper types.
            Message doubleMessage = (Message) fieldValue;
            if (FLOAT_VALUE_DESCRIPTOR_NAMES.contains(fieldDescriptor.getMessageType().getName())) {
              float floatValue =
                  (float)
                      doubleMessage.getField(
                          doubleMessage.getDescriptorForType().findFieldByName("value"));

              return DECIMAL_FORMAT.format(floatValue);
            } else if (DOUBLE_VALUE_DESCRIPTOR_NAMES.contains(
                fieldDescriptor.getMessageType().getName())) {
              double doubleValue =
                  (double)
                      doubleMessage.getField(
                          doubleMessage.getDescriptorForType().findFieldByName("value"));
              return DECIMAL_FORMAT.format(doubleValue);
            } else {
              throw new RuntimeException(
                  "Not implemented yet " + fieldDescriptor.getMessageType().getName());
            }
          default:
            return fieldValue.toString();
        }
      case BOOL:
        // Wrapper type.
        if (fieldDescriptor.getType().equals(FieldDescriptor.Type.MESSAGE)) {
          Message boolMessage = (Message) fieldValue;
          if (BOOL_VALUE_DESCRIPTOR_NAMES.contains(fieldDescriptor.getMessageType().getName())) {
            return boolMessage
                .getField(boolMessage.getDescriptorForType().findFieldByName("value"))
                .toString();
          } else {
            throw new RuntimeException(
                "Not implemented yet " + fieldDescriptor.getMessageType().getName());
          }
        }
        return fieldValue.toString();
      case JSON:
      case GEOGRAPHY:
        // The above types have native representations in JSON for all their
        // possible values.
      case STRING:
        return fieldValue.toString();
      case INT64:
        if (fieldDescriptor.getType().equals(FieldDescriptor.Type.MESSAGE)) {
          // Wrapper types: all four wrappers carry their payload in a field named "value".
          Message message = (Message) fieldValue;
          String wrapperName = fieldDescriptor.getMessageType().getName();
          if (INT32_VALUE_DESCRIPTOR_NAMES.contains(wrapperName)
              || INT64_VALUE_DESCRIPTOR_NAMES.contains(wrapperName)
              || UINT32_VALUE_DESCRIPTOR_NAMES.contains(wrapperName)
              || UINT64_VALUE_DESCRIPTOR_NAMES.contains(wrapperName)) {
            FieldDescriptor valueField = message.getDescriptorForType().findFieldByName("value");
            // Format by the wrapped field's own proto type so unsigned wrappers print unsigned.
            return protoIntegerToString(valueField.getType(), message.getField(valueField));
          }
          throw new RuntimeException(
              "Not implemented yet " + fieldDescriptor.getMessageType().getFullName());
        }
        return protoIntegerToString(fieldDescriptor.getType(), fieldValue);
      case BYTES:
        switch (fieldDescriptor.getType()) {
          case BYTES:
            return BaseEncoding.base64().encode(((ByteString) fieldValue).toByteArray());
          case STRING:
            return BaseEncoding.base64()
                .encode(((String) fieldValue).getBytes(StandardCharsets.UTF_8));
          case MESSAGE:
            Message message = (Message) fieldValue;
            if (BYTES_VALUE_DESCRIPTOR_NAMES.contains(fieldDescriptor.getMessageType().getName())) {
              ByteString byteString =
                  (ByteString)
                      message.getField(message.getDescriptorForType().findFieldByName("value"));
              return BaseEncoding.base64().encode(byteString.toByteArray());
            }
            throw new RuntimeException(
                "Not implemented " + fieldDescriptor.getMessageType().getFullName());
          default:
            return fieldValue.toString();
        }
      case TIMESTAMP:
        if (isProtoFieldTypeInteger(fieldDescriptor.getType())) {
          long epochMicros = Long.parseLong(fieldValue.toString());
          long epochSeconds = epochMicros / 1_000_000L;
          // Negative for pre-epoch values; Instant.ofEpochSecond accepts a signed adjustment.
          long nanoAdjustment = (epochMicros % 1_000_000L) * 1_000L;
          Instant instant = Instant.ofEpochSecond(epochSeconds, nanoAdjustment);
          return LocalDateTime.ofInstant(instant, ZoneOffset.UTC).format(TIMESTAMP_FORMATTER);
        } else if (fieldDescriptor.getType().equals(FieldDescriptor.Type.MESSAGE)) {
          Message message = (Message) fieldValue;
          String messageName = fieldDescriptor.getMessageType().getName();
          if (TIMESTAMP_VALUE_DESCRIPTOR_NAMES.contains(messageName)) {
            Descriptor descriptor = message.getDescriptorForType();
            long seconds = (long) message.getField(descriptor.findFieldByName("seconds"));
            int nanos = (int) message.getField(descriptor.findFieldByName("nanos"));
            Instant instant = Instant.ofEpochSecond(seconds, nanos);
            return LocalDateTime.ofInstant(instant, ZoneOffset.UTC).format(TIMESTAMP_FORMATTER);
          } else if (messageName.equals("TimestampPicos")) {
            Descriptor descriptor = message.getDescriptorForType();
            long seconds = (long) message.getField(descriptor.findFieldByName("seconds"));
            long picoseconds = (long) message.getField(descriptor.findFieldByName("picoseconds"));

            // Instant.toString() of a whole-second instant always prints the seconds and has no
            // fraction, e.g. "2024-01-15T10:30:45Z".
            String baseTimestamp = Instant.ofEpochSecond(seconds).toString();

            // 12-digit fraction. Locale.ROOT keeps the digits ASCII regardless of the JVM default
            // locale. NOTE(review): assumes 0 <= picoseconds < 10^12 — confirm with producers.
            String picosPart = String.format(java.util.Locale.ROOT, "%012d", picoseconds);

            // Insert before 'Z': "2024-01-15T10:30:45Z" -> "2024-01-15T10:30:45.123456789012Z"
            return baseTimestamp.replace("Z", "." + picosPart + "Z");
          } else {
            throw new RuntimeException(
                "Not implemented yet " + fieldDescriptor.getMessageType().getFullName());
          }
        } else {
          return fieldValue.toString();
        }

      case DATE:
        if (isProtoFieldTypeInteger(fieldDescriptor.getType())) {
          // Integer encoding is days since the Unix epoch.
          int intDate = Integer.parseInt(fieldValue.toString());
          return LocalDate.ofEpochDay(intDate).toString();
        } else {
          return fieldValue.toString();
        }
      case NUMERIC:
        switch (fieldDescriptor.getType()) {
          case BYTES:
            ByteString numericByteString = (ByteString) fieldValue;
            return BigDecimalByteStringEncoder.decodeNumericByteString(numericByteString)
                .stripTrailingZeros()
                .toString();
          default:
            return fieldValue.toString();
        }
      case BIGNUMERIC:
        switch (fieldDescriptor.getType()) {
          case BYTES:
            ByteString numericByteString = (ByteString) fieldValue;
            return BigDecimalByteStringEncoder.decodeBigNumericByteString(numericByteString)
                .stripTrailingZeros()
                .toString();
          default:
            return fieldValue.toString();
        }

      case DATETIME:
        if (isProtoFieldTypeInteger(fieldDescriptor.getType())) {
          long packedDateTime = Long.parseLong(fieldValue.toString());
          return CivilTimeEncoder.decodePacked64DatetimeMicrosAsJavaTime(packedDateTime)
              .format(BigQueryUtils.BIGQUERY_DATETIME_FORMATTER);
        } else {
          return fieldValue.toString();
        }

      case TIME:
        if (isProtoFieldTypeInteger(fieldDescriptor.getType())) {
          long packedTime = Long.parseLong(fieldValue.toString());
          return CivilTimeEncoder.decodePacked64TimeMicrosAsJavaTime(packedTime).toString();
        } else {
          return fieldValue.toString();
        }
      case STRUCT:
        return useSetF
            ? tableRowFromMessageUseSetF(
                schemaInformation, (Message) fieldValue, false, includeField, prefix)
            : tableRowFromMessageNoF(
                schemaInformation, (Message) fieldValue, false, includeField, prefix);
      default:
        return fieldValue.toString();
    }
  }

  /**
   * Renders an integer-valued proto field value as a decimal string.
   *
   * <p>protobuf-java represents {@code uint32}/{@code fixed32} as {@code int} and {@code
   * uint64}/{@code fixed64} as {@code long}, storing the top bit in the sign bit. Those types are
   * therefore rendered as unsigned so large values do not come out negative. Every other type falls
   * back to {@code value.toString()}.
   */
  private static String protoIntegerToString(FieldDescriptor.Type protoType, Object value) {
    switch (protoType) {
      case UINT32:
      case FIXED32:
        return Integer.toUnsignedString(((Number) value).intValue());
      case UINT64:
      case FIXED64:
        return Long.toUnsignedString(((Number) value).longValue());
      default:
        return value.toString();
    }
  }