in broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java [142:284]
/**
 * Converts the arguments of a queue declaration, as received on the wire, into the
 * attribute map used to create the queue in the model.
 * <p>
 * The conversion proceeds in this order (later steps may overwrite earlier ones):
 * <ol>
 *   <li>arguments whose name equals a non-derived {@link Queue} attribute (including the
 *       type-specific attributes of every queue specialisation) are copied verbatim;</li>
 *   <li>arguments listed in {@code ATTRIBUTE_MAPPINGS} are copied under their mapped name;</li>
 *   <li>a handful of arguments (last-value queue, policy type, message grouping, no-local,
 *       flow resume capacity, alternate exchange / DLQ, single active consumer) receive
 *       dedicated handling.</li>
 * </ol>
 * Wire arguments that were not consumed by steps 1 and 2, nor explicitly marked as handled in
 * step 3, are reported according to {@code unknownArgumentBehaviour}.
 *
 * @param queueName                name of the queue being declared; only used to derive the dead
 *                                 letter queue name when {@code X_QPID_DLQ_ENABLED} is set
 * @param wireArguments            the declare arguments; may be {@code null}, in which case an
 *                                 empty map is returned. This map is never modified.
 * @param model                    supplies the type registry used to discover queue attributes;
 *                                 not accessed when {@code wireArguments} is {@code null}
 * @param unknownArgumentBehaviour what to do with unsupported arguments: {@code LOG} logs a
 *                                 warning, {@code IGNORE} does nothing, anything else fails
 * @return a new, mutable map of model attribute names to values
 * @throws ConnectionScopedRuntimeException if the flow resume capacity exceeds the capacity
 * @throws IllegalArgumentException         if unsupported arguments are present and the behaviour
 *                                          is neither {@code LOG} nor {@code IGNORE}; presumably
 *                                          also when the policy type names no
 *                                          {@code OverflowPolicy} constant (thrown by
 *                                          {@code valueOf})
 * @throws NumberFormatException            if a capacity value cannot be parsed as an integer
 */
