public Keyspaces apply()

in src/java/org/apache/cassandra/cql3/statements/schema/CreateAggregateStatement.java [96:234]


    public Keyspaces apply(ClusterMetadata metadata)
    {
        if (ifNotExists && orReplace)
            throw ire("Cannot use both 'OR REPLACE' and 'IF NOT EXISTS' directives");

        if (!FunctionName.isNameValid(aggregateName))
            throw ire("Aggregate name '%s' is invalid", aggregateName);

        rawArgumentTypes.stream()
                        .filter(raw -> !raw.isImplicitlyFrozen() && raw.isFrozen())
                        .findFirst()
                        .ifPresent(t -> { throw ire("Argument '%s' cannot be frozen; remove frozen<> modifier from '%s'", t, t); });

        if (!rawStateType.isImplicitlyFrozen() && rawStateType.isFrozen())
            throw ire("State type '%s' cannot be frozen; remove frozen<> modifier from '%s'", rawStateType, rawStateType);

        Keyspaces schema = metadata.schema.getKeyspaces();
        KeyspaceMetadata keyspace = schema.getNullable(keyspaceName);
        if (null == keyspace)
            throw ire("Keyspace '%s' doesn't exist", keyspaceName);

        /*
         * Resolve the state function
         */

        List<AbstractType<?>> argumentTypes =
            rawArgumentTypes.stream()
                            .map(t -> t.prepare(keyspaceName, keyspace.types).getType().udfType())
                            .collect(toList());
        AbstractType<?> stateType = rawStateType.prepare(keyspaceName, keyspace.types).getType().udfType();
        List<AbstractType<?>> stateFunctionArguments = Lists.newArrayList(concat(singleton(stateType), argumentTypes));

        UserFunction stateFunction =
            keyspace.userFunctions
                    .find(stateFunctionName, stateFunctionArguments)
                    .orElseThrow(() -> ire("State function %s doesn't exist", stateFunctionString()));

        if (stateFunction.isAggregate())
            throw ire("State function %s isn't a scalar function", stateFunctionString());

        if (!stateFunction.returnType().equals(stateType))
        {
            throw ire("State function %s return type must be the same as the first argument type - check STYPE, argument and return types",
                      stateFunctionString());
        }

        /*
         * Resolve the final function and return type
         */

        UserFunction finalFunction = null;
        AbstractType<?> returnType = stateFunction.returnType();

        if (null != finalFunctionName)
        {
            finalFunction = keyspace.userFunctions.find(finalFunctionName, singletonList(stateType)).orElse(null);
            if (null == finalFunction)
                throw ire("Final function %s doesn't exist", finalFunctionString());

            if (finalFunction.isAggregate())
                throw ire("Final function %s isn't a scalar function", finalFunctionString());

            // override return type with that of the final function
            returnType = finalFunction.returnType();
        }

        /*
         * Validate initial condition
         */

        ByteBuffer initialValue = null;
        if (null != rawInitialValue)
        {
            String term = rawInitialValue.toString();
            initialValue = Term.asBytes(keyspaceName, term, stateType);

            if (null != initialValue)
            {
                try
                {
                    stateType.validate(initialValue);
                }
                catch (MarshalException e)
                {
                    throw ire("Invalid value for INITCOND of type %s", stateType.asCQL3Type());
                }
            }

            // Converts initcond to a CQL literal and parse it back to avoid another CASSANDRA-11064
            String initialValueString = stateType.asCQL3Type().toCQLLiteral(initialValue);
            if (!Objects.equal(initialValue, stateType.asCQL3Type().fromCQLLiteral(initialValueString)))
                throw new AssertionError(String.format("CQL literal '%s' (from type %s) parsed with a different value", initialValueString, stateType.asCQL3Type()));

            if (Constants.NULL_LITERAL != rawInitialValue && isNullOrEmpty(stateType, initialValue))
                throw ire("INITCOND must not be empty for all types except TEXT, ASCII, BLOB");
        }

        if (!((UDFunction) stateFunction).isCalledOnNullInput() && null == initialValue)
        {
            throw ire("Cannot create aggregate '%s' without INITCOND because state function %s does not accept 'null' arguments",
                      aggregateName,
                      stateFunctionName);
        }

        /*
         * Create or replace
         */

        UDAggregate aggregate =
            new UDAggregate(new FunctionName(keyspaceName, aggregateName),
                            argumentTypes,
                            returnType,
                            (ScalarFunction) stateFunction,
                            (ScalarFunction) finalFunction,
                            initialValue);

        UserFunction existingAggregate = keyspace.userFunctions.find(aggregate.name(), argumentTypes).orElse(null);
        if (null != existingAggregate)
        {
            if (!existingAggregate.isAggregate())
                throw ire("Aggregate '%s' cannot replace a function", aggregateName);

            if (ifNotExists)
                return schema;

            if (!orReplace)
                throw ire("Aggregate '%s' already exists", aggregateName);

            if (!returnType.isCompatibleWith(existingAggregate.returnType()))
            {
                throw ire("Cannot replace aggregate '%s', the new return type %s isn't compatible with the return type %s of existing function",
                          aggregateName,
                          returnType.asCQL3Type(),
                          existingAggregate.returnType().asCQL3Type());
            }
        }

        return schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.userFunctions.withAddedOrUpdated(aggregate)));
    }