in flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseTableSchema.java [264:304]
public DataType convertToDataType() {
String[] familyNames = getFamilyNames();
if (rowKeyInfo != null) {
String[] fieldNames = new String[familyNames.length + 1];
DataType[] fieldTypes = new DataType[familyNames.length + 1];
for (int i = 0; i < fieldNames.length; i++) {
if (i == rowKeyInfo.rowKeyIndex) {
fieldNames[i] = rowKeyInfo.rowKeyName;
fieldTypes[i] = rowKeyInfo.rowKeyType;
} else {
int familyIndex = i < rowKeyInfo.rowKeyIndex ? i : i - 1;
String family = familyNames[familyIndex];
fieldNames[i] = family;
fieldTypes[i] =
getRowDataType(
getQualifierNames(family), getQualifierDataTypes(family));
}
}
return DataTypes.ROW(
Streams.zip(
Arrays.stream(fieldNames),
Arrays.stream(fieldTypes),
DataTypes::FIELD)
.toArray(DataTypes.Field[]::new));
} else {
String[] fieldNames = new String[familyNames.length];
DataType[] fieldTypes = new DataType[familyNames.length];
for (int i = 0; i < fieldNames.length; i++) {
String family = familyNames[i];
fieldNames[i] = family;
fieldTypes[i] =
getRowDataType(getQualifierNames(family), getQualifierDataTypes(family));
}
return DataTypes.ROW(
Streams.zip(
Arrays.stream(fieldNames),
Arrays.stream(fieldTypes),
DataTypes::FIELD)
.toArray(DataTypes.Field[]::new));
}
}