public static Object extractRecord()

in flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/BinaryRecordDataExtractor.java [52:130]


    /**
     * Converts a column value into a printable representation chosen by its {@link DataType}.
     *
     * <ul>
     *   <li>{@code null} values become the string {@code "null"}.
     *   <li>{@code BinaryType} / {@code VarBinaryType} values ({@code byte[]}) are Base64-encoded.
     *   <li>{@code MapType} values are rendered as {@code {k1 -> v1, k2 -> v2}}; an empty map is
     *       rendered as {@code {}}.
     *   <li>{@code ArrayType} values are returned as a {@link List} holding the elements read
     *       through an {@code ArrayData.ElementGetter}; elements are not converted further.
     *   <li>{@code RowType} values are rendered as {@code {name: type -> value, ...}}, with each
     *       field value converted recursively; a row type without fields is rendered as {@code
     *       {}}.
     *   <li>Any other type falls back to {@code object.toString()}.
     * </ul>
     *
     * <p>A value whose runtime class does not match what the declared type requires is rejected
     * through {@code Preconditions.checkArgument}.
     *
     * @param object the column value to convert, may be {@code null}
     * @param dataType the declared type of {@code object}
     * @return a {@link String} for every case except {@code ArrayType}, which yields a {@link
     *     List}
     */
    public static Object extractRecord(Object object, DataType dataType) {
        if (object == null) {
            return "null";
        }
        if (dataType instanceof BinaryType || dataType instanceof VarBinaryType) {
            Preconditions.checkArgument(
                    object instanceof byte[],
                    "Column data of BinaryType and VarBinaryType should be `byte[]`, but was %s",
                    object.getClass().getName());
            return BaseEncoding.base64().encode((byte[]) object);
        } else if (dataType instanceof MapType) {
            Preconditions.checkArgument(
                    object instanceof MapData,
                    "Column data of MapType should be MapData, but was %s",
                    object.getClass().getName());
            MapType mapType = (MapType) dataType;
            MapData mapData = (MapData) object;
            // Keys and values are extracted through the ArrayType branch, which returns a List.
            List<?> keyArray =
                    (List<?>)
                            extractRecord(
                                    mapData.keyArray(), DataTypes.ARRAY(mapType.getKeyType()));
            List<?> valueArray =
                    (List<?>)
                            extractRecord(
                                    mapData.valueArray(), DataTypes.ARRAY(mapType.getValueType()));
            // Only `%s` placeholders are substituted in Preconditions message templates.
            Preconditions.checkArgument(
                    keyArray.size() == valueArray.size(),
                    "Malformed MapData: keyArray size (%s) differs from valueArray (%s)",
                    keyArray.size(),
                    valueArray.size());
            StringBuilder sb = new StringBuilder("{");
            for (int i = 0; i < keyArray.size(); i++) {
                // Prefix the separator instead of trimming a trailing one, so that an empty map
                // yields "{}" rather than deleting past the start of the builder.
                if (i > 0) {
                    sb.append(", ");
                }
                sb.append(keyArray.get(i)).append(" -> ").append(valueArray.get(i));
            }
            return sb.append("}").toString();
        } else if (dataType instanceof ArrayType) {
            Preconditions.checkArgument(
                    object instanceof ArrayData,
                    "Column data of ArrayType should be ArrayData, but was %s",
                    object.getClass().getName());
            ArrayType arrayType = (ArrayType) dataType;
            ArrayData arrayData = (ArrayData) object;
            ArrayData.ElementGetter getter =
                    ArrayData.createElementGetter(arrayType.getElementType());
            // Elements are collected as returned by the getter, without recursive conversion.
            List<Object> results = new ArrayList<>();
            for (int i = 0; i < arrayData.size(); i++) {
                results.add(getter.getElementOrNull(arrayData, i));
            }
            return results;
        } else if (dataType instanceof RowType) {
            Preconditions.checkArgument(
                    object instanceof RecordData,
                    "Column data of RowType should be RecordData, but was %s",
                    object.getClass().getName());
            RowType rowType = (RowType) dataType;
            RecordData binaryRecordData = (RecordData) object;
            List<String> fieldNames = rowType.getFieldNames();
            List<DataType> fieldTypes = rowType.getFieldTypes();
            List<RecordData.FieldGetter> fieldGetters =
                    SchemaUtils.createFieldGetters(fieldTypes.toArray(new DataType[0]));
            StringBuilder sb = new StringBuilder("{");
            for (int i = 0; i < rowType.getFieldCount(); i++) {
                // Same separator handling as for maps: safe when the row type has no fields.
                if (i > 0) {
                    sb.append(", ");
                }
                sb.append(fieldNames.get(i))
                        .append(": ")
                        .append(fieldTypes.get(i))
                        .append(" -> ")
                        .append(
                                extractRecord(
                                        fieldGetters.get(i).getFieldOrNull(binaryRecordData),
                                        fieldTypes.get(i)));
            }
            return sb.append("}").toString();
        } else {
            return object.toString();
        }
    }