in flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oceanbase/src/main/java/org/apache/flink/cdc/connectors/oceanbase/sink/OceanBaseRowConvert.java [76:134]
/**
 * Creates a converter that reads the field at a given index of a record and returns it as a
 * Java object selected by the type root of {@code type}.
 *
 * <p>Type-dependent settings (decimal precision/scale, timestamp precision) are resolved once
 * here, when the converter is built, so the returned lambda does not recompute them for every
 * record.
 *
 * @param type the data type of the field; its type root selects the conversion
 * @param pipelineZoneId the zone used to render {@code TIMESTAMP_WITH_LOCAL_TIME_ZONE} values
 *     as local date-times; also forwarded to the {@code ROW} conversion
 * @return the converter for {@code type}
 * @throws UnsupportedOperationException if the type root is not handled by this method
 */
static SerializationConverter createExternalConverter(DataType type, ZoneId pipelineZoneId) {
    switch (type.getTypeRoot()) {
        case CHAR:
        case VARCHAR:
            return (index, val) -> val.getString(index).toString();
        case BOOLEAN:
            return (index, val) -> val.getBoolean(index);
        case BINARY:
        case VARBINARY:
            return (index, val) -> val.getBinary(index);
        case DECIMAL:
            final int decimalPrecision = ((DecimalType) type).getPrecision();
            final int decimalScale = ((DecimalType) type).getScale();
            return (index, val) ->
                    val.getDecimal(index, decimalPrecision, decimalScale).toBigDecimal();
        case TINYINT:
            return (index, val) -> val.getByte(index);
        case SMALLINT:
            return (index, val) -> val.getShort(index);
        case INTEGER:
            return (index, val) -> val.getInt(index);
        case BIGINT:
            return (index, val) -> val.getLong(index);
        case FLOAT:
            return (index, val) -> val.getFloat(index);
        case DOUBLE:
            return (index, val) -> val.getDouble(index);
        case DATE:
            // The stored int is interpreted as an epoch-day count.
            return (index, val) -> Date.valueOf(LocalDate.ofEpochDay(val.getInt(index)));
        case TIME_WITHOUT_TIME_ZONE:
            // The stored int is divided by 1000 before being used as second-of-day, so any
            // sub-second remainder is dropped (presumably the int is milliseconds of day).
            return (index, val) ->
                    Time.valueOf(LocalTime.ofSecondOfDay(val.getInt(index) / 1000));
        case TIMESTAMP_WITHOUT_TIME_ZONE:
            // Precision is fixed per type: resolve it once instead of on every record.
            final int timestampPrecision = DataTypeChecks.getPrecision(type);
            return (index, val) -> val.getTimestamp(index, timestampPrecision).toTimestamp();
        case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
            // Precision is fixed per type: resolve it once instead of on every record.
            final int localZonedPrecision = DataTypeChecks.getPrecision(type);
            // The instant is rendered as a local date-time in the pipeline zone.
            return (index, val) ->
                    Timestamp.valueOf(
                            ZonedDateTime.ofInstant(
                                            val.getLocalZonedTimestampData(
                                                            index, localZonedPrecision)
                                                    .toInstant(),
                                            pipelineZoneId)
                                    .toLocalDateTime());
        case TIMESTAMP_WITH_TIME_ZONE:
            // NOTE(review): this branch reads the field through getTimestamp, the same
            // accessor used for TIMESTAMP_WITHOUT_TIME_ZONE; confirm that this is the intended
            // accessor for zoned timestamp values.
            final int zonedP = ((ZonedTimestampType) type).getPrecision();
            return (index, val) -> val.getTimestamp(index, zonedP).toTimestamp();
        case ARRAY:
            return (index, val) -> convertArrayData(val.getArray(index), type);
        case MAP:
            // Map values are emitted as a string produced by writeValueAsString.
            return (index, val) -> writeValueAsString(convertMapData(val.getMap(index), type));
        case ROW:
            // Row values are emitted as a string produced by writeValueAsString.
            return (index, val) ->
                    writeValueAsString(convertRowData(val, index, type, pipelineZoneId));
        default:
            throw new UnsupportedOperationException("Unsupported type:" + type);
    }
}