in inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksTableRowTransformer.java [123:212]
    private Object typeConvertion(LogicalType type, RowData record, int pos) {
        if (record.isNullAt(pos)) {
            return null;
        }
        switch (type.getTypeRoot()) {
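            // Booleans are written as numeric 1/0 values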
            case BOOLEAN:
                return record.getBoolean(pos) ? 1L : 0L;
            case TINYINT:
                return record.getByte(pos);
            case SMALLINT:
                return record.getShort(pos);
            case INTEGER:
                return record.getInt(pos);
            case BIGINT:
                return record.getLong(pos);
            case FLOAT:
                return record.getFloat(pos);
            case DOUBLE:
                return record.getDouble(pos);
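            // Strings may target a JSON-typed StarRocks column and get special handling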
            case CHAR:
            case VARCHAR:
                String sValue = record.getString(pos).toString();
                if (columns == null) {
                    return sValue;
                }
                StarRocksDataType starRocksDataType =
                        columns.getOrDefault(columnNames[pos], StarRocksDataType.UNKNOWN);
                if (starRocksDataType == StarRocksDataType.UNKNOWN) {
                    return sValue;
                }
                // Guard against empty strings before inspecting the first character
                if (starRocksDataType == StarRocksDataType.JSON && !sValue.isEmpty()
                        && (sValue.charAt(0) == '{' || sValue.charAt(0) == '[')) {
                    // The JSON string must be parsed into a JSON object here, because
                    // StarRocksJsonSerializer#serialize converts it back to a string via
                    // JSON.toJSONString. Feeding the raw string to JSON.toJSONString would
                    // escape it instead: {"a": 1, "b": 2} would become "{\"a\": 1, \"b\": 2}",
                    // which StarRocks does not recognize as JSON.
                    try {
                        return JSON.parse(sValue);
                    } catch (Throwable t) {
                        if (!ignoreJsonParseError) {
                            throw t;
                        }
                        return sValue;
                    }
                }
                return sValue;
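            // DATE is stored internally as days since the Unix epoch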
            case DATE:
                return dateFormatter.format(Date.valueOf(LocalDate.ofEpochDay(record.getInt(pos))));
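            // Timestamps are rendered as ISO-8601 strings via LocalDateTime#toString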
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                final int timestampPrecision = ((TimestampType) type).getPrecision();
                return record.getTimestamp(pos, timestampPrecision).toLocalDateTime().toString();
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                int localZonedTimestampPrecision = ((LocalZonedTimestampType) type).getPrecision();
                return record.getTimestamp(pos, localZonedTimestampPrecision).toLocalDateTime().toString();
            case DECIMAL: // for both largeint and decimal
                final int decimalPrecision = ((DecimalType) type).getPrecision();
                final int decimalScale = ((DecimalType) type).getScale();
                return record.getDecimal(pos, decimalPrecision, decimalScale).toBigDecimal();
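            // BINARY bytes are folded big-endian into a single long value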
            case BINARY:
                final byte[] bts = record.getBinary(pos);
                long value = 0;
                for (int i = 0; i < bts.length; i++) {
                    value += (bts[bts.length - i - 1] & 0xffL) << (8 * i);
                }
                return value;
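            // Nested collections are converted element by element by the dedicated helpers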
            case ARRAY:
                return convertNestedArray(record.getArray(pos), type);
            case MAP:
                return convertNestedMap(record.getMap(pos), type);
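            // ROW is converted recursively into a field-name -> value map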
            case ROW:
                RowType rType = (RowType) type;
                Map<String, Object> m = new HashMap<>();
                RowData row = record.getRow(pos, rType.getFieldCount());
                // Convert fields sequentially: HashMap is not thread-safe, so a
                // parallel stream here could corrupt the map
                rType.getFields().forEach(
                        f -> m.put(f.getName(), typeConvertion(f.getType(), row, rType.getFieldIndex(f.getName()))));
                if (columns == null) {
                    return m;
                }
                StarRocksDataType rStarRocksDataType =
                        columns.getOrDefault(columnNames[pos], StarRocksDataType.UNKNOWN);
                if (rStarRocksDataType == StarRocksDataType.STRING) {
                    return JSON.toJSONString(m);
                }
                return m;
            default:
                throw new UnsupportedOperationException("Unsupported type: " + type);
        }
    }
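
    // Illustration (not from the original source): why the CHAR/VARCHAR branch parses
    // JSON-column values before handing them to StarRocksJsonSerializer#serialize.
    // Assuming fastjson's JSON.parse/JSON.toJSONString, as used above:
    //
    //     String received = "{\"a\": 1, \"b\": 2}";
    //     JSON.toJSONString(received);             // "{\"a\": 1, \"b\": 2}"  (escaped string, rejected as JSON)
    //     JSON.toJSONString(JSON.parse(received)); // {"a":1,"b":2}           (valid JSON for the target column)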