public static Map<String,Object> convertWireArgsToModel(final String queueName,
                                                        final Map<String, Object> wireArguments,
                                                        final Model model,
                                                        final Queue.BehaviourOnUnknownDeclareArgument unknownArgumentBehaviour)
{
    Map<String,Object> modelArguments = new HashMap<>();
    if(wireArguments != null)
    {
        final ConfiguredObjectTypeRegistry typeRegistry = model.getTypeRegistry();
        final List<ConfiguredObjectAttribute<?, ?>> attributeTypes =
                new ArrayList<>(typeRegistry.getAttributeTypes(Queue.class).values());
        typeRegistry.getTypeSpecialisations(Queue.class)
                    .forEach(type -> attributeTypes.addAll(typeRegistry.getTypeSpecificAttributes(type)));

        // Names of all attributes that may be supplied directly (derived attributes are excluded).
        final Set<String> settableAttributeNames = new HashSet<>();
        for (ConfiguredObjectAttribute<?, ?> attributeType : attributeTypes)
        {
            if (!attributeType.isDerived())
            {
                settableAttributeNames.add(attributeType.getName());
            }
        }

        // Tracks the wire arguments nobody has claimed yet; whatever is left at the end is "unknown".
        final Set<String> wireArgumentNames = new HashSet<>(wireArguments.keySet());

        // 1. Arguments already expressed with model attribute names are passed through unchanged.
        for (Map.Entry<String, Object> entry : wireArguments.entrySet())
        {
            if (settableAttributeNames.contains(entry.getKey()))
            {
                modelArguments.put(entry.getKey(), entry.getValue());
                wireArgumentNames.remove(entry.getKey());
            }
        }

        // 2. Arguments with a simple one-to-one rename.
        for(Map.Entry<String,String> entry : ATTRIBUTE_MAPPINGS.entrySet())
        {
            if(wireArguments.containsKey(entry.getKey()))
            {
                modelArguments.put(entry.getValue(), wireArguments.get(entry.getKey()));
                wireArgumentNames.remove(entry.getKey());
            }
        }

        // 3. Arguments needing dedicated handling.
        if(wireArguments.containsKey(QPID_LAST_VALUE_QUEUE))
        {
            wireArgumentNames.remove(QPID_LAST_VALUE_QUEUE);
            // A last-value queue declared without an explicit key falls back to the default key.
            if (!wireArguments.containsKey(QPID_LAST_VALUE_QUEUE_KEY))
            {
                modelArguments.put(LastValueQueue.LVQ_KEY, LastValueQueue.DEFAULT_LVQ_KEY);
            }
        }

        if(wireArguments.containsKey(QPID_POLICY_TYPE))
        {
            // Locale.ROOT keeps the upper-casing independent of the JVM default locale
            // (e.g. a Turkish locale would otherwise turn 'i' into a dotted capital 'İ').
            final String policyName =
                    String.valueOf(wireArguments.get(QPID_POLICY_TYPE)).toUpperCase(java.util.Locale.ROOT);
            modelArguments.put(Queue.OVERFLOW_POLICY, OverflowPolicy.valueOf(policyName));
        }

        if(wireArguments.containsKey(QPID_SHARED_MSG_GROUP))
        {
            wireArgumentNames.remove(QPID_SHARED_MSG_GROUP);
            if (SHARED_MSG_GROUP_ARG_VALUE.equals(String.valueOf(wireArguments.get(QPID_SHARED_MSG_GROUP))))
            {
                modelArguments.put(Queue.MESSAGE_GROUP_TYPE, MessageGroupType.SHARED_GROUPS);
            }
        }
        else if(wireArguments.containsKey(QPID_GROUP_HEADER_KEY))
        {
            modelArguments.put(Queue.MESSAGE_GROUP_TYPE, MessageGroupType.STANDARD);
            // "JMSXGroupId" is not kept as a key override; any override set earlier is dropped.
            if ("JMSXGroupId".equals(wireArguments.get(QPID_GROUP_HEADER_KEY)))
            {
                modelArguments.remove(Queue.MESSAGE_GROUP_KEY_OVERRIDE);
            }
        }

        if(wireArguments.get(QPID_NO_LOCAL) != null)
        {
            modelArguments.put(Queue.NO_LOCAL, Boolean.parseBoolean(wireArguments.get(QPID_NO_LOCAL).toString()));
        }

        if (wireArguments.containsKey(X_QPID_FLOW_RESUME_CAPACITY))
        {
            wireArgumentNames.remove(X_QPID_FLOW_RESUME_CAPACITY);
            if (wireArguments.get(X_QPID_FLOW_RESUME_CAPACITY) != null && wireArguments.get(X_QPID_CAPACITY) != null)
            {
                // Parsed as long so that capacities above Integer.MAX_VALUE are accepted.
                final double resumeCapacity =
                        Long.parseLong(wireArguments.get(X_QPID_FLOW_RESUME_CAPACITY).toString());
                final double maximumCapacity =
                        Long.parseLong(wireArguments.get(X_QPID_CAPACITY).toString());
                if (resumeCapacity > maximumCapacity)
                {
                    throw new ConnectionScopedRuntimeException(
                            "Flow resume size can't be greater than flow control size");
                }

                // Work on a private copy: a context already present in modelArguments can only have
                // been copied by reference from the caller's wire arguments, so it must not be
                // modified in place (and it might be unmodifiable).
                @SuppressWarnings("unchecked")
                final Map<String, String> suppliedContext =
                        (Map<String, String>) modelArguments.get(Queue.CONTEXT);
                final Map<String, String> context = new HashMap<>();
                if (suppliedContext != null)
                {
                    context.putAll(suppliedContext);
                }
                modelArguments.put(Queue.CONTEXT, context);

                // The resume limit is expressed as a percentage of the capacity.
                // NOTE(review): when both capacities are 0 the ratio is NaN and "NaN" is stored;
                // confirm whether that case needs explicit handling.
                final double ratio = resumeCapacity / maximumCapacity;
                // Locale.ROOT guarantees a '.' decimal separator regardless of the default locale.
                context.put(Queue.QUEUE_FLOW_RESUME_LIMIT,
                            String.format(java.util.Locale.ROOT, "%.2f", ratio * 100.0));
                modelArguments.put(Queue.OVERFLOW_POLICY, OverflowPolicy.PRODUCER_FLOW_CONTROL);
            }
        }

        // An explicit alternate exchange takes precedence over the DLQ flag.
        if (wireArguments.containsKey(ALTERNATE_EXCHANGE))
        {
            wireArgumentNames.remove(ALTERNATE_EXCHANGE);
            modelArguments.put(Queue.ALTERNATE_BINDING,
                               Collections.singletonMap(AlternateBinding.DESTINATION,
                                                        wireArguments.get(ALTERNATE_EXCHANGE)));
        }
        else if (wireArguments.containsKey(X_QPID_DLQ_ENABLED))
        {
            wireArgumentNames.remove(X_QPID_DLQ_ENABLED);
            if (isWireFlagSet(wireArguments.get(X_QPID_DLQ_ENABLED)))
            {
                modelArguments.put(Queue.ALTERNATE_BINDING,
                                   Collections.singletonMap(AlternateBinding.DESTINATION,
                                                            getDeadLetterQueueName(queueName)));
            }
        }

        if(wireArguments.containsKey(X_SINGLE_ACTIVE_CONSUMER))
        {
            wireArgumentNames.remove(X_SINGLE_ACTIVE_CONSUMER);
            if (isWireFlagSet(wireArguments.get(X_SINGLE_ACTIVE_CONSUMER)))
            {
                // putIfAbsent: an explicitly supplied maximum live consumer count wins.
                modelArguments.putIfAbsent(Queue.MAXIMUM_LIVE_CONSUMERS, 1);
            }
        }

        if (!wireArgumentNames.isEmpty())
        {
            switch(unknownArgumentBehaviour)
            {
                case LOG:
                    LOGGER.warn("Unsupported queue declare argument(s) : {}", String.join(",", wireArgumentNames));
                    break;
                case IGNORE:
                    break;
                case FAIL:
                default:
                    throw new IllegalArgumentException(String.format("Unsupported queue declare argument(s) : %s",
                                                                     String.join(",", wireArgumentNames)));
            }
        }
    }
    return modelArguments;
}

/**
 * Tells whether a wire argument value represents an enabled flag: either a {@link Boolean}
 * that is {@code true} or a {@link String} that {@link Boolean#parseBoolean(String)} reads as
 * {@code true}. Any other value (including {@code null}) counts as not set.
 */
private static boolean isWireFlagSet(final Object argument)
{
    return (argument instanceof Boolean && ((Boolean) argument).booleanValue())
           || (argument instanceof String && Boolean.parseBoolean((String) argument));
}