in hologres-connector-flink-base/src/main/java/com/alibaba/ververica/connectors/hologres/api/table/RowDataWriter.java [16:188]
/**
 * Builds a {@link FieldWriter} that forwards one field value to the matching
 * {@code write*} method of the given {@link RowDataWriter}.
 *
 * <p>The target method is selected once, up front, from {@code hologresType} (compared against
 * {@link java.sql.Types} constants), refined by {@code hologresTypeName} and the Flink
 * {@code fieldType} where the JDBC type alone is ambiguous. The returned writer is null-safe:
 * a {@code null} value is routed to {@link RowDataWriter#writeNull} and never reaches the
 * type-specific writer.
 *
 * @param fieldType Flink logical type of the field; used for decimal precision/scale, for
 *     telling TIMESTAMP from TIMESTAMP_LTZ, and for detecting a string-encoded array
 * @param hologresType JDBC type code ({@link java.sql.Types}) of the Hologres column
 * @param hologresTypeName Hologres type name of the column; inspected for
 *     {@code "timestamptz"}, {@code "json"} and {@code "jsonb"}
 * @param rowDataWriter sink that receives the converted values
 * @param columnIndexInHologresTable column index passed through to every {@code write*} call
 * @param arrayDelimiter delimiter used to split a string field into array elements; it is
 *     handed to {@link String#split(String)} and is therefore interpreted as a regular
 *     expression
 * @param <T> record type of the {@code rowDataWriter}
 * @return a null-safe writer for the field
 * @throws UnsupportedOperationException if a Flink TIMESTAMP_LTZ field targets a Hologres
 *     column whose type name is not {@code timestamptz}
 * @throws IllegalArgumentException if {@code hologresType} is not handled by this method
 */
