public static Map convertWireArgsToModel()

in broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java [142:284]


    /**
     * Converts the arguments supplied with a queue declaration on the wire into a map of
     * queue model attributes.
     * <p>
     * Conversion proceeds in this order:
     * <ol>
     *   <li>arguments whose name equals that of a non-derived attribute of {@code Queue}, or of one
     *       of its type specialisations, are copied unchanged;</li>
     *   <li>arguments listed in {@code ATTRIBUTE_MAPPINGS} are copied under their mapped model name;</li>
     *   <li>a number of legacy arguments (last-value queue, policy type, message grouping, no-local,
     *       flow resume capacity, alternate exchange / DLQ, single active consumer) are translated
     *       individually, possibly overriding values written by the earlier steps.</li>
     * </ol>
     * Any argument left unrecognised after these steps is handled according to
     * {@code unknownArgumentBehaviour}.
     *
     * @param queueName name of the queue being declared; used to derive the dead letter queue name
     *                  when the DLQ argument is enabled
     * @param wireArguments arguments as received on the wire; may be {@code null}, in which case an
     *                      empty map is returned and {@code model} is not consulted
     * @param model model whose type registry supplies the known queue attribute names
     * @param unknownArgumentBehaviour what to do with unrecognised arguments: {@code LOG} writes a
     *                                 warning, {@code IGNORE} does nothing, {@code FAIL} (or any
     *                                 other value) throws
     * @return a new mutable map of model attribute names to values
     * @throws ConnectionScopedRuntimeException if the flow resume capacity exceeds the capacity
     * @throws NumberFormatException if the flow resume capacity or the capacity is not an integral number
     * @throws IllegalArgumentException if unrecognised arguments remain and the behaviour is to fail;
     *                                  presumably also when the policy type does not name an
     *                                  {@code OverflowPolicy} constant (assuming it is an enum)
     */
    public static Map<String,Object> convertWireArgsToModel(final String queueName,
                                                            final Map<String, Object> wireArguments,
                                                            final Model model,
                                                            final Queue.BehaviourOnUnknownDeclareArgument unknownArgumentBehaviour)
    {
        Map<String,Object> modelArguments = new HashMap<>();
        if(wireArguments != null)
        {
            // Collect the attributes of Queue itself plus those of every Queue specialisation.
            final ConfiguredObjectTypeRegistry typeRegistry = model.getTypeRegistry();
            final List<ConfiguredObjectAttribute<?, ?>> attributeTypes =
                    new ArrayList<>(typeRegistry.getAttributeTypes(Queue.class).values());
            typeRegistry.getTypeSpecialisations(Queue.class)
                        .forEach(type -> attributeTypes.addAll(typeRegistry.getTypeSpecificAttributes(type)));

            // Names are removed from this set as they are recognised; whatever remains is unknown.
            final Set<String> wireArgumentNames = new HashSet<>(wireArguments.keySet());
            wireArguments.entrySet()
                         .stream()
                         .filter(entry -> attributeTypes.stream()
                                                        .anyMatch(type -> Objects.equals(entry.getKey(), type.getName())
                                                                          && !type.isDerived()))
                         .forEach(entry -> {
                             modelArguments.put(entry.getKey(), entry.getValue());
                             wireArgumentNames.remove(entry.getKey());
                         });

            for(Map.Entry<String,String> entry : ATTRIBUTE_MAPPINGS.entrySet())
            {
                if(wireArguments.containsKey(entry.getKey()))
                {
                    modelArguments.put(entry.getValue(), wireArguments.get(entry.getKey()));
                    wireArgumentNames.remove(entry.getKey());
                }
            }
            if(wireArguments.containsKey(QPID_LAST_VALUE_QUEUE))
            {
                wireArgumentNames.remove(QPID_LAST_VALUE_QUEUE);
                // Fall back to the default LVQ key only when no explicit key argument was supplied.
                if (!wireArguments.containsKey(QPID_LAST_VALUE_QUEUE_KEY))
                {
                    modelArguments.put(LastValueQueue.LVQ_KEY, LastValueQueue.DEFAULT_LVQ_KEY);
                }
            }
            if(wireArguments.containsKey(QPID_POLICY_TYPE))
            {
                // Idempotent: the argument is converted here, so it must never be reported as unknown.
                wireArgumentNames.remove(QPID_POLICY_TYPE);
                // Locale.ROOT keeps the case conversion stable (e.g. "ring" under a Turkish locale).
                final String policyName =
                        String.valueOf(wireArguments.get(QPID_POLICY_TYPE)).toUpperCase(java.util.Locale.ROOT);
                modelArguments.put(Queue.OVERFLOW_POLICY, OverflowPolicy.valueOf(policyName));
            }

            if(wireArguments.containsKey(QPID_SHARED_MSG_GROUP))
            {
                wireArgumentNames.remove(QPID_SHARED_MSG_GROUP);
                if (SHARED_MSG_GROUP_ARG_VALUE.equals(String.valueOf(wireArguments.get(QPID_SHARED_MSG_GROUP))))
                {
                    modelArguments.put(Queue.MESSAGE_GROUP_TYPE, MessageGroupType.SHARED_GROUPS);
                }
            }
            else if(wireArguments.containsKey(QPID_GROUP_HEADER_KEY))
            {
                // Idempotent: the argument is converted here, so it must never be reported as unknown.
                wireArgumentNames.remove(QPID_GROUP_HEADER_KEY);
                modelArguments.put(Queue.MESSAGE_GROUP_TYPE, MessageGroupType.STANDARD);
                // "JMSXGroupId" needs no override entry; drop any that an earlier step wrote
                // (presumably via ATTRIBUTE_MAPPINGS - confirm against its definition).
                if ("JMSXGroupId".equals(wireArguments.get(QPID_GROUP_HEADER_KEY)))
                {
                    modelArguments.remove(Queue.MESSAGE_GROUP_KEY_OVERRIDE);
                }
            }


            if(wireArguments.get(QPID_NO_LOCAL) != null)
            {
                // Idempotent: the argument is converted here, so it must never be reported as unknown.
                wireArgumentNames.remove(QPID_NO_LOCAL);
                modelArguments.put(Queue.NO_LOCAL, Boolean.parseBoolean(wireArguments.get(QPID_NO_LOCAL).toString()));
            }

            if (wireArguments.containsKey(X_QPID_FLOW_RESUME_CAPACITY))
            {
                wireArgumentNames.remove(X_QPID_FLOW_RESUME_CAPACITY);
                final Object resumeArgument = wireArguments.get(X_QPID_FLOW_RESUME_CAPACITY);
                final Object capacityArgument = wireArguments.get(X_QPID_CAPACITY);
                if (resumeArgument != null && capacityArgument != null)
                {
                    // Parsed as long so that capacities above Integer.MAX_VALUE are accepted.
                    final double resumeCapacity = Long.parseLong(resumeArgument.toString());
                    final double maximumCapacity = Long.parseLong(capacityArgument.toString());
                    if (resumeCapacity > maximumCapacity)
                    {
                        throw new ConnectionScopedRuntimeException(
                                "Flow resume size can't be greater than flow control size");
                    }
                    // Work on a copy: the existing context may be the caller's own (possibly
                    // immutable) map taken straight from the wire arguments.
                    @SuppressWarnings("unchecked")
                    final Map<String, String> existingContext =
                            (Map<String, String>) modelArguments.get(Queue.CONTEXT);
                    final Map<String, String> context =
                            existingContext == null ? new HashMap<>() : new HashMap<>(existingContext);
                    modelArguments.put(Queue.CONTEXT, context);

                    // NOTE(review): a capacity of 0 yields NaN here; confirm whether that input can occur.
                    final double ratio = resumeCapacity / maximumCapacity;
                    // Locale.ROOT guarantees a '.' decimal separator regardless of the JVM default locale.
                    context.put(Queue.QUEUE_FLOW_RESUME_LIMIT,
                                String.format(java.util.Locale.ROOT, "%.2f", ratio * 100.0));
                    modelArguments.put(Queue.OVERFLOW_POLICY, OverflowPolicy.PRODUCER_FLOW_CONTROL);
                }
            }

            if (wireArguments.containsKey(ALTERNATE_EXCHANGE))
            {
                wireArgumentNames.remove(ALTERNATE_EXCHANGE);
                modelArguments.put(Queue.ALTERNATE_BINDING,
                                   Collections.singletonMap(AlternateBinding.DESTINATION,
                                                            wireArguments.get(ALTERNATE_EXCHANGE)));
            }
            // The DLQ flag is only consulted when no alternate exchange was supplied.
            else if (wireArguments.containsKey(X_QPID_DLQ_ENABLED))
            {
                wireArgumentNames.remove(X_QPID_DLQ_ENABLED);
                if (isWireArgumentTrue(wireArguments.get(X_QPID_DLQ_ENABLED)))
                {
                    modelArguments.put(Queue.ALTERNATE_BINDING,
                                       Collections.singletonMap(AlternateBinding.DESTINATION,
                                                                getDeadLetterQueueName(queueName)));
                }
            }

            if(wireArguments.containsKey(X_SINGLE_ACTIVE_CONSUMER))
            {
                wireArgumentNames.remove(X_SINGLE_ACTIVE_CONSUMER);
                if (isWireArgumentTrue(wireArguments.get(X_SINGLE_ACTIVE_CONSUMER)))
                {
                    // putIfAbsent: an explicitly supplied maximum live consumers value wins.
                    modelArguments.putIfAbsent(Queue.MAXIMUM_LIVE_CONSUMERS, 1);
                }
            }

            if (!wireArgumentNames.isEmpty())
            {

                switch(unknownArgumentBehaviour)
                {
                    case LOG:
                        LOGGER.warn("Unsupported queue declare argument(s) : {}", String.join(",", wireArgumentNames));
                        break;
                    case IGNORE:
                        break;
                    case FAIL:
                    default:
                        throw new IllegalArgumentException(String.format("Unsupported queue declare argument(s) : %s",
                                                                         String.join(",", wireArgumentNames)));
                }
            }
        }
        return modelArguments;
    }

    /**
     * Tells whether a wire argument value represents boolean true: either {@link Boolean#TRUE}
     * or a {@link String} that {@link Boolean#parseBoolean(String)} accepts as true. Any other
     * value, including {@code null}, is treated as false.
     */
    private static boolean isWireArgumentTrue(final Object argument)
    {
        return (argument instanceof Boolean && ((Boolean) argument).booleanValue())
               || (argument instanceof String && Boolean.parseBoolean((String) argument));
    }