public static void validateTableSchema()

in paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java [100:229]


    /**
     * Validates the given table schema together with the options it carries.
     *
     * <p>The checks run in a fixed order and the first violated rule throws, so the order below
     * determines which error a caller sees when several rules are broken at once.
     *
     * @param schema the table schema (fields, keys and options) to validate
     * @throws UnsupportedOperationException if a changelog producer is configured on a table
     *     without primary keys, or if streaming-read-overwrite is combined with the
     *     full-compaction or lookup changelog producer
     * @throws IllegalArgumentException if partition expiration is set on a non-partitioned
     *     table, the record level time field is missing or has an unsupported type, or the
     *     FIRST_ROW merge engine is combined with an unsupported changelog producer
     * @throws RuntimeException if streaming-read-overwrite is enabled on a table without primary
     *     keys; the delegated validate/check helpers may raise their own exceptions as well
     */
    public static void validateTableSchema(TableSchema schema) {
        // Key columns: primary keys and partition keys are both checked by the same helper.
        validateOnlyContainPrimitiveType(schema.fields(), schema.primaryKeys(), "primary key");
        validateOnlyContainPrimitiveType(schema.fields(), schema.partitionKeys(), "partition");

        CoreOptions options = new CoreOptions(schema.options());

        // Delegated validations, kept in their original order.
        validateBucket(schema, options);
        validateDefaultValues(schema);
        validateStartupMode(options);
        validateFieldsPrefix(schema, options);
        validateSequenceField(schema, options);
        validateSequenceGroup(schema, options);

        // Changelog producer constraints.
        ChangelogProducer producer = options.changelogProducer();
        if (schema.primaryKeys().isEmpty() && producer != ChangelogProducer.NONE) {
            String message =
                    String.format(
                            "Can not set %s on table without primary keys, please define primary keys.",
                            CHANGELOG_PRODUCER.key());
            throw new UnsupportedOperationException(message);
        }

        boolean fullCompactionOrLookup =
                producer == ChangelogProducer.FULL_COMPACTION
                        || producer == ChangelogProducer.LOOKUP;
        if (options.streamingReadOverwrite() && fullCompactionOrLookup) {
            String message =
                    String.format(
                            "Cannot set %s to true when changelog producer is %s or %s because it will read duplicated changes.",
                            STREAMING_READ_OVERWRITE.key(),
                            ChangelogProducer.FULL_COMPACTION,
                            ChangelogProducer.LOOKUP);
            throw new UnsupportedOperationException(message);
        }

        // Snapshot retention bounds: min must be positive and must not exceed max.
        checkArgument(
                options.snapshotNumRetainMin() > 0,
                SNAPSHOT_NUM_RETAINED_MIN.key() + " should be at least 1");
        checkArgument(
                options.snapshotNumRetainMin() <= options.snapshotNumRetainMax(),
                SNAPSHOT_NUM_RETAINED_MIN.key()
                        + " should not be larger than "
                        + SNAPSHOT_NUM_RETAINED_MAX.key());

        // Changelog retention bounds: same rules as for snapshots.
        checkArgument(
                options.changelogNumRetainMin() > 0,
                CHANGELOG_NUM_RETAINED_MIN.key() + " should be at least 1");
        checkArgument(
                options.changelogNumRetainMin() <= options.changelogNumRetainMax(),
                CHANGELOG_NUM_RETAINED_MIN.key()
                        + " should not be larger than "
                        + CHANGELOG_NUM_RETAINED_MAX.key());

        // Let the configured file format validate the data fields.
        FileFormat.fromIdentifier(options.formatType(), new Options(schema.options()))
                .validateDataFields(new RowType(schema.fields()));

        // Column names must not be system field names nor start with the key field prefix.
        for (String fieldName : schema.fieldNames()) {
            checkState(
                    !SYSTEM_FIELD_NAMES.contains(fieldName),
                    String.format(
                            "Field name[%s] in schema cannot be exist in %s",
                            fieldName, SYSTEM_FIELD_NAMES));
            checkState(
                    !fieldName.startsWith(KEY_FIELD_PREFIX),
                    String.format(
                            "Field name[%s] in schema cannot start with [%s]",
                            fieldName, KEY_FIELD_PREFIX));
        }

        if (schema.primaryKeys().isEmpty() && options.streamingReadOverwrite()) {
            throw new RuntimeException(
                    "Doesn't support streaming read the changes from overwrite when the primary keys are not defined.");
        }

        // Partition expiration only makes sense when partition keys are defined.
        if (schema.options().containsKey(CoreOptions.PARTITION_EXPIRATION_TIME.key())
                && schema.partitionKeys().isEmpty()) {
            throw new IllegalArgumentException(
                    "Can not set 'partition.expiration-time' for non-partitioned table.");
        }

        // Record level expiration: the time field must exist and have an accepted type.
        String recordLevelTimeField = options.recordLevelTimeField();
        if (recordLevelTimeField != null) {
            DataField timeField =
                    schema.fields().stream()
                            .filter(candidate -> candidate.name().equals(recordLevelTimeField))
                            .findFirst()
                            .orElseThrow(
                                    () ->
                                            new IllegalArgumentException(
                                                    String.format(
                                                            "Can not find time field %s for record level expire.",
                                                            recordLevelTimeField)));
            DataType timeFieldType = timeField.type();
            boolean acceptedType =
                    timeFieldType instanceof IntType
                            || timeFieldType instanceof BigIntType
                            || timeFieldType instanceof TimestampType
                            || timeFieldType instanceof LocalZonedTimestampType;
            if (!acceptedType) {
                throw new IllegalArgumentException(
                        String.format(
                                "The record level time field type should be one of INT, BIGINT, or TIMESTAMP, but field type is %s.",
                                timeFieldType));
            }
        }

        // FIRST_ROW merge engine accepts only the 'lookup' and 'none' changelog producers.
        if (options.mergeEngine() == MergeEngine.FIRST_ROW
                && options.changelogProducer() != ChangelogProducer.LOOKUP
                && options.changelogProducer() != ChangelogProducer.NONE) {
            throw new IllegalArgumentException(
                    "Only support 'none' and 'lookup' changelog-producer on FIRST_ROW merge engine");
        }

        // A configured rowkind field has to be one of the schema's columns.
        options.rowkindField()
                .ifPresent(
                        rowkindField -> {
                            checkArgument(
                                    schema.fieldNames().contains(rowkindField),
                                    "Rowkind field: '%s' can not be found in table schema.",
                                    rowkindField);
                        });

        if (options.deletionVectorsEnabled()) {
            validateForDeletionVectors(options);
        }
    }