public RowData convert()

in flink-connector-kudu/src/main/java/org/apache/flink/connector/kudu/connector/converter/RowResultRowDataConverter.java [36:103]


    /**
     * Converts a Kudu {@link RowResult} into a Flink {@link RowData}.
     *
     * <p>Fields are written at the index each column has in the row's column projection. Columns
     * whose value is null are skipped, so the corresponding field of the returned row is left
     * unset.
     *
     * @param row the Kudu scan result row to convert
     * @return a {@link GenericRowData} with one field per projected column
     * @throws IllegalArgumentException if a column has no type, or a type not handled here
     */
    public RowData convert(RowResult row) {
        Schema schema = row.getColumnProjection();
        GenericRowData values = new GenericRowData(schema.getColumnCount());
        schema.getColumns()
                .forEach(
                        column -> {
                            String name = column.getName();
                            Type type = column.getType();
                            int pos = schema.getColumnIndex(name);
                            if (Objects.isNull(type)) {
                                throw new IllegalArgumentException("columnName:" + name);
                            }
                            // Null cells are skipped; the typed getters below are only
                            // called for columns that actually carry a value.
                            if (row.isNull(name)) {
                                return;
                            }
                            switch (type) {
                                case DECIMAL:
                                    // Use the precision/scale declared on the column rather
                                    // than BigDecimal.precision(), which depends on the digits
                                    // of each individual value (e.g. 0.05 has precision 1) and
                                    // would therefore differ from row to row.
                                    BigDecimal decimal = row.getDecimal(name);
                                    values.setField(
                                            pos,
                                            DecimalData.fromBigDecimal(
                                                    decimal,
                                                    column.getTypeAttributes().getPrecision(),
                                                    column.getTypeAttributes().getScale()));
                                    break;
                                case UNIXTIME_MICROS:
                                    // NOTE(review): conversion goes through java.sql.Timestamp;
                                    // confirm the time-zone handling matches what readers expect.
                                    values.setField(
                                            pos,
                                            TimestampData.fromTimestamp(row.getTimestamp(name)));
                                    break;
                                case DOUBLE:
                                    values.setField(pos, row.getDouble(name));
                                    break;
                                case STRING:
                                    // The value is known to be non-null here (see isNull guard).
                                    values.setField(
                                            pos, StringData.fromString(row.getString(name)));
                                    break;
                                case BINARY:
                                    // getBinary returns a ByteBuffer; store a byte[] copy, which
                                    // is the representation Flink uses for binary fields.
                                    values.setField(pos, row.getBinaryCopy(name));
                                    break;
                                case FLOAT:
                                    values.setField(pos, row.getFloat(name));
                                    break;
                                case INT64:
                                    values.setField(pos, row.getLong(name));
                                    break;
                                case INT32:
                                    values.setField(pos, row.getInt(name));
                                    break;
                                case INT16:
                                    // Read with the width-matching getter: getInt is type-checked
                                    // against 32-bit columns, and SMALLINT is carried as Short.
                                    values.setField(pos, row.getShort(name));
                                    break;
                                case INT8:
                                    // Same reasoning as INT16; TINYINT is carried as Byte.
                                    values.setField(pos, row.getByte(name));
                                    break;
                                case BOOL:
                                    values.setField(pos, row.getBoolean(name));
                                    break;
                                default:
                                    throw new IllegalArgumentException(
                                            "columnName:"
                                                    + name
                                                    + ",type:"
                                                    + type.getName()
                                                    + " is not supported!");
                            }
                        });
        return values;
    }