in amazon-kinesis-connector-flink/src/main/java/software/amazon/kinesis/connectors/flink/table/RowDataFieldsKinesisPartitioner.java [118:166]
/**
 * Builds a partitioner from the partition keys declared on {@code table}.
 *
 * <p>Before any state is stored, the constructor verifies that:
 * <ul>
 *   <li>both arguments are non-null;</li>
 *   <li>the table reports itself as partitioned;</li>
 *   <li>the partition key list contains no duplicates;</li>
 *   <li>every partition key names a field that exists in the table schema;</li>
 *   <li>every partition key's logical type is accepted by {@code hasWellDefinedString}.</li>
 * </ul>
 * All missing keys are reported together in one error, and likewise all keys with
 * rejected types; the missing-key error takes precedence when both kinds are present.
 *
 * <p>After validation, one {@code RowData.FieldGetter} is created per partition key,
 * using the key's position in the schema's field-name array, and is stored under the
 * key's name.
 *
 * @param table the catalog table whose partition keys and schema are read
 * @param delimiter the delimiter to store on this instance
 */
public RowDataFieldsKinesisPartitioner(CatalogTable table, String delimiter) {
    Preconditions.checkNotNull(table, "table");
    Preconditions.checkNotNull(delimiter, "delimiter");
    Preconditions.checkArgument(
        table.isPartitioned(),
        "Cannot create a RowDataFieldsKinesisPartitioner for a non-partitioned table");

    // Read the key list once and reuse it for validation, storage and getter creation.
    List<String> partitionKeys = table.getPartitionKeys();
    Preconditions.checkArgument(
        new HashSet<>(partitionKeys).size() == partitionKeys.size(),
        "The sequence of partition keys cannot contain duplicates");

    TableSchema tableSchema = table.getSchema();

    // Collect every offending key first so each error message lists all of them at once.
    List<String> missingKeys = new ArrayList<>();
    List<String> unsupportedKeys = new ArrayList<>();
    for (String key : partitionKeys) {
        Optional<DataType> keyType = tableSchema.getFieldDataType(key);
        if (keyType.isPresent()) {
            if (!hasWellDefinedString(keyType.get().getLogicalType())) {
                unsupportedKeys.add(key);
            }
        } else {
            missingKeys.add(key);
        }
    }
    Preconditions.checkArgument(
        missingKeys.isEmpty(),
        "The following partition keys are not present in the table: %s",
        String.join(", ", missingKeys));
    Preconditions.checkArgument(
        unsupportedKeys.isEmpty(),
        "The following partition keys have types that are not supported by Kinesis: %s",
        String.join(", ", unsupportedKeys));

    // Field positions are resolved against the schema's field-name order.
    List<String> schemaFieldNames = Arrays.asList(tableSchema.getFieldNames());

    this.delimiter = delimiter;
    this.fieldNames = partitionKeys;
    this.dynamicFieldGetters = new HashMap<>();
    for (String key : partitionKeys) {
        Optional<TableColumn> maybeColumn = tableSchema.getTableColumn(key);
        if (!maybeColumn.isPresent()) {
            // Defensive: the key was already confirmed to exist in the schema above.
            throw new RuntimeException("Unexpected field column " + key);
        }
        TableColumn keyColumn = maybeColumn.get();
        this.dynamicFieldGetters.put(
            key,
            RowData.createFieldGetter(
                keyColumn.getType().getLogicalType(),
                schemaFieldNames.indexOf(keyColumn.getName())));
    }
}