public RowDataFieldsKinesisPartitioner()

in amazon-kinesis-connector-flink/src/main/java/software/amazon/kinesis/connectors/flink/table/RowDataFieldsKinesisPartitioner.java [118:166]


	public RowDataFieldsKinesisPartitioner(CatalogTable table, String delimiter) {
		Preconditions.checkNotNull(table, "table");
		Preconditions.checkNotNull(delimiter, "delimiter");
		Preconditions.checkArgument(
			table.isPartitioned(),
			"Cannot create a RowDataFieldsKinesisPartitioner for a non-partitioned table");
		Preconditions.checkArgument(
			table.getPartitionKeys().size() == new HashSet<>(table.getPartitionKeys()).size(),
			"The sequence of partition keys cannot contain duplicates");

		TableSchema schema = table.getSchema();
		List<String> schemaFieldsList = Arrays.asList(schema.getFieldNames());

		List<String> badKeyNames = new ArrayList<>();
		List<String> badKeyTypes = new ArrayList<>();

		for (String fieldName : table.getPartitionKeys()) {
			Optional<DataType> dataType = schema.getFieldDataType(fieldName);
			if (!dataType.isPresent()) {
				badKeyNames.add(fieldName);
			} else if (!hasWellDefinedString(dataType.get().getLogicalType())) {
				badKeyTypes.add(fieldName);
			}
		}

		Preconditions.checkArgument(
			badKeyNames.size() == 0,
			"The following partition keys are not present in the table: %s",
			String.join(", ", badKeyNames));
		Preconditions.checkArgument(
			badKeyTypes.size() == 0,
			"The following partition keys have types that are not supported by Kinesis: %s",
			String.join(", ", badKeyTypes));

		this.delimiter = delimiter;
		this.fieldNames = table.getPartitionKeys();
		this.dynamicFieldGetters = new HashMap<>();
		for (String fieldName : table.getPartitionKeys()) {
			TableColumn column = schema
				.getTableColumn(fieldName)
				.orElseThrow(() -> new RuntimeException("Unexpected field column " + fieldName));

			RowData.FieldGetter fieldGetter = RowData.createFieldGetter(
				column.getType().getLogicalType(),
				schemaFieldsList.indexOf(column.getName()));

			this.dynamicFieldGetters.put(fieldName, fieldGetter);
		}
	}