in flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/BinaryRecordDataExtractor.java [52:130]
/**
 * Converts a column value into a printable representation according to its declared type.
 *
 * <ul>
 *   <li>{@code null} values become the string {@code "null"}.
 *   <li>{@code BinaryType} / {@code VarBinaryType} values become a Base64 string.
 *   <li>{@code MapType} values become a string of the form {@code {k1 -> v1, k2 -> v2}}.
 *   <li>{@code ArrayType} values become a {@code List<Object>} holding the elements as read
 *       by the element getter (elements are not rendered recursively).
 *   <li>{@code RowType} values become a string of the form {@code {name: type -> value, ...}},
 *       with each field value rendered recursively.
 *   <li>Any other type falls back to {@code object.toString()}.
 * </ul>
 *
 * <p>Empty maps and rows without fields are rendered as {@code {}}.
 *
 * @param object the column value to convert, may be {@code null}
 * @param dataType the declared type of {@code object}
 * @return a {@code String}, or a {@code List<Object>} when {@code dataType} is an array type
 * @throws IllegalArgumentException if {@code object} does not have the runtime class expected
 *     for {@code dataType}, or if a map's key and value arrays differ in size (raised through
 *     {@code Preconditions.checkArgument})
 */
public static Object extractRecord(Object object, DataType dataType) {
    if (object == null) {
        return "null";
    }
    if (dataType instanceof BinaryType || dataType instanceof VarBinaryType) {
        Preconditions.checkArgument(
                object instanceof byte[],
                "Column data of BinaryType and VarBinaryType should be `byte[]`, but was %s",
                object.getClass().getName());
        return BaseEncoding.base64().encode((byte[]) object);
    } else if (dataType instanceof MapType) {
        Preconditions.checkArgument(
                object instanceof MapData,
                "Column data of MapType should be MapData, but was %s",
                object.getClass().getName());
        MapType mapType = (MapType) dataType;
        MapData mapData = (MapData) object;
        // Keys and values are read by delegating to the ArrayType branch, which yields a List.
        List<?> keyArray =
                (List<?>)
                        extractRecord(
                                mapData.keyArray(), DataTypes.ARRAY(mapType.getKeyType()));
        List<?> valueArray =
                (List<?>)
                        extractRecord(
                                mapData.valueArray(), DataTypes.ARRAY(mapType.getValueType()));
        // Only "%s" placeholders are substituted by checkArgument-style message templates,
        // so "%s" is used here even though the arguments are integers.
        Preconditions.checkArgument(
                keyArray.size() == valueArray.size(),
                "Malformed MapData: keyArray size (%s) differs from valueArray (%s)",
                keyArray.size(),
                valueArray.size());
        StringBuilder sb = new StringBuilder("{");
        for (int i = 0; i < keyArray.size(); i++) {
            // Emit the separator before every entry but the first, so that an empty map
            // yields "{}" instead of failing while trimming a trailing separator.
            if (i > 0) {
                sb.append(", ");
            }
            sb.append(keyArray.get(i)).append(" -> ").append(valueArray.get(i));
        }
        return sb.append("}").toString();
    } else if (dataType instanceof ArrayType) {
        Preconditions.checkArgument(
                object instanceof ArrayData,
                "Column data of ArrayType should be ArrayData, but was %s",
                object.getClass().getName());
        ArrayType arrayType = (ArrayType) dataType;
        ArrayData arrayData = (ArrayData) object;
        ArrayData.ElementGetter getter =
                ArrayData.createElementGetter(arrayType.getElementType());
        List<Object> results = new ArrayList<>(arrayData.size());
        // NOTE(review): elements are stored as returned by the getter; nested values are
        // not passed through extractRecord again.
        for (int i = 0; i < arrayData.size(); i++) {
            results.add(getter.getElementOrNull(arrayData, i));
        }
        return results;
    } else if (dataType instanceof RowType) {
        Preconditions.checkArgument(
                object instanceof RecordData,
                "Column data of RowType should be RecordData, but was %s",
                object.getClass().getName());
        RowType rowType = (RowType) dataType;
        RecordData recordData = (RecordData) object;
        List<String> fieldNames = rowType.getFieldNames();
        List<DataType> fieldTypes = rowType.getFieldTypes();
        List<RecordData.FieldGetter> fieldGetters =
                SchemaUtils.createFieldGetters(fieldTypes.toArray(new DataType[0]));
        StringBuilder sb = new StringBuilder("{");
        for (int i = 0; i < rowType.getFieldCount(); i++) {
            // Separator-first, so that a row without fields yields "{}" rather than throwing.
            if (i > 0) {
                sb.append(", ");
            }
            sb.append(fieldNames.get(i))
                    .append(": ")
                    .append(fieldTypes.get(i))
                    .append(" -> ")
                    .append(
                            extractRecord(
                                    fieldGetters.get(i).getFieldOrNull(recordData),
                                    fieldTypes.get(i)));
        }
        return sb.append("}").toString();
    } else {
        return object.toString();
    }
}