in hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java [1086:1167]
private static Object rewritePrimaryTypeWithDiffSchemaType(Object oldValue, Schema oldSchema, Schema newSchema) {
switch (newSchema.getType()) {
case NULL:
case BOOLEAN:
break;
case INT:
if (newSchema.getLogicalType() == LogicalTypes.date() && oldSchema.getType() == Schema.Type.STRING) {
return fromJavaDate(java.sql.Date.valueOf(oldValue.toString()));
}
break;
case LONG:
if (oldSchema.getType() == Schema.Type.INT) {
return ((Integer) oldValue).longValue();
}
break;
case FLOAT:
if ((oldSchema.getType() == Schema.Type.INT)
|| (oldSchema.getType() == Schema.Type.LONG)) {
return oldSchema.getType() == Schema.Type.INT ? ((Integer) oldValue).floatValue() : ((Long) oldValue).floatValue();
}
break;
case DOUBLE:
if (oldSchema.getType() == Schema.Type.FLOAT) {
// java float cannot convert to double directly, deal with float precision change
return Double.valueOf(oldValue + "");
} else if (oldSchema.getType() == Schema.Type.INT) {
return ((Integer) oldValue).doubleValue();
} else if (oldSchema.getType() == Schema.Type.LONG) {
return ((Long) oldValue).doubleValue();
}
break;
case BYTES:
if (oldSchema.getType() == Schema.Type.STRING) {
return ByteBuffer.wrap(getUTF8Bytes(oldValue.toString()));
}
break;
case STRING:
if (oldSchema.getType() == Schema.Type.ENUM) {
return String.valueOf(oldValue);
}
if (oldSchema.getType() == Schema.Type.BYTES) {
return String.valueOf(oldValue);
}
if (oldSchema.getLogicalType() == LogicalTypes.date()) {
return toJavaDate((Integer) oldValue).toString();
}
if (oldSchema.getType() == Schema.Type.INT
|| oldSchema.getType() == Schema.Type.LONG
|| oldSchema.getType() == Schema.Type.FLOAT
|| oldSchema.getType() == Schema.Type.DOUBLE) {
return oldValue.toString();
}
if (oldSchema.getType() == Schema.Type.FIXED && oldSchema.getLogicalType() instanceof LogicalTypes.Decimal) {
final byte[] bytes;
bytes = ((GenericFixed) oldValue).bytes();
LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) oldSchema.getLogicalType();
BigDecimal bd = new BigDecimal(new BigInteger(bytes), decimal.getScale());
return bd.toString();
}
break;
case FIXED:
// deal with decimal Type
if (newSchema.getLogicalType() instanceof LogicalTypes.Decimal) {
// TODO: support more types
if (oldSchema.getType() == Schema.Type.STRING
|| oldSchema.getType() == Schema.Type.DOUBLE
|| oldSchema.getType() == Schema.Type.INT
|| oldSchema.getType() == Schema.Type.LONG
|| oldSchema.getType() == Schema.Type.FLOAT) {
LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) newSchema.getLogicalType();
// due to Java, there will be precision problems in direct conversion, we should use string instead of use double
BigDecimal bigDecimal = new java.math.BigDecimal(oldValue.toString()).setScale(decimal.getScale(), RoundingMode.HALF_UP);
return DECIMAL_CONVERSION.toFixed(bigDecimal, newSchema, newSchema.getLogicalType());
} else if (oldSchema.getType() == Schema.Type.BYTES) {
return convertBytesToFixed(((ByteBuffer) oldValue).array(), newSchema);
}
}
break;
default:
}
throw new HoodieAvroSchemaException(String.format("cannot support rewrite value for schema type: %s since the old schema type is: %s", newSchema, oldSchema));
}