in flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java [79:121]
/**
 * Creates a serde for the given HBase table schema.
 *
 * <p>The constructor caches the family keys, the per-family qualifier keys, and one nullable
 * encoder/decoder per qualifier, and pre-allocates the row objects that are reused later.
 *
 * @param hbaseSchema schema providing the family keys and names, the row key index and data
 *     type, and the qualifier keys and data types of each family
 * @param nullStringLiteral literal whose UTF-8 bytes are handed to every nullable qualifier
 *     encoder and decoder
 */
public HBaseSerde(HBaseTableSchema hbaseSchema, final String nullStringLiteral) {
    this.families = hbaseSchema.getFamilyKeys();
    this.rowkeyIndex = hbaseSchema.getRowKeyIndex();
    final LogicalType keyType =
            hbaseSchema.getRowKeyDataType().map(DataType::getLogicalType).orElse(null);

    // The row key only counts as a field when both its index and its type are known.
    final boolean hasRowKey = rowkeyIndex != -1 && keyType != null;
    final int familyCount = families.length;
    this.fieldLength = hasRowKey ? familyCount + 1 : familyCount;
    this.keyEncoder = hasRowKey ? createFieldEncoder(keyType) : null;
    this.keyDecoder = hasRowKey ? createFieldDecoder(keyType) : null;

    this.nullStringBytes = nullStringLiteral.getBytes(StandardCharsets.UTF_8);

    // Pre-allocate the reusable output row and the per-family lookup tables.
    this.reusedRow = new GenericRowData(fieldLength);
    this.reusedFamilyRows = new GenericRowData[familyCount];
    this.qualifiers = new byte[familyCount][][];
    this.qualifierEncoders = new FieldEncoder[familyCount][];
    this.qualifierDecoders = new FieldDecoder[familyCount][];

    final String[] familyNames = hbaseSchema.getFamilyNames();
    for (int familyIdx = 0; familyIdx < familyCount; familyIdx++) {
        final String familyName = familyNames[familyIdx];
        this.qualifiers[familyIdx] = hbaseSchema.getQualifierKeys(familyName);

        final DataType[] qualifierTypes = hbaseSchema.getQualifierDataTypes(familyName);
        final int qualifierCount = qualifierTypes.length;

        // All encoders of a family are created before any of its decoders.
        final FieldEncoder[] encoders = new FieldEncoder[qualifierCount];
        for (int q = 0; q < qualifierCount; q++) {
            encoders[q] =
                    createNullableFieldEncoder(
                            qualifierTypes[q].getLogicalType(), nullStringBytes);
        }
        this.qualifierEncoders[familyIdx] = encoders;

        final FieldDecoder[] decoders = new FieldDecoder[qualifierCount];
        for (int q = 0; q < qualifierCount; q++) {
            decoders[q] =
                    createNullableFieldDecoder(
                            qualifierTypes[q].getLogicalType(), nullStringBytes);
        }
        this.qualifierDecoders[familyIdx] = decoders;

        // One reusable nested row per family, with one slot per qualifier.
        this.reusedFamilyRows[familyIdx] = new GenericRowData(qualifierCount);
    }

    // Single-field row object allocated up front.
    this.rowWithRowKey = new GenericRowData(1);
}