static <T> FieldWriter createFieldWriter(
        LogicalType fieldType,
        int hologresType,
        String hologresTypeName,
        RowDataWriter<T> rowDataWriter,
        int columnIndexInHologresTable,
        String arrayDelimiter) {
    FieldWriter fieldWriter;
    switch (hologresType) {
        case Types.CHAR:
        case Types.VARCHAR:
            fieldWriter =
                    (obj) ->
                            rowDataWriter.writeString(
                                    (StringData) obj, columnIndexInHologresTable);
            break;
        case Types.BIT:
        case Types.BOOLEAN:
            fieldWriter =
                    (obj) ->
                            rowDataWriter.writeBoolean(
                                    (Boolean) obj, columnIndexInHologresTable);
            break;
        case Types.BINARY:
        case Types.VARBINARY:
            fieldWriter =
                    (obj) ->
                            rowDataWriter.writeBinary((byte[]) obj, columnIndexInHologresTable);
            break;
        case Types.NUMERIC:
        case Types.DECIMAL:
            // Precision and scale come from the Flink type and are resolved once, not per
            // record.
            int decimalPrecision = LogicalTypeChecks.getPrecision(fieldType);
            int decimalScale = LogicalTypeChecks.getScale(fieldType);
            fieldWriter =
                    (obj) ->
                            rowDataWriter.writeDecimal(
                                    (DecimalData) obj,
                                    columnIndexInHologresTable,
                                    decimalPrecision,
                                    decimalScale);
            break;
        case Types.TINYINT:
            fieldWriter =
                    (obj) -> rowDataWriter.writeByte((Byte) obj, columnIndexInHologresTable);
            break;
        case Types.SMALLINT:
            fieldWriter =
                    (obj) -> {
                        // SMALLINT should be compatible with TINYINT: widen a Byte value
                        // instead of failing on the cast.
                        if (obj instanceof Byte) {
                            rowDataWriter.writeShort(
                                    ((Byte) obj).shortValue(), columnIndexInHologresTable);
                        } else {
                            rowDataWriter.writeShort((short) obj, columnIndexInHologresTable);
                        }
                    };
            break;
        case Types.INTEGER:
            fieldWriter =
                    (obj) -> rowDataWriter.writeInt((Integer) obj, columnIndexInHologresTable);
            break;
        case Types.DATE:
            // The date value arrives as an Integer and is passed to writeDate unchanged.
            fieldWriter =
                    (obj) -> rowDataWriter.writeDate((Integer) obj, columnIndexInHologresTable);
            break;
        case Types.BIGINT:
            fieldWriter =
                    (obj) -> rowDataWriter.writeLong((Long) obj, columnIndexInHologresTable);
            break;
        case Types.REAL:
        case Types.FLOAT:
            fieldWriter =
                    (obj) -> rowDataWriter.writeFloat((Float) obj, columnIndexInHologresTable);
            break;
        case Types.DOUBLE:
            fieldWriter =
                    (obj) ->
                            rowDataWriter.writeDouble((Double) obj, columnIndexInHologresTable);
            break;
        case Types.TIMESTAMP:
        case Types.TIMESTAMP_WITH_TIMEZONE:
            boolean isFlinkLTZ =
                    fieldType
                            .getTypeRoot()
                            .equals(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
            if (hologresTypeName.equals("timestamptz")) {
                if (isFlinkLTZ) {
                    // flink TIMESTAMP_LTZ -> holo TIMESTAMPTZ
                    fieldWriter =
                            (obj) ->
                                    rowDataWriter.writeLTZAsTimestampTz(
                                            (TimestampData) obj, columnIndexInHologresTable);
                } else {
                    // flink TIMESTAMP -> holo TIMESTAMPTZ
                    fieldWriter =
                            (obj) ->
                                    rowDataWriter.writeTimestampTz(
                                            (TimestampData) obj, columnIndexInHologresTable);
                }
            } else {
                if (isFlinkLTZ) {
                    // flink TIMESTAMP_LTZ -> holo TIMESTAMP is rejected when the writer is
                    // created, before any record is processed.
                    throw new UnsupportedOperationException(
                            "The hologres connector does not support writing flink timestamp_ltz type to hologres timestamp type, please use hologres timestamp with timezone instead.");
                } else {
                    // flink TIMESTAMP -> holo TIMESTAMP
                    fieldWriter =
                            (obj) ->
                                    rowDataWriter.writeTimestamp(
                                            (TimestampData) obj, columnIndexInHologresTable);
                }
            }
            break;
        case Types.ARRAY:
            if (fieldType.getTypeRoot().equals(LogicalTypeRoot.VARCHAR)) {
                // A Flink string field mapped to a Hologres array: split it into elements.
                // toString() is used instead of a (String) cast so that both
                // java.lang.String and StringData values are accepted; the other string
                // branches of this method receive StringData, for which a (String) cast
                // would throw ClassCastException.
                fieldWriter =
                        (obj) ->
                                rowDataWriter.writeStringArray(
                                        obj.toString().split(arrayDelimiter),
                                        columnIndexInHologresTable);
            } else {
                fieldWriter =
                        createArrayFieldWriter(
                                fieldType,
                                hologresTypeName,
                                rowDataWriter,
                                columnIndexInHologresTable);
            }
            break;
        case Types.OTHER:
            // The type check of jdbc copy writer is relatively strict, only supports
            // java.lang.String but not BinaryStringData, so json/jsonb go through
            // writeString rather than writeObject.
            if (hologresTypeName.equals("json") || hologresTypeName.equals("jsonb")) {
                fieldWriter =
                        (obj) ->
                                rowDataWriter.writeString(
                                        (StringData) obj, columnIndexInHologresTable);
            } else {
                fieldWriter =
                        (obj) -> rowDataWriter.writeObject(obj, columnIndexInHologresTable);
            }
            break;
        default:
            throw new IllegalArgumentException(
                    String.format(
                            "Hologres sink does not support data type %s for now", fieldType));
    }
    // Wrap the type-specific writer so that null values are written as SQL NULL and the
    // casts above are only applied to non-null values.
    return (obj) -> {
        if (obj == null) {
            rowDataWriter.writeNull(columnIndexInHologresTable);
        } else {
            fieldWriter.writeValue(obj);
        }
    };
}