in flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseTableSchema.java [323:345]
public static HBaseTableSchema fromDataType(DataType physicalRowType) {
HBaseTableSchema hbaseSchema = new HBaseTableSchema();
RowType rowType = (RowType) physicalRowType.getLogicalType();
for (RowType.RowField field : rowType.getFields()) {
LogicalType fieldType = field.getType();
if (fieldType.getTypeRoot() == LogicalTypeRoot.ROW) {
RowType familyType = (RowType) fieldType;
String familyName = field.getName();
for (RowType.RowField qualifier : familyType.getFields()) {
hbaseSchema.addColumn(
familyName,
qualifier.getName(),
fromLogicalToDataType(qualifier.getType()));
}
} else if (fieldType.getChildren().size() == 0) {
hbaseSchema.setRowKey(field.getName(), fromLogicalToDataType(fieldType));
} else {
throw new IllegalArgumentException(
"Unsupported field type '" + fieldType + "' for HBase.");
}
}
return hbaseSchema;
}