public static SerializableFunction createKeyExtractor()

in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoKeyExtractor.java [88:129]


    /**
     * Builds the function that derives the MongoDB document key from a {@link RowData}, based on
     * the primary key and the reserved {@code _id} column declared in the given schema.
     *
     * <ul>
     *   <li>With neither a primary key nor a reserved {@code _id} column, the shared {@code
     *       APPEND_ONLY_KEY_EXTRACTOR} is returned.
     *   <li>With a reserved {@code _id} column, the schema is accepted only if a primary key is
     *       declared, {@code isCompoundPrimaryKey} reports it as non-compound and {@code
     *       primaryKeyContainsReservedId} accepts it.
     *   <li>Otherwise the key type is the projected row type (compound key) or the data type of
     *       the single key column; it is handed to {@code MongoValidationUtils.validatePrimaryKey}
     *       before the extractor is created.
     * </ul>
     *
     * @param resolvedSchema schema whose primary key and columns are inspected
     * @return the append-only extractor, or a {@code MongoKeyExtractor} over the primary key
     *     columns
     * @throws IllegalArgumentException if a reserved {@code _id} column makes the key ambiguous
     * @throws IllegalStateException if the single primary key index resolves to no column
     */
    public static SerializableFunction<RowData, BsonValue> createKeyExtractor(
            ResolvedSchema resolvedSchema) {

        final Optional<UniqueConstraint> declaredKey = resolvedSchema.getPrimaryKey();
        final int[] keyIndexes = resolvedSchema.getPrimaryKeyIndexes();
        final boolean hasReservedId = resolvedSchema.getColumn(RESERVED_ID).isPresent();
        final boolean hasPrimaryKey = declaredKey.isPresent();

        // Nothing to key on: no declared primary key and no reserved _id column.
        if (!(hasPrimaryKey || hasReservedId)) {
            return APPEND_ONLY_KEY_EXTRACTOR;
        }

        if (hasReservedId) {
            // The reserved _id column is only tolerated when it is the one and only key column.
            final boolean reservedIdIsSoleKey =
                    hasPrimaryKey
                            && !isCompoundPrimaryKey(keyIndexes)
                            && primaryKeyContainsReservedId(declaredKey.get());
            if (!reservedIdIsSoleKey) {
                throw new IllegalArgumentException(
                        "Ambiguous keys being used due to the presence of an _id field. "
                                + "Either use the _id column as the key, or rename the _id column.");
            }
        }

        // From here on a primary key is declared, so keyIndexes has at least one entry.
        final DataType keyType;
        if (!isCompoundPrimaryKey(keyIndexes)) {
            // Single-column key: use that column's data type directly.
            final int soleKeyIndex = keyIndexes[0];
            keyType =
                    resolvedSchema
                            .getColumn(soleKeyIndex)
                            .orElseThrow(
                                    () ->
                                            new IllegalStateException(
                                                    String.format(
                                                            "No primary key column found with index '%s'.",
                                                            soleKeyIndex)))
                            .getDataType();
        } else {
            // Compound key: project the physical row type down to the key columns.
            final DataType physicalRowType = resolvedSchema.toPhysicalRowDataType();
            keyType = Projection.of(keyIndexes).project(physicalRowType);
        }

        MongoValidationUtils.validatePrimaryKey(keyType);

        return new MongoKeyExtractor(keyType.getLogicalType(), keyIndexes);
    }