private static Object rewritePrimaryTypeWithDiffSchemaType()

in hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java [1086:1167]


  /**
   * Converts a single non-nested Avro value that was written with {@code oldSchema} into the
   * representation required by {@code newSchema}.
   *
   * <p>Supported conversions, keyed by the type of {@code newSchema}:
   * <ul>
   *   <li>INT carrying the date logical type: from STRING (parsed with {@link java.sql.Date#valueOf(String)})</li>
   *   <li>LONG: from INT</li>
   *   <li>FLOAT: from INT or LONG</li>
   *   <li>DOUBLE: from FLOAT, INT or LONG</li>
   *   <li>BYTES: from STRING (encoded through {@code getUTF8Bytes})</li>
   *   <li>STRING: from ENUM, BYTES (decoded as UTF-8), date, INT, LONG, FLOAT, DOUBLE or FIXED decimal</li>
   *   <li>FIXED carrying a decimal logical type: from STRING, DOUBLE, INT, LONG, FLOAT or BYTES</li>
   * </ul>
   *
   * @param oldValue  the value to convert; dereferenced without a null check
   * @param oldSchema the schema {@code oldValue} currently conforms to
   * @param newSchema the schema the returned value must conform to
   * @return the converted value
   * @throws HoodieAvroSchemaException if the (old type, new type) pair is not one of the conversions above
   */
  private static Object rewritePrimaryTypeWithDiffSchemaType(Object oldValue, Schema oldSchema, Schema newSchema) {
    switch (newSchema.getType()) {
      case NULL:
      case BOOLEAN:
        // nothing can be promoted to NULL or BOOLEAN; fall through to the exception below
        break;
      case INT:
        if (newSchema.getLogicalType() == LogicalTypes.date() && oldSchema.getType() == Schema.Type.STRING) {
          return fromJavaDate(java.sql.Date.valueOf(oldValue.toString()));
        }
        break;
      case LONG:
        if (oldSchema.getType() == Schema.Type.INT) {
          return ((Integer) oldValue).longValue();
        }
        break;
      case FLOAT:
        if ((oldSchema.getType() == Schema.Type.INT)
            || (oldSchema.getType() == Schema.Type.LONG)) {
          return oldSchema.getType() == Schema.Type.INT ? ((Integer) oldValue).floatValue() : ((Long) oldValue).floatValue();
        }
        break;
      case DOUBLE:
        if (oldSchema.getType() == Schema.Type.FLOAT) {
          // Go through the decimal string form so that e.g. 0.1f becomes 0.1d rather than
          // 0.10000000149011612d, which is what a direct float -> double widening would give.
          return Double.valueOf(oldValue + "");
        } else if (oldSchema.getType() == Schema.Type.INT) {
          return ((Integer) oldValue).doubleValue();
        } else if (oldSchema.getType() == Schema.Type.LONG) {
          return ((Long) oldValue).doubleValue();
        }
        break;
      case BYTES:
        if (oldSchema.getType() == Schema.Type.STRING) {
          return ByteBuffer.wrap(getUTF8Bytes(oldValue.toString()));
        }
        break;
      case STRING:
        if (oldSchema.getType() == Schema.Type.ENUM) {
          return String.valueOf(oldValue);
        }
        if (oldSchema.getType() == Schema.Type.BYTES) {
          if (oldValue instanceof ByteBuffer) {
            // String.valueOf(ByteBuffer) yields the buffer's debug description
            // (e.g. "java.nio.HeapByteBuffer[pos=0 lim=3 cap=3]"), not its content.
            // Decode the readable bytes as UTF-8, mirroring the STRING -> BYTES direction;
            // duplicate() leaves the caller's position/limit untouched.
            return java.nio.charset.StandardCharsets.UTF_8.decode(((ByteBuffer) oldValue).duplicate()).toString();
          }
          // Unexpected in-memory representation: keep the previous best-effort behaviour.
          return String.valueOf(oldValue);
        }
        if (oldSchema.getLogicalType() == LogicalTypes.date()) {
          // must be checked before the generic INT branch, otherwise the raw int would be returned
          return toJavaDate((Integer) oldValue).toString();
        }
        if (oldSchema.getType() == Schema.Type.INT
            || oldSchema.getType() == Schema.Type.LONG
            || oldSchema.getType() == Schema.Type.FLOAT
            || oldSchema.getType() == Schema.Type.DOUBLE) {
          return oldValue.toString();
        }
        if (oldSchema.getType() == Schema.Type.FIXED && oldSchema.getLogicalType() instanceof LogicalTypes.Decimal) {
          final byte[] unscaledBytes = ((GenericFixed) oldValue).bytes();
          LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) oldSchema.getLogicalType();
          // the fixed bytes are interpreted as the two's-complement unscaled value of the decimal
          BigDecimal bd = new BigDecimal(new BigInteger(unscaledBytes), decimal.getScale());
          return bd.toString();
        }
        break;
      case FIXED:
        // deal with decimal Type
        if (newSchema.getLogicalType() instanceof LogicalTypes.Decimal) {
          // TODO: support more types
          if (oldSchema.getType() == Schema.Type.STRING
              || oldSchema.getType() == Schema.Type.DOUBLE
              || oldSchema.getType() == Schema.Type.INT
              || oldSchema.getType() == Schema.Type.LONG
              || oldSchema.getType() == Schema.Type.FLOAT) {
            LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) newSchema.getLogicalType();
            // due to Java, there will be precision problems in direct conversion, we should use string instead of use double
            BigDecimal bigDecimal = new java.math.BigDecimal(oldValue.toString()).setScale(decimal.getScale(), RoundingMode.HALF_UP);
            return DECIMAL_CONVERSION.toFixed(bigDecimal, newSchema, newSchema.getLogicalType());
          } else if (oldSchema.getType() == Schema.Type.BYTES) {
            // Copy only the readable window [position, limit). ByteBuffer.array() would expose the
            // whole backing array (ignoring position, limit and arrayOffset) and throws for
            // read-only or direct buffers.
            ByteBuffer readable = ((ByteBuffer) oldValue).duplicate();
            byte[] content = new byte[readable.remaining()];
            readable.get(content);
            return convertBytesToFixed(content, newSchema);
          }
        }
        break;
      default:
        break;
    }
    throw new HoodieAvroSchemaException(String.format("cannot support rewrite value for schema type: %s since the old schema type is: %s", newSchema, oldSchema));
  }