in flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/RowDataFieldsKinesisPartitionKeyGenerator.java [111:157]
/**
 * Builds a partition key generator from a subset of the fields of a physical row type.
 *
 * <p>All arguments are validated through {@code Preconditions} before any instance state is
 * assigned. Construction is rejected when:
 *
 * <ul>
 *   <li>any argument is {@code null};
 *   <li>{@code partitionKeys} is empty;
 *   <li>{@code partitionKeys} contains the same name more than once;
 *   <li>a key does not appear in {@code physicalType.getFieldNames()};
 *   <li>a key refers to a field whose type fails {@code
 *       LogicalTypeChecks.hasWellDefinedString}.
 * </ul>
 *
 * <p>The key list is copied on entry, so changes the caller makes to {@code partitionKeys}
 * after construction do not affect this instance.
 *
 * @param physicalType row type whose field names and field types are used to resolve the keys
 * @param partitionKeys names of the fields that form the partition key, in order
 * @param delimiter delimiter stored alongside the field names
 */
public RowDataFieldsKinesisPartitionKeyGenerator(
        RowType physicalType, List<String> partitionKeys, String delimiter) {
    Preconditions.checkNotNull(physicalType, "physicalType");
    Preconditions.checkNotNull(partitionKeys, "partitionKeys");
    Preconditions.checkNotNull(delimiter, "delimiter");

    // Snapshot the keys so that validation, the stored field names and the getter map are all
    // derived from the same immutable-from-the-outside sequence.
    List<String> keys = new ArrayList<>(partitionKeys);

    Preconditions.checkArgument(
            !keys.isEmpty(),
            "Cannot create a RowDataFieldsKinesisPartitioner for a non-partitioned table");
    Preconditions.checkArgument(
            keys.size() == new HashSet<>(keys).size(),
            "The sequence of partition keys cannot contain duplicates");

    List<String> fieldsList = physicalType.getFieldNames();

    // Collect every offending key first so that the error message lists all of them at once.
    List<String> badKeyNames = new ArrayList<>();
    List<String> badKeyTypes = new ArrayList<>();
    for (String fieldName : keys) {
        int index = fieldsList.indexOf(fieldName);
        if (index < 0) {
            badKeyNames.add(fieldName);
        } else if (!LogicalTypeChecks.hasWellDefinedString(physicalType.getTypeAt(index))) {
            badKeyTypes.add(fieldName);
        }
    }
    Preconditions.checkArgument(
            badKeyNames.isEmpty(),
            "The following partition keys are not present in the table: %s",
            String.join(", ", badKeyNames));
    Preconditions.checkArgument(
            badKeyTypes.isEmpty(),
            "The following partition keys have types that are not supported by Kinesis: %s",
            String.join(", ", badKeyTypes));

    this.delimiter = delimiter;
    this.fieldNames = keys;
    this.dynamicFieldGetters = new HashMap<>();

    // Every key is known to be present at this point; resolve its position once and reuse it
    // for both the field lookup and the getter.
    List<RowField> fields = physicalType.getFields();
    for (String fieldName : keys) {
        int index = fieldsList.indexOf(fieldName);
        RowField field = fields.get(index);
        this.dynamicFieldGetters.put(
                fieldName, RowData.createFieldGetter(field.getType(), index));
    }
}