in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/utils/StreamingServerRowConverter.java [245:301]
protected StreamingServerSerializationConverter createExternalConverter(LogicalType type) {
switch (type.getTypeRoot()) {
case INTEGER:
case INTERVAL_YEAR_MONTH:
return (val, fieldIndex, builder) ->
builder.addColumns(DBValue.newBuilder().setInt32Value(val.getInt(fieldIndex)));
case INTERVAL_DAY_TIME:
case BIGINT:
return (val, fieldIndex, builder) ->
builder.addColumns(DBValue.newBuilder().setInt64Value(val.getLong(fieldIndex)));
case FLOAT:
return (val, fieldIndex, builder) ->
builder.addColumns(DBValue.newBuilder().setFloat32Value(val.getFloat(fieldIndex)));
case DOUBLE:
return (val, fieldIndex, builder) ->
builder.addColumns(DBValue.newBuilder().setFloat64Value(val.getDouble(fieldIndex)));
// These type can map to text type in ADBPG
case DECIMAL:
case BOOLEAN:
case TINYINT:
case SMALLINT:
case DATE:
case TIMESTAMP_WITH_TIME_ZONE:
case CHAR:
case VARCHAR:
return (val, fieldIndex, builder) ->
builder.addColumns(DBValue.newBuilder().setStringValue(val.getString(fieldIndex).toString()));
case BINARY:
case VARBINARY:
return (val, fieldIndex, builder) ->
builder.addColumns(DBValue.newBuilder().setBytesValue(ByteString.copyFrom(val.getBinary(fieldIndex))));
case TIME_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITHOUT_TIME_ZONE:
return (val, fieldIndex, builder) ->
builder.addColumns(
DBValue.newBuilder().setTimeStampValue(
com.google.protobuf.Timestamp.newBuilder().setSeconds(
val.getTimestamp(
fieldIndex,
((TimestampType) type).getPrecision()).toTimestamp().getTime()
).setNanos(
val.getTimestamp(
fieldIndex,
((TimestampType) type).getPrecision()).toTimestamp().getNanos()
)
)
);
// Does not support
case ARRAY:
case MAP:
case MULTISET:
case ROW:
case RAW:
default:
throw new UnsupportedOperationException("Unsupported type:" + type);
}
}