in flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java [362:418]
private static FieldEncoder createFieldEncoder(LogicalType fieldType) {
// ordered by type root definition
switch (fieldType.getTypeRoot()) {
case CHAR:
case VARCHAR:
// get the underlying UTF-8 bytes
return (row, pos) -> row.getString(pos).toBytes();
case BOOLEAN:
return (row, pos) -> Bytes.toBytes(row.getBoolean(pos));
case BINARY:
case VARBINARY:
return RowData::getBinary;
case DECIMAL:
return createDecimalEncoder((DecimalType) fieldType);
case TINYINT:
return (row, pos) -> new byte[] {row.getByte(pos)};
case SMALLINT:
return (row, pos) -> Bytes.toBytes(row.getShort(pos));
case INTEGER:
case DATE:
case INTERVAL_YEAR_MONTH:
return (row, pos) -> Bytes.toBytes(row.getInt(pos));
case TIME_WITHOUT_TIME_ZONE:
final int timePrecision = getPrecision(fieldType);
if (timePrecision < MIN_TIME_PRECISION || timePrecision > MAX_TIME_PRECISION) {
throw new UnsupportedOperationException(
String.format(
"The precision %s of TIME type is out of the range [%s, %s] supported by "
+ "HBase connector",
timePrecision, MIN_TIME_PRECISION, MAX_TIME_PRECISION));
}
return (row, pos) -> Bytes.toBytes(row.getInt(pos));
case BIGINT:
case INTERVAL_DAY_TIME:
return (row, pos) -> Bytes.toBytes(row.getLong(pos));
case FLOAT:
return (row, pos) -> Bytes.toBytes(row.getFloat(pos));
case DOUBLE:
return (row, pos) -> Bytes.toBytes(row.getDouble(pos));
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
final int timestampPrecision = getPrecision(fieldType);
if (timestampPrecision < MIN_TIMESTAMP_PRECISION
|| timestampPrecision > MAX_TIMESTAMP_PRECISION) {
throw new UnsupportedOperationException(
String.format(
"The precision %s of TIMESTAMP type is out of the range [%s, %s] supported by "
+ "HBase connector",
timestampPrecision,
MIN_TIMESTAMP_PRECISION,
MAX_TIMESTAMP_PRECISION));
}
return createTimestampEncoder(timestampPrecision);
default:
throw new UnsupportedOperationException("Unsupported type: " + fieldType);
}
}