in paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java [77:196]
/**
 * Validates a {@link TableSchema} and its options before the table is created or altered.
 *
 * <p>Checks performed, in order: primary-key field types, default values, startup mode,
 * write-mode / changelog-producer compatibility, snapshot retention bounds, file format
 * support for the declared fields, reserved field names, primary-key / bucket / partition
 * option combinations, sequence-field existence, and merge-engine constraints.
 *
 * @param schema the table schema to validate
 * @throws UnsupportedOperationException if the write mode is incompatible with the
 *     configured changelog producer
 * @throws IllegalArgumentException if an option value or option combination is invalid
 * @throws RuntimeException if primary-key, bucket-key, or streaming-read configuration
 *     conflicts with the schema
 */
public static void validateTableSchema(TableSchema schema) {
    validatePrimaryKeysType(schema.fields(), schema.primaryKeys());

    CoreOptions options = new CoreOptions(schema.options());
    validateDefaultValues(schema);
    validateStartupMode(options);

    ChangelogProducer changelogProducer = options.changelogProducer();
    // Append-only tables emit no changelog, so configuring a changelog producer is invalid.
    if (options.writeMode() == WriteMode.APPEND_ONLY
            && changelogProducer != ChangelogProducer.NONE) {
        throw new UnsupportedOperationException(
                String.format(
                        "Can not set the %s to %s and %s at the same time.",
                        WRITE_MODE.key(), APPEND_ONLY, CHANGELOG_PRODUCER.key()));
    }

    checkArgument(
            options.snapshotNumRetainMin() > 0,
            SNAPSHOT_NUM_RETAINED_MIN.key() + " should be at least 1");
    checkArgument(
            options.snapshotNumRetainMin() <= options.snapshotNumRetainMax(),
            SNAPSHOT_NUM_RETAINED_MIN.key()
                    + " should not be larger than "
                    + SNAPSHOT_NUM_RETAINED_MAX.key());

    // Only changelog tables with primary keys support full compaction or lookup
    // changelog producer.
    if (options.writeMode() == WriteMode.CHANGE_LOG) {
        switch (changelogProducer) {
            case FULL_COMPACTION:
            case LOOKUP:
                if (schema.primaryKeys().isEmpty()) {
                    throw new UnsupportedOperationException(
                            "Changelog table with "
                                    + changelogProducer
                                    + " must have primary keys");
                }
                break;
            default:
        }
    }

    // Resolve the format type eagerly: converting the string value to {@code
    // FileFormatType} throws here with a clear exception if the value is illegal.
    CoreOptions.FileFormatType fileFormatType = options.formatType();
    FileFormat fileFormat =
            FileFormat.fromIdentifier(fileFormatType.name(), new Options(schema.options()));
    fileFormat.validateDataFields(new RowType(schema.fields()));

    // Reject field names that clash with system fields or the internal key-field prefix.
    schema.fieldNames()
            .forEach(
                    f -> {
                        checkState(
                                !SYSTEM_FIELD_NAMES.contains(f),
                                String.format(
                                        "Field name[%s] in schema cannot exist in %s",
                                        f, SYSTEM_FIELD_NAMES));
                        checkState(
                                !f.startsWith(KEY_FIELD_PREFIX),
                                String.format(
                                        "Field name[%s] in schema cannot start with [%s]",
                                        f, KEY_FIELD_PREFIX));
                    });

    // Cannot define any primary key in an append-only table.
    if (!schema.primaryKeys().isEmpty() && Objects.equals(APPEND_ONLY, options.writeMode())) {
        throw new RuntimeException(
                "Cannot define any primary key in an append-only table. Set 'write-mode'='change-log' if "
                        + "still want to keep the primary key definition.");
    }

    // bucket == -1 means unaware/dynamic bucket mode, where an explicit bucket key is invalid.
    if (options.bucket() == -1 && options.toMap().get(BUCKET_KEY.key()) != null) {
        throw new RuntimeException(
                "Cannot define 'bucket-key' in unaware or dynamic bucket mode.");
    }

    // Streaming read of overwrite changes needs primary keys to derive change rows.
    if (schema.primaryKeys().isEmpty() && options.streamingReadOverwrite()) {
        throw new RuntimeException(
                "Doesn't support streaming read the changes from overwrite when the primary keys are not defined.");
    }

    // Partition expiration only makes sense for partitioned tables.
    if (schema.options().containsKey(CoreOptions.PARTITION_EXPIRATION_TIME.key())
            && schema.partitionKeys().isEmpty()) {
        throw new IllegalArgumentException(
                "Can not set 'partition.expiration-time' for non-partitioned table.");
    }

    Optional<String> sequenceField = options.sequenceField();
    sequenceField.ifPresent(
            field ->
                    checkArgument(
                            schema.fieldNames().contains(field),
                            "Nonexistent sequence field: '%s'",
                            field));

    CoreOptions.MergeEngine mergeEngine = options.mergeEngine();
    if (mergeEngine == CoreOptions.MergeEngine.FIRST_ROW) {
        // First-row keeps the earliest record verbatim, which a sequence field would
        // contradict. (Messages previously named a nonexistent "FIRST_MERGE" engine.)
        if (sequenceField.isPresent()) {
            throw new IllegalArgumentException(
                    "Do not support use sequence field on FIRST_ROW merge engine");
        }
        if (changelogProducer != ChangelogProducer.LOOKUP) {
            throw new IllegalArgumentException(
                    "Only support 'lookup' changelog-producer on FIRST_ROW merge engine");
        }
    }

    // Cross-partition update (primary key does not cover all partition fields) requires
    // dynamic bucket mode.
    if (schema.crossPartitionUpdate() && options.bucket() != -1) {
        throw new IllegalArgumentException(
                String.format(
                        "You should use dynamic bucket (bucket = -1) mode in cross partition update case "
                                + "(Primary key constraint %s not include all partition fields %s).",
                        schema.primaryKeys(), schema.partitionKeys()));
    }
}