public static void validateTableSchema()

in paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java [77:196]


    public static void validateTableSchema(TableSchema schema) {
        validatePrimaryKeysType(schema.fields(), schema.primaryKeys());

        CoreOptions options = new CoreOptions(schema.options());

        validateDefaultValues(schema);

        validateStartupMode(options);

        ChangelogProducer changelogProducer = options.changelogProducer();
        if (options.writeMode() == WriteMode.APPEND_ONLY
                && changelogProducer != ChangelogProducer.NONE) {
            throw new UnsupportedOperationException(
                    String.format(
                            "Can not set the %s to %s and %s at the same time.",
                            WRITE_MODE.key(), APPEND_ONLY, CHANGELOG_PRODUCER.key()));
        }

        checkArgument(
                options.snapshotNumRetainMin() > 0,
                SNAPSHOT_NUM_RETAINED_MIN.key() + " should be at least 1");
        checkArgument(
                options.snapshotNumRetainMin() <= options.snapshotNumRetainMax(),
                SNAPSHOT_NUM_RETAINED_MIN.key()
                        + " should not be larger than "
                        + SNAPSHOT_NUM_RETAINED_MAX.key());

        // Only changelog tables with primary keys support full compaction or lookup
        // changelog producer
        if (options.writeMode() == WriteMode.CHANGE_LOG) {
            switch (changelogProducer) {
                case FULL_COMPACTION:
                case LOOKUP:
                    if (schema.primaryKeys().isEmpty()) {
                        throw new UnsupportedOperationException(
                                "Changelog table with "
                                        + changelogProducer
                                        + " must have primary keys");
                    }
                    break;
                default:
            }
        }

        // Get the format type here which will try to convert string value to {@Code
        // FileFormatType}. If the string value is illegal, an exception will be thrown.
        CoreOptions.FileFormatType fileFormatType = options.formatType();
        FileFormat fileFormat =
                FileFormat.fromIdentifier(fileFormatType.name(), new Options(schema.options()));
        fileFormat.validateDataFields(new RowType(schema.fields()));

        // Check column names in schema
        schema.fieldNames()
                .forEach(
                        f -> {
                            checkState(
                                    !SYSTEM_FIELD_NAMES.contains(f),
                                    String.format(
                                            "Field name[%s] in schema cannot be exist in %s",
                                            f, SYSTEM_FIELD_NAMES));
                            checkState(
                                    !f.startsWith(KEY_FIELD_PREFIX),
                                    String.format(
                                            "Field name[%s] in schema cannot start with [%s]",
                                            f, KEY_FIELD_PREFIX));
                        });

        // Cannot define any primary key in an append-only table.
        if (!schema.primaryKeys().isEmpty() && Objects.equals(APPEND_ONLY, options.writeMode())) {
            throw new RuntimeException(
                    "Cannot define any primary key in an append-only table. Set 'write-mode'='change-log' if "
                            + "still want to keep the primary key definition.");
        }

        if (options.bucket() == -1 && options.toMap().get(BUCKET_KEY.key()) != null) {
            throw new RuntimeException(
                    "Cannot define 'bucket-key' in unaware or dynamic bucket mode.");
        }

        if (schema.primaryKeys().isEmpty() && options.streamingReadOverwrite()) {
            throw new RuntimeException(
                    "Doesn't support streaming read the changes from overwrite when the primary keys are not defined.");
        }

        if (schema.options().containsKey(CoreOptions.PARTITION_EXPIRATION_TIME.key())) {
            if (schema.partitionKeys().isEmpty()) {
                throw new IllegalArgumentException(
                        "Can not set 'partition.expiration-time' for non-partitioned table.");
            }
        }

        Optional<String> sequenceField = options.sequenceField();
        sequenceField.ifPresent(
                field ->
                        checkArgument(
                                schema.fieldNames().contains(field),
                                "Nonexistent sequence field: '%s'",
                                field));

        CoreOptions.MergeEngine mergeEngine = options.mergeEngine();
        if (mergeEngine == CoreOptions.MergeEngine.FIRST_ROW) {
            if (sequenceField.isPresent()) {
                throw new IllegalArgumentException(
                        "Do not support use sequence field on FIRST_MERGE merge engine");
            }

            if (changelogProducer != ChangelogProducer.LOOKUP) {
                throw new IllegalArgumentException(
                        "Only support 'lookup' changelog-producer on FIRST_MERGE merge engine");
            }
        }

        if (schema.crossPartitionUpdate() && options.bucket() != -1) {
            throw new IllegalArgumentException(
                    String.format(
                            "You should use dynamic bucket (bucket = -1) mode in cross partition update case "
                                    + "(Primary key constraint %s not include all partition fields %s).",
                            schema.primaryKeys(), schema.partitionKeys()));
        }
    }