private Object typeConvertion()

in inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksTableRowTransformer.java [123:212]


    /**
     * Converts the field at {@code pos} of {@code record} into the plain Java value that is
     * handed to the StarRocks serializer.
     *
     * <p>Mapping by logical type root:
     * <ul>
     *   <li>BOOLEAN is returned as {@code 1L} / {@code 0L};</li>
     *   <li>numeric types are returned as their boxed Java value;</li>
     *   <li>CHAR / VARCHAR are returned as a {@code String}, except when the target column is
     *       declared as JSON and the text starts with <code>{</code> or <code>[</code>, in which
     *       case the parsed JSON object is returned;</li>
     *   <li>DATE is formatted with {@code dateFormatter}; timestamps are rendered through
     *       {@code LocalDateTime#toString()};</li>
     *   <li>DECIMAL is returned as a {@code BigDecimal};</li>
     *   <li>BINARY is folded into a {@code long}, the last byte being the least significant;</li>
     *   <li>ARRAY / MAP are delegated to {@code convertNestedArray} / {@code convertNestedMap};</li>
     *   <li>ROW becomes a {@code Map} keyed by field name, or its JSON text when the target
     *       column is declared as STRING.</li>
     * </ul>
     *
     * <p>NOTE(review): this method recurses for ROW fields with {@code pos} being the index
     * inside the nested row, while {@code columnNames[pos]} is used for the column-type lookup.
     * Confirm that this lookup is intended for nested fields as well.
     *
     * @param type logical type of the field
     * @param record row holding the field
     * @param pos index of the field inside {@code record}
     * @return the converted value, or {@code null} when the field is null
     * @throws UnsupportedOperationException if the type root is not handled
     */
    private Object typeConvertion(LogicalType type, RowData record, int pos) {
        if (record.isNullAt(pos)) {
            return null;
        }
        switch (type.getTypeRoot()) {
            case BOOLEAN:
                return record.getBoolean(pos) ? 1L : 0L;
            case TINYINT:
                return record.getByte(pos);
            case SMALLINT:
                return record.getShort(pos);
            case INTEGER:
                return record.getInt(pos);
            case BIGINT:
                return record.getLong(pos);
            case FLOAT:
                return record.getFloat(pos);
            case DOUBLE:
                return record.getDouble(pos);
            case CHAR:
            case VARCHAR:
                String sValue = record.getString(pos).toString();
                if (columns == null) {
                    return sValue;
                }
                StarRocksDataType starRocksDataType =
                        columns.getOrDefault(columnNames[pos], StarRocksDataType.UNKNOWN);
                if (starRocksDataType == StarRocksDataType.UNKNOWN) {
                    return sValue;
                }
                // Guard against the empty string before peeking at the first character;
                // charAt(0) on "" would throw StringIndexOutOfBoundsException.
                if (starRocksDataType == StarRocksDataType.JSON
                        && !sValue.isEmpty()
                        && (sValue.charAt(0) == '{' || sValue.charAt(0) == '[')) {
                    // The json string needs to be converted to a json object, and then to a json
                    // string again via JSON.toJSONString in StarRocksJsonSerializer#serialize.
                    // Otherwise the final json string in stream load will not be correct. For
                    // example, if the received string is "{"a": 1, "b": 2}" and it is passed to
                    // JSON.toJSONString directly, the result will be "{\"a\": 1, \"b\": 2}", which
                    // will not be recognized as json in StarRocks.
                    try {
                        return JSON.parse(sValue);
                    } catch (Throwable t) {
                        if (!ignoreJsonParseError) {
                            throw t;
                        }
                        // Parse errors are tolerated by configuration: ship the raw text instead.
                        return sValue;
                    }
                }
                return sValue;
            case DATE:
                // The field holds the number of days since the epoch.
                return dateFormatter.format(Date.valueOf(LocalDate.ofEpochDay(record.getInt(pos))));
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                final int timestampPrecision = ((TimestampType) type).getPrecision();
                return record.getTimestamp(pos, timestampPrecision).toLocalDateTime().toString();
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                int localZonedTimestampPrecision = ((LocalZonedTimestampType) type).getPrecision();
                return record.getTimestamp(pos, localZonedTimestampPrecision).toLocalDateTime().toString();
            case DECIMAL: // for both largeint and decimal
                final int decimalPrecision = ((DecimalType) type).getPrecision();
                final int decimalScale = ((DecimalType) type).getScale();
                return record.getDecimal(pos, decimalPrecision, decimalScale).toBigDecimal();
            case BINARY:
                // Fold the bytes into a long, treating the last byte as the least significant one.
                // NOTE(review): for arrays longer than 8 bytes the shift distance wraps around
                // (Java masks long shifts to 6 bits), so the result is not meaningful - confirm
                // that callers never pass more than 8 bytes.
                final byte[] bts = record.getBinary(pos);
                long value = 0;
                for (int i = 0; i < bts.length; i++) {
                    value += (bts[bts.length - i - 1] & 0xffL) << (8 * i);
                }
                return value;
            case ARRAY:
                return convertNestedArray(record.getArray(pos), type);
            case MAP:
                return convertNestedMap(record.getMap(pos), type);
            case ROW:
                RowType rType = (RowType) type;
                Map<String, Object> m = new HashMap<>();
                RowData row = record.getRow(pos, rType.getFieldCount());
                // Convert the fields sequentially: HashMap is not thread-safe, so filling it from
                // a parallel stream could lose entries or corrupt the map.
                for (RowType.RowField f : rType.getFields()) {
                    m.put(f.getName(), typeConvertion(f.getType(), row, rType.getFieldIndex(f.getName())));
                }
                if (columns == null) {
                    return m;
                }
                StarRocksDataType rStarRocksDataType =
                        columns.getOrDefault(columnNames[pos], StarRocksDataType.UNKNOWN);
                if (rStarRocksDataType == StarRocksDataType.STRING) {
                    // A ROW written into a STRING column is shipped as its JSON text.
                    return JSON.toJSONString(m);
                }
                return m;
            default:
                throw new UnsupportedOperationException("Unsupported type:" + type);
        }
    }