private static Object convertObject()

in hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java [377:510]


    /**
     * Recursively converts a value read from a protobuf message into the Avro representation
     * described by {@code schema}.
     *
     * <p>Behavior by target schema:
     * <ul>
     *   <li>A schema whose full name matches {@code RECURSION_OVERFLOW_SCHEMA}: the value is cast to
     *   {@link Message} and stored as its descriptor's full name plus its serialized bytes.</li>
     *   <li>ARRAY / MAP: elements and map values are converted recursively. Map keys are converted
     *   with {@code STRING_SCHEMA}.</li>
     *   <li>BYTES: accepts a {@link ByteString}, a wrapper {@link Message} whose wrapped value is a
     *   {@link ByteString}, a {@code byte[]} or a {@link ByteBuffer}. Always returns a fresh copy of
     *   the remaining bytes and leaves the source buffer's position unchanged.</li>
     *   <li>FIXED: a {@code byte[]} is wrapped directly. Otherwise the value (optionally a
     *   {@link UInt64Value}) is read as a {@code Long}, interpreted as unsigned, and encoded with
     *   {@code DECIMAL_CONVERSION}.</li>
     *   <li>BOOLEAN / DOUBLE / FLOAT / INT: wrapper messages are unwrapped; other values are returned
     *   as-is.</li>
     *   <li>LONG: under the {@code timestamp-micros} logical type, {@link Timestamp} or
     *   {@link DynamicMessage} values are converted to epoch microseconds. Other messages are
     *   unwrapped. {@link Integer} values are widened to {@code long} as unsigned.</li>
     *   <li>RECORD: each Avro field is filled from the proto field at the same position. Nullable
     *   (union) fields absent from the message become {@code null}. Non-union fields with no
     *   matching descriptor are converted from the Avro field's default value.</li>
     *   <li>STRING: a {@link String} is returned as-is, a {@link StringValue} is unwrapped, and
     *   anything else becomes a {@link Utf8} of its {@code toString()}.</li>
     *   <li>UNION: assumed to be {@code [null, T]}; the value is converted against {@code T}.</li>
     * </ul>
     *
     * @param schema the target Avro schema
     * @param value the proto value to convert; may be {@code null}
     * @return the converted Avro value, or {@code null} when {@code value} is {@code null}
     * @throws HoodieException if the schema type is unsupported, or a FIXED value is a message
     *     other than {@link UInt64Value}
     * @throws HoodieSchemaException if a {@code timestamp-micros} LONG receives an unsupported
     *     message type
     */
    private static Object convertObject(Schema schema, Object value) {
      if (value == null) {
        return null;
      }
      // if we've reached max recursion depth in the provided schema, write out message to bytes
      if (RECURSION_OVERFLOW_SCHEMA.getFullName().equals(schema.getFullName())) {
        GenericData.Record overflowRecord = new GenericData.Record(schema);
        Message messageValue = (Message) value;
        overflowRecord.put(OVERFLOW_DESCRIPTOR_FIELD_NAME, messageValue.getDescriptorForType().getFullName());
        overflowRecord.put(OVERFLOW_BYTES_FIELD_NAME, ByteBuffer.wrap(messageValue.toByteArray()));
        return overflowRecord;
      }

      switch (schema.getType()) {
        case ARRAY:
          List<Object> arrayValue = (List<Object>) value;
          List<Object> arrayCopy = new GenericData.Array<>(arrayValue.size(), schema);
          for (Object obj : arrayValue) {
            arrayCopy.add(convertObject(schema.getElementType(), obj));
          }
          return arrayCopy;
        case BYTES:
          ByteBuffer byteBufferValue;
          if (value instanceof ByteString) {
            byteBufferValue = ((ByteString) value).asReadOnlyByteBuffer();
          } else if (value instanceof Message) {
            byteBufferValue = ((ByteString) getWrappedValue(value)).asReadOnlyByteBuffer();
          } else if (value instanceof byte[]) {
            byteBufferValue = ByteBuffer.wrap((byte[]) value);
          } else {
            byteBufferValue = (ByteBuffer) value;
          }
          // copy the remaining bytes, then restore the position so the source buffer is not consumed
          int start = byteBufferValue.position();
          int length = byteBufferValue.limit() - start;
          byte[] bytesCopy = new byte[length];
          byteBufferValue.get(bytesCopy, 0, length);
          byteBufferValue.position(start);
          return ByteBuffer.wrap(bytesCopy, 0, length);
        case ENUM:
          return GenericData.get().createEnum(value.toString(), schema);
        case FIXED:
          if (value instanceof byte[]) {
            return GenericData.get().createFixed(null, (byte[]) value, schema);
          }
          Object unsignedLongValue = value;
          if (unsignedLongValue instanceof UInt64Value) {
            // Unwrap UInt64Value
            unsignedLongValue = getWrappedValue(unsignedLongValue);
          } else if (unsignedLongValue instanceof Message) {
            // Unexpected message type
            throw new HoodieException("Unexpected Message type when converting as an unsigned long: " + unsignedLongValue.getClass().getName());
          }
          // convert the long to its unsigned value
          return DECIMAL_CONVERSION.toFixed(new BigDecimal(toUnsignedBigInteger((Long) unsignedLongValue)), schema, schema.getLogicalType());
        case BOOLEAN:
        case DOUBLE:
        case FLOAT:
        case INT:
          if (value instanceof Message) {
            return getWrappedValue(value);
          }
          return value; // immutable
        case LONG:
          Object tmpValue = value;
          if (value instanceof Message) {
            // check if this is a Timestamp
            if (LogicalTypes.timestampMicros().equals(schema.getLogicalType())) {
              if (value instanceof Timestamp) {
                return Timestamps.toMicros((Timestamp) value);
              } else if (value instanceof DynamicMessage) {
                Timestamp.Builder builder = Timestamp.newBuilder();
                ((DynamicMessage) value).getAllFields().forEach((fieldDescriptor, fieldValue) -> {
                  if (fieldDescriptor.getFullName().equals("google.protobuf.Timestamp.seconds")) {
                    builder.setSeconds((Long) fieldValue);
                  } else if (fieldDescriptor.getFullName().equals("google.protobuf.Timestamp.nanos")) {
                    builder.setNanos((Integer) fieldValue);
                  }
                });
                return Timestamps.toMicros(builder.build());
              } else {
                throw new HoodieSchemaException("Unexpected message type while handling timestamps: " + value.getClass().getName());
              }
            } else {
              tmpValue = getWrappedValue(value);
            }
          }
          // Integers reaching a LONG schema are unsigned 32-bit proto values (e.g. uint32), which Java
          // represents as a signed int. Widen without sign extension so values >= 2^31 stay positive
          // (e.g. 0xFFFFFFFF -> 4294967295L rather than -1L).
          if (tmpValue instanceof Integer) {
            tmpValue = Integer.toUnsignedLong((Integer) tmpValue);
          }
          return tmpValue;
        case MAP:
          Map<Object, Object> mapValue = (Map<Object, Object>) value;
          Map<Object, Object> mapCopy = new HashMap<>(mapValue.size());
          for (Map.Entry<Object, Object> entry : mapValue.entrySet()) {
            // Avro map keys are always strings
            mapCopy.put(convertObject(STRING_SCHEMA, entry.getKey()), convertObject(schema.getValueType(), entry.getValue()));
          }
          return mapCopy;
        case NULL:
          return null;
        case RECORD:
          GenericData.Record newRecord = new GenericData.Record(schema);
          Message messageValue = (Message) value;
          Descriptors.FieldDescriptor[] orderedFields = getOrderedFields(schema, messageValue);
          for (Schema.Field field : schema.getFields()) {
            int position = field.pos();
            Descriptors.FieldDescriptor fieldDescriptor = orderedFields[position];
            Object convertedValue;
            Schema fieldSchema = field.schema();
            // if incoming message does not contain the field, fieldDescriptor will be null
            // if the field schema is a union, it is nullable
            if (fieldSchema.getType() == Schema.Type.UNION && (fieldDescriptor == null || (!fieldDescriptor.isRepeated() && !messageValue.hasField(fieldDescriptor)))) {
              convertedValue = null;
            } else {
              convertedValue = convertObject(fieldSchema, fieldDescriptor == null ? field.defaultVal() : messageValue.getField(fieldDescriptor));
            }
            newRecord.put(position, convertedValue);
          }
          return newRecord;
        case STRING:
          if (value instanceof String) {
            return value;
          } else if (value instanceof StringValue) {
            return ((StringValue) value).getValue();
          } else {
            return new Utf8(value.toString());
          }
        case UNION:
          // Unions only occur for nullable fields when working with proto + avro and null is the first schema in the union
          return convertObject(schema.getTypes().get(1), value);
        default:
          throw new HoodieException("Proto to Avro conversion failed for schema \"" + schema + "\" and value \"" + value + "\"");
      }
    }