public RowDataFieldsKinesisPartitionKeyGenerator()

in flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/RowDataFieldsKinesisPartitionKeyGenerator.java [111:157]


    public RowDataFieldsKinesisPartitionKeyGenerator(
            RowType physicalType, List<String> partitionKeys, String delimiter) {
        Preconditions.checkNotNull(physicalType, "physicalType");
        Preconditions.checkNotNull(partitionKeys, "partitionKeys");
        Preconditions.checkNotNull(delimiter, "delimiter");
        Preconditions.checkArgument(
                !partitionKeys.isEmpty(),
                "Cannot create a RowDataFieldsKinesisPartitioner for a non-partitioned table");
        Preconditions.checkArgument(
                partitionKeys.size() == new HashSet<>(partitionKeys).size(),
                "The sequence of partition keys cannot contain duplicates");

        List<String> fieldsList = physicalType.getFieldNames();

        List<String> badKeyNames = new ArrayList<>();
        List<String> badKeyTypes = new ArrayList<>();

        for (String fieldName : partitionKeys) {
            int index = fieldsList.indexOf(fieldName);
            if (index < 0) {
                badKeyNames.add(fieldName);
            } else if (!LogicalTypeChecks.hasWellDefinedString(physicalType.getTypeAt(index))) {
                badKeyTypes.add(fieldName);
            }
        }

        Preconditions.checkArgument(
                badKeyNames.size() == 0,
                "The following partition keys are not present in the table: %s",
                String.join(", ", badKeyNames));
        Preconditions.checkArgument(
                badKeyTypes.size() == 0,
                "The following partition keys have types that are not supported by Kinesis: %s",
                String.join(", ", badKeyTypes));

        this.delimiter = delimiter;
        this.fieldNames = partitionKeys;
        this.dynamicFieldGetters = new HashMap<>();
        for (String fieldName : partitionKeys) {
            RowField field = physicalType.getFields().get(fieldsList.indexOf(fieldName));

            RowData.FieldGetter fieldGetter =
                    RowData.createFieldGetter(field.getType(), fieldsList.indexOf(field.getName()));

            this.dynamicFieldGetters.put(fieldName, fieldGetter);
        }
    }