in paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java [100:229]
public static void validateTableSchema(TableSchema schema) {
    // Validates the given table schema together with the CoreOptions built from
    // schema.options(). The checks run in a fixed order and the first violated rule
    // aborts validation by throwing, so the order of the sections below determines
    // which error a caller sees first.

    // Delegated checks on the primary-key and partition columns.
    validateOnlyContainPrimitiveType(schema.fields(), schema.primaryKeys(), "primary key");
    validateOnlyContainPrimitiveType(schema.fields(), schema.partitionKeys(), "partition");

    CoreOptions coreOptions = new CoreOptions(schema.options());

    // Rules implemented by sibling helpers of this class.
    validateBucket(schema, coreOptions);
    validateDefaultValues(schema);
    validateStartupMode(coreOptions);
    validateFieldsPrefix(schema, coreOptions);
    validateSequenceField(schema, coreOptions);
    validateSequenceGroup(schema, coreOptions);

    ChangelogProducer producer = coreOptions.changelogProducer();
    boolean noPrimaryKeys = schema.primaryKeys().isEmpty();

    // A changelog producer other than NONE is rejected when no primary keys are defined.
    if (noPrimaryKeys && producer != ChangelogProducer.NONE) {
        throw new UnsupportedOperationException(
                String.format(
                        "Can not set %s on table without primary keys, please define primary keys.",
                        CHANGELOG_PRODUCER.key()));
    }

    // Streaming-read-overwrite is rejected for the FULL_COMPACTION and LOOKUP producers.
    boolean readOverwrite = coreOptions.streamingReadOverwrite();
    boolean fullCompactionOrLookup =
            producer == ChangelogProducer.FULL_COMPACTION
                    || producer == ChangelogProducer.LOOKUP;
    if (readOverwrite && fullCompactionOrLookup) {
        throw new UnsupportedOperationException(
                String.format(
                        "Cannot set %s to true when changelog producer is %s or %s because it will read duplicated changes.",
                        STREAMING_READ_OVERWRITE.key(),
                        ChangelogProducer.FULL_COMPACTION,
                        ChangelogProducer.LOOKUP));
    }

    // Snapshot retention: the minimum must be positive and must not exceed the maximum.
    checkArgument(
            options_snapshotMinPositive(coreOptions),
            SNAPSHOT_NUM_RETAINED_MIN.key() + " should be at least 1");
    checkArgument(
            coreOptions.snapshotNumRetainMin() <= coreOptions.snapshotNumRetainMax(),
            SNAPSHOT_NUM_RETAINED_MIN.key()
                    + " should not be larger than "
                    + SNAPSHOT_NUM_RETAINED_MAX.key());

    // Changelog retention: same two rules as for snapshots.
    checkArgument(
            coreOptions.changelogNumRetainMin() > 0,
            CHANGELOG_NUM_RETAINED_MIN.key() + " should be at least 1");
    checkArgument(
            coreOptions.changelogNumRetainMin() <= coreOptions.changelogNumRetainMax(),
            CHANGELOG_NUM_RETAINED_MIN.key()
                    + " should not be larger than "
                    + CHANGELOG_NUM_RETAINED_MAX.key());

    // Let the configured file format validate the schema's fields.
    FileFormat.fromIdentifier(coreOptions.formatType(), new Options(schema.options()))
            .validateDataFields(new RowType(schema.fields()));

    // Column names must not be listed in SYSTEM_FIELD_NAMES nor start with KEY_FIELD_PREFIX.
    for (String fieldName : schema.fieldNames()) {
        checkState(
                !SYSTEM_FIELD_NAMES.contains(fieldName),
                String.format(
                        "Field name[%s] in schema cannot be exist in %s",
                        fieldName, SYSTEM_FIELD_NAMES));
        checkState(
                !fieldName.startsWith(KEY_FIELD_PREFIX),
                String.format(
                        "Field name[%s] in schema cannot start with [%s]",
                        fieldName, KEY_FIELD_PREFIX));
    }

    // Streaming-read-overwrite is rejected when no primary keys are defined.
    if (noPrimaryKeys && readOverwrite) {
        throw new RuntimeException(
                "Doesn't support streaming read the changes from overwrite when the primary keys are not defined.");
    }

    // The partition expiration option is only accepted when partition keys exist.
    if (schema.options().containsKey(CoreOptions.PARTITION_EXPIRATION_TIME.key())
            && schema.partitionKeys().isEmpty()) {
        throw new IllegalArgumentException(
                "Can not set 'partition.expiration-time' for non-partitioned table.");
    }

    // Record-level expiration: the configured time field must exist in the schema and
    // have one of the accepted types.
    String timeFieldName = coreOptions.recordLevelTimeField();
    if (timeFieldName != null) {
        DataField timeField =
                schema.fields().stream()
                        .filter(candidate -> candidate.name().equals(timeFieldName))
                        .findFirst()
                        .orElseThrow(
                                () ->
                                        new IllegalArgumentException(
                                                String.format(
                                                        "Can not find time field %s for record level expire.",
                                                        timeFieldName)));
        DataType timeFieldType = timeField.type();
        boolean acceptedType =
                timeFieldType instanceof IntType
                        || timeFieldType instanceof BigIntType
                        || timeFieldType instanceof TimestampType
                        || timeFieldType instanceof LocalZonedTimestampType;
        if (!acceptedType) {
            throw new IllegalArgumentException(
                    String.format(
                            "The record level time field type should be one of INT, BIGINT, or TIMESTAMP, but field type is %s.",
                            timeFieldType));
        }
    }

    // The FIRST_ROW merge engine only accepts the NONE and LOOKUP changelog producers.
    if (coreOptions.mergeEngine() == MergeEngine.FIRST_ROW
            && producer != ChangelogProducer.LOOKUP
            && producer != ChangelogProducer.NONE) {
        throw new IllegalArgumentException(
                "Only support 'none' and 'lookup' changelog-producer on FIRST_ROW merge engine");
    }

    // When a rowkind field is configured, it must be one of the schema's field names.
    coreOptions
            .rowkindField()
            .ifPresent(
                    rowkindName ->
                            checkArgument(
                                    schema.fieldNames().contains(rowkindName),
                                    "Rowkind field: '%s' can not be found in table schema.",
                                    rowkindName));

    // Extra rules that only apply when deletion vectors are enabled.
    if (coreOptions.deletionVectorsEnabled()) {
        validateForDeletionVectors(coreOptions);
    }
}

// Returns whether the configured minimum number of retained snapshots is positive.
private static boolean options_snapshotMinPositive(CoreOptions coreOptions) {
    return coreOptions.snapshotNumRetainMin() > 0;
}