in flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisRowConverter.java [72:132]
static SerializationConverter createExternalConverter(DataType type, ZoneId pipelineZoneId) {
switch (type.getTypeRoot()) {
case CHAR:
case VARCHAR:
return (index, val) -> val.getString(index).toString();
case BOOLEAN:
return (index, val) -> val.getBoolean(index);
case BINARY:
case VARBINARY:
return (index, val) -> val.getBinary(index);
case DECIMAL:
final int decimalPrecision = ((DecimalType) type).getPrecision();
final int decimalScale = ((DecimalType) type).getScale();
return (index, val) ->
val.getDecimal(index, decimalPrecision, decimalScale).toBigDecimal();
case TINYINT:
return (index, val) -> val.getByte(index);
case SMALLINT:
return (index, val) -> val.getShort(index);
case INTEGER:
return (index, val) -> val.getInt(index);
case BIGINT:
return (index, val) -> val.getLong(index);
case FLOAT:
return (index, val) -> val.getFloat(index);
case DOUBLE:
return (index, val) -> val.getDouble(index);
case DATE:
return (index, val) ->
LocalDate.ofEpochDay(val.getInt(index))
.format(DorisEventSerializer.DATE_FORMATTER);
case TIMESTAMP_WITHOUT_TIME_ZONE:
return (index, val) ->
val.getTimestamp(index, DataTypeChecks.getPrecision(type))
.toLocalDateTime()
.format(DorisEventSerializer.DATE_TIME_FORMATTER);
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return (index, val) ->
ZonedDateTime.ofInstant(
val.getLocalZonedTimestampData(
index, DataTypeChecks.getPrecision(type))
.toInstant(),
pipelineZoneId)
.toLocalDateTime()
.format(DorisEventSerializer.DATE_TIME_FORMATTER);
case TIMESTAMP_WITH_TIME_ZONE:
final int zonedP = ((ZonedTimestampType) type).getPrecision();
return (index, val) -> val.getTimestamp(index, zonedP).toTimestamp();
case TIME_WITHOUT_TIME_ZONE:
return (index, val) -> LocalTime.ofNanoOfDay(val.getLong(index) * 1_000_000L);
case ARRAY:
return (index, val) -> convertArrayData(val.getArray(index), type);
case MAP:
return (index, val) -> writeValueAsString(convertMapData(val.getMap(index), type));
case ROW:
return (index, val) ->
writeValueAsString(convertRowData(val, index, type, pipelineZoneId));
default:
throw new UnsupportedOperationException("Unsupported type:" + type);
}
}