in hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java [83:270]
public static RowDataToAvroConverter createConverter(LogicalType type, boolean utcTimezone) {
final RowDataToAvroConverter converter;
switch (type.getTypeRoot()) {
case NULL:
converter =
new RowDataToAvroConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Schema schema, Object object) {
return null;
}
};
break;
case TINYINT:
converter =
new RowDataToAvroConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Schema schema, Object object) {
return ((Byte) object).intValue();
}
};
break;
case SMALLINT:
converter =
new RowDataToAvroConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Schema schema, Object object) {
return ((Short) object).intValue();
}
};
break;
case BOOLEAN: // boolean
case INTEGER: // int
case INTERVAL_YEAR_MONTH: // long
case BIGINT: // long
case INTERVAL_DAY_TIME: // long
case FLOAT: // float
case DOUBLE: // double
case TIME_WITHOUT_TIME_ZONE: // int
case DATE: // int
converter =
new RowDataToAvroConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Schema schema, Object object) {
return object;
}
};
break;
case CHAR:
case VARCHAR:
converter =
new RowDataToAvroConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Schema schema, Object object) {
return new Utf8(object.toString());
}
};
break;
case BINARY:
case VARBINARY:
converter =
new RowDataToAvroConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Schema schema, Object object) {
return ByteBuffer.wrap((byte[]) object);
}
};
break;
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
int precision = RowDataUtils.precision(type);
if (precision <= 3) {
converter = new RowDataToAvroConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Schema schema, Object object) {
return ((TimestampData) object).toInstant().toEpochMilli();
}
};
} else if (precision <= 6) {
converter = new RowDataToAvroConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Schema schema, Object object) {
Instant instant = ((TimestampData) object).toInstant();
return Math.addExact(Math.multiplyExact(instant.getEpochSecond(), 1000_000), instant.getNano() / 1000);
}
};
} else {
throw new UnsupportedOperationException("Unsupported timestamp precision: " + precision);
}
break;
case TIMESTAMP_WITHOUT_TIME_ZONE:
precision = RowDataUtils.precision(type);
if (precision <= 3) {
converter =
new RowDataToAvroConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Schema schema, Object object) {
return utcTimezone ? ((TimestampData) object).toInstant().toEpochMilli() : ((TimestampData) object).toTimestamp().getTime();
}
};
} else if (precision <= 6) {
converter =
new RowDataToAvroConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Schema schema, Object object) {
Instant instant = utcTimezone ? ((TimestampData) object).toInstant() : ((TimestampData) object).toTimestamp().toInstant();
return Math.addExact(Math.multiplyExact(instant.getEpochSecond(), 1000_000), instant.getNano() / 1000);
}
};
} else {
throw new UnsupportedOperationException("Unsupported timestamp precision: " + precision);
}
break;
case DECIMAL:
converter =
new RowDataToAvroConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Schema schema, Object object) {
BigDecimal javaDecimal = ((DecimalData) object).toBigDecimal();
return DECIMAL_CONVERSION.toFixed(javaDecimal, schema, schema.getLogicalType());
}
};
break;
case ARRAY:
converter = createArrayConverter((ArrayType) type, utcTimezone);
break;
case ROW:
converter = createRowConverter((RowType) type, utcTimezone);
break;
case MAP:
case MULTISET:
converter = createMapConverter(type, utcTimezone);
break;
case RAW:
default:
throw new UnsupportedOperationException("Unsupported type: " + type);
}
// wrap into nullable converter
return new RowDataToAvroConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Schema schema, Object object) {
if (object == null) {
return null;
}
// get actual schema if it is a nullable schema
Schema actualSchema;
if (schema.getType() == Schema.Type.UNION) {
List<Schema> types = schema.getTypes();
int size = types.size();
if (size == 2 && types.get(1).getType() == Schema.Type.NULL) {
actualSchema = types.get(0);
} else if (size == 2 && types.get(0).getType() == Schema.Type.NULL) {
actualSchema = types.get(1);
} else {
throw new IllegalArgumentException(
"The Avro schema is not a nullable type: " + schema);
}
} else {
actualSchema = schema;
}
return converter.convert(actualSchema, object);
}
};
}