in seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/convert/RowDataToAvroConverters.java [68:220]
/**
 * Builds a converter that turns a SeaTunnel field value of the given type into the object handed
 * to Avro for that field.
 *
 * <p>The type-specific converter is selected from {@code dataType.getSqlType()} and then wrapped
 * so that {@code null} values are passed through as {@code null} and a two-branch union schema
 * containing a {@code NULL} branch is reduced to its other branch before the type-specific
 * converter runs.
 *
 * @param dataType the SeaTunnel type describing the values the converter will receive
 * @return a converter for {@code dataType}; calling it with a union schema that is not a
 *     two-branch union with a {@code NULL} branch throws {@link IllegalArgumentException}
 * @throws UnsupportedOperationException if the SQL type of {@code dataType} is not handled here
 */
public static RowDataToAvroConverter createConverter(SeaTunnelDataType<?> dataType) {
    // Type-specific conversion; assigned exactly once by the switch below.
    final RowDataToAvroConverter delegate;
    switch (dataType.getSqlType()) {
        case TINYINT:
            delegate =
                    new RowDataToAvroConverter() {
                        private static final long serialVersionUID = 1L;

                        @Override
                        public Object convert(Schema schema, Object object) {
                            // Byte values are widened and returned as an Integer.
                            Byte tinyValue = (Byte) object;
                            return tinyValue.intValue();
                        }
                    };
            break;
        case SMALLINT:
            delegate =
                    new RowDataToAvroConverter() {
                        private static final long serialVersionUID = 1L;

                        @Override
                        public Object convert(Schema schema, Object object) {
                            // Short values are widened and returned as an Integer.
                            Short smallValue = (Short) object;
                            return smallValue.intValue();
                        }
                    };
            break;
        case BOOLEAN:
        case INT:
        case BIGINT:
        case FLOAT:
        case DOUBLE:
            // These values are handed over unchanged.
            delegate =
                    new RowDataToAvroConverter() {
                        private static final long serialVersionUID = 1L;

                        @Override
                        public Object convert(Schema schema, Object object) {
                            return object;
                        }
                    };
            break;
        case TIME:
            delegate =
                    new RowDataToAvroConverter() {
                        private static final long serialVersionUID = 1L;

                        @Override
                        public Object convert(Schema schema, Object object) {
                            // Milliseconds elapsed since midnight, as an Integer.
                            LocalTime timeOfDay = (LocalTime) object;
                            return timeOfDay.get(ChronoField.MILLI_OF_DAY);
                        }
                    };
            break;
        case DATE:
            delegate =
                    new RowDataToAvroConverter() {
                        private static final long serialVersionUID = 1L;

                        @Override
                        public Object convert(Schema schema, Object object) {
                            // Days since the epoch, narrowed to int.
                            long epochDay = ((LocalDate) object).toEpochDay();
                            return (int) epochDay;
                        }
                    };
            break;
        case STRING:
            delegate =
                    new RowDataToAvroConverter() {
                        private static final long serialVersionUID = 1L;

                        @Override
                        public Object convert(Schema schema, Object object) {
                            String text = object.toString();
                            return new Utf8(text);
                        }
                    };
            break;
        case BYTES:
            delegate =
                    new RowDataToAvroConverter() {
                        private static final long serialVersionUID = 1L;

                        @Override
                        public Object convert(Schema schema, Object object) {
                            // The buffer wraps the given array; no copy is made here.
                            byte[] rawBytes = (byte[]) object;
                            return ByteBuffer.wrap(rawBytes);
                        }
                    };
            break;
        case TIMESTAMP:
            delegate =
                    new RowDataToAvroConverter() {
                        private static final long serialVersionUID = 1L;

                        @Override
                        public Object convert(Schema schema, Object object) {
                            // The local date-time is interpreted at UTC and expressed as
                            // epoch milliseconds.
                            LocalDateTime dateTime = (LocalDateTime) object;
                            return dateTime.toInstant(java.time.ZoneOffset.UTC).toEpochMilli();
                        }
                    };
            break;
        case DECIMAL:
            delegate =
                    new RowDataToAvroConverter() {
                        private static final long serialVersionUID = 1L;

                        @Override
                        public Object convert(Schema schema, Object object) {
                            // Encoding is delegated to DECIMAL_CONVERSION using the logical
                            // type carried by the supplied schema.
                            BigDecimal decimalValue = (BigDecimal) object;
                            return DECIMAL_CONVERSION.toFixed(
                                    decimalValue, schema, schema.getLogicalType());
                        }
                    };
            break;
        case ARRAY:
            delegate = createArrayConverter((ArrayType<?, ?>) dataType);
            break;
        case ROW:
            delegate = createRowConverter((SeaTunnelRowType) dataType);
            break;
        case MAP:
            delegate = createMapConverter(dataType);
            break;
        default:
            throw new UnsupportedOperationException("Unsupported type: " + dataType);
    }

    // Null-tolerant wrapper around the type-specific converter.
    return new RowDataToAvroConverter() {
        private static final long serialVersionUID = 1L;

        @Override
        public Object convert(Schema schema, Object object) {
            if (object == null) {
                return null;
            }
            // A non-union schema is already the concrete schema.
            if (schema.getType() != Schema.Type.UNION) {
                return delegate.convert(schema, object);
            }
            // For a two-branch union, use whichever branch sits opposite the NULL branch;
            // the second position is checked before the first.
            List<Schema> branches = schema.getTypes();
            if (branches.size() == 2) {
                if (branches.get(1).getType() == Schema.Type.NULL) {
                    return delegate.convert(branches.get(0), object);
                }
                if (branches.get(0).getType() == Schema.Type.NULL) {
                    return delegate.convert(branches.get(1), object);
                }
            }
            throw new IllegalArgumentException(
                    "The Avro schema is not a nullable type: " + schema);
        }
    };
}