in src/java/org/apache/cassandra/cql3/statements/schema/CreateAggregateStatement.java [96:234]
public Keyspaces apply(ClusterMetadata metadata)
{
    /*
     * Validates this CREATE AGGREGATE request against the keyspaces obtained from the given
     * metadata and builds the resulting keyspaces.
     *
     * The checks below run in a fixed order and the first one that fails raises the error built
     * by ire(...): conflicting directives, aggregate name, explicitly frozen argument/state types,
     * keyspace existence, state function, final function, INITCOND, and finally a clash with an
     * already existing function of the same name and argument types.
     *
     * Returns the keyspaces untouched when the aggregate already exists and IF NOT EXISTS was
     * requested; otherwise returns the keyspaces with the aggregate added or updated.
     */
    if (ifNotExists && orReplace)
        throw ire("Cannot use both 'OR REPLACE' and 'IF NOT EXISTS' directives");

    if (!FunctionName.isNameValid(aggregateName))
        throw ire("Aggregate name '%s' is invalid", aggregateName);

    // An explicit frozen<> modifier is rejected on arguments; the first offender encountered is reported.
    rawArgumentTypes.forEach(raw -> {
        if (!raw.isImplicitlyFrozen() && raw.isFrozen())
            throw ire("Argument '%s' cannot be frozen; remove frozen<> modifier from '%s'", raw, raw);
    });

    // The same rule applies to the state type.
    if (!rawStateType.isImplicitlyFrozen() && rawStateType.isFrozen())
        throw ire("State type '%s' cannot be frozen; remove frozen<> modifier from '%s'", rawStateType, rawStateType);

    Keyspaces currentSchema = metadata.schema.getKeyspaces();
    KeyspaceMetadata targetKeyspace = currentSchema.getNullable(keyspaceName);
    if (null == targetKeyspace)
        throw ire("Keyspace '%s' doesn't exist", keyspaceName);

    /*
     * State function: looked up with the state type followed by the aggregate's argument types
     */
    List<AbstractType<?>> argTypes =
        rawArgumentTypes.stream()
                        .map(raw -> raw.prepare(keyspaceName, targetKeyspace.types).getType().udfType())
                        .collect(toList());
    AbstractType<?> resolvedStateType = rawStateType.prepare(keyspaceName, targetKeyspace.types).getType().udfType();
    List<AbstractType<?>> stateFunctionSignature = Lists.newArrayList(concat(singleton(resolvedStateType), argTypes));

    UserFunction stateFunction = targetKeyspace.userFunctions.find(stateFunctionName, stateFunctionSignature).orElse(null);
    if (null == stateFunction)
        throw ire("State function %s doesn't exist", stateFunctionString());

    if (stateFunction.isAggregate())
        throw ire("State function %s isn't a scalar function", stateFunctionString());

    // The state function's return type has to equal the state type.
    if (!stateFunction.returnType().equals(resolvedStateType))
    {
        throw ire("State function %s return type must be the same as the first argument type - check STYPE, argument and return types",
                  stateFunctionString());
    }

    /*
     * Final function (optional): takes the state type as its only argument and, when present,
     * dictates the aggregate's return type
     */
    UserFunction finalFunction = null;
    AbstractType<?> returnType = stateFunction.returnType();
    if (null != finalFunctionName)
    {
        finalFunction =
            targetKeyspace.userFunctions
                          .find(finalFunctionName, singletonList(resolvedStateType))
                          .orElseThrow(() -> ire("Final function %s doesn't exist", finalFunctionString()));

        if (finalFunction.isAggregate())
            throw ire("Final function %s isn't a scalar function", finalFunctionString());

        returnType = finalFunction.returnType();
    }

    /*
     * INITCOND (optional): must serialize to a valid value of the state type
     */
    ByteBuffer initialValue = null;
    if (null != rawInitialValue)
    {
        initialValue = Term.asBytes(keyspaceName, rawInitialValue.toString(), resolvedStateType);

        if (null != initialValue)
        {
            try
            {
                resolvedStateType.validate(initialValue);
            }
            catch (MarshalException ignored)
            {
                // only the state type is reported; the marshalling failure itself is not surfaced
                throw ire("Invalid value for INITCOND of type %s", resolvedStateType.asCQL3Type());
            }
        }

        // Round-trip the value through its CQL literal form and make sure it parses back to the
        // same bytes, to avoid another CASSANDRA-11064
        String literal = resolvedStateType.asCQL3Type().toCQLLiteral(initialValue);
        if (!Objects.equal(initialValue, resolvedStateType.asCQL3Type().fromCQLLiteral(literal)))
            throw new AssertionError(String.format("CQL literal '%s' (from type %s) parsed with a different value", literal, resolvedStateType.asCQL3Type()));

        // Identity comparison: anything other than the NULL_LITERAL instance must not be null or empty
        // according to isNullOrEmpty(...)
        if (Constants.NULL_LITERAL != rawInitialValue && isNullOrEmpty(resolvedStateType, initialValue))
            throw ire("INITCOND must not be empty for all types except TEXT, ASCII, BLOB");
    }

    // Without an initial value the state function has to be one that is called on null input.
    boolean calledOnNullInput = ((UDFunction) stateFunction).isCalledOnNullInput();
    if (!calledOnNullInput && null == initialValue)
    {
        throw ire("Cannot create aggregate '%s' without INITCOND because state function %s does not accept 'null' arguments",
                  aggregateName,
                  stateFunctionName);
    }

    /*
     * Build the aggregate, then reconcile it with any function already registered under the
     * same name and argument types
     */
    UDAggregate newAggregate =
        new UDAggregate(new FunctionName(keyspaceName, aggregateName),
                        argTypes,
                        returnType,
                        (ScalarFunction) stateFunction,
                        (ScalarFunction) finalFunction,
                        initialValue);

    UserFunction existing = targetKeyspace.userFunctions.find(newAggregate.name(), argTypes).orElse(null);
    if (null != existing)
    {
        if (!existing.isAggregate())
            throw ire("Aggregate '%s' cannot replace a function", aggregateName);

        // IF NOT EXISTS: leave the keyspaces exactly as they are
        if (ifNotExists)
            return currentSchema;

        if (!orReplace)
            throw ire("Aggregate '%s' already exists", aggregateName);

        // OR REPLACE: the new return type must be compatible with the existing one
        if (!returnType.isCompatibleWith(existing.returnType()))
        {
            throw ire("Cannot replace aggregate '%s', the new return type %s isn't compatible with the return type %s of existing function",
                      aggregateName,
                      returnType.asCQL3Type(),
                      existing.returnType().asCQL3Type());
        }
    }

    return currentSchema.withAddedOrUpdated(
        targetKeyspace.withSwapped(
            targetKeyspace.userFunctions.withAddedOrUpdated(newAggregate)));
}