in broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java [625:813]
// Upgrades one stored configured-object record, dispatching on the record type:
//
//  - virtual host:      passes the record through upgradeRootRecord(), drops the
//                       "queue_deadLetterQueueEnabled" attribute and the
//                       "queue.deadLetterQueueEnabled" context entry, and, when the parent
//                       broker's statistics reporting period is positive, copies that period
//                       (plus a report pattern context variable) onto the virtual host.
//  - "Binding":         remembers the binding against its parent exchange and schedules the
//                       binding record for deletion.
//  - "Exchange":        remembers the exchange, collects any inline "bindings" attribute, and
//                       registers exchanges having an "alternateExchange" attribute.
//  - "Queue":           converts the flow control size attributes into
//                       overflowPolicy / maximumQueueDepthBytes (and a resume-limit context
//                       variable), extracts inline bindings, and converts the message group
//                       attributes into "messageGroupType" / "messageGroupKeyOverride".
//  - "VirtualHostLogger": adds a "VirtualHostLogInclusionRule" child record for the
//                       "qpid.statistics.*" logger at INFO level.
//
// Records of any other type are left untouched. The source record's own attribute maps are
// never mutated; changes are always made on copies.
public void configuredObject(ConfiguredObjectRecord record)
{
    if(VIRTUALHOST.equals(record.getType()))
    {
        record = upgradeRootRecord(record);
        Map<String, Object> attributes = new HashMap<>(record.getAttributes());
        boolean modified = attributes.remove("queue_deadLetterQueueEnabled") != null;

        Object context = attributes.get(CONTEXT);
        Map<String,Object> contextMap = null;
        if(context instanceof Map)
        {
            // Work on a copy so that the record's own context map is left untouched.
            contextMap = new HashMap<>((Map<String,Object>) context);
            modified |= contextMap.remove("queue.deadLetterQueueEnabled") != null;
            if (modified)
            {
                attributes.put(CONTEXT, contextMap);
            }
        }

        int brokerStatisticsReportingPeriod = ((Broker) _virtualHostNode.getParent()).getStatisticsReportingPeriod();
        if (brokerStatisticsReportingPeriod > 0)
        {
            attributes.put("statisticsReportingPeriod", brokerStatisticsReportingPeriod);
            if (contextMap == null)
            {
                contextMap = new HashMap<>();
            }
            contextMap.put("qpid.virtualhost.statisticsReportPattern", "${ancestor:virtualhost:name}: messagesIn=${messagesIn}, bytesIn=${bytesIn:byteunit}, messagesOut=${messagesOut}, bytesOut=${bytesOut:byteunit}");
            attributes.put(CONTEXT, contextMap);
            modified = true;
        }

        if (modified)
        {
            record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), attributes, record.getParents());
            getUpdateMap().put(record.getId(), record);
        }
    }
    else if("Binding".equals(record.getType()))
    {
        BindingRecord binding = new BindingRecord(String.valueOf(record.getAttributes().get("name")),
                                                  record.getParents().get("Queue").toString(),
                                                  record.getAttributes().get("arguments"));
        final UUID exchangeId = record.getParents().get("Exchange");
        List<BindingRecord> existingBindings = _exchangeBindings.get(exchangeId);
        if(existingBindings == null)
        {
            existingBindings = new ArrayList<>();
            _exchangeBindings.put(exchangeId, existingBindings);
        }
        existingBindings.add(binding);

        // The stand-alone binding record is scheduled for removal from the store.
        getDeleteMap().put(record.getId(), record);
    }
    else if("Exchange".equals(record.getType()))
    {
        final UUID exchangeId = record.getId();
        _exchanges.put(exchangeId, record);
        if(record.getAttributes().containsKey("bindings"))
        {
            List<BindingRecord> existingBindings = _exchangeBindings.get(exchangeId);
            if(existingBindings == null)
            {
                existingBindings = new ArrayList<>();
                _exchangeBindings.put(exchangeId, existingBindings);
            }

            List<Map<String,Object>> bindingList =
                    (List<Map<String, Object>>) record.getAttributes().get("bindings");
            for(Map<String,Object> existingBinding : bindingList)
            {
                existingBindings.add(new BindingRecord((String)existingBinding.get("name"),
                                                       String.valueOf(existingBinding.get("queue")),
                                                       existingBinding.get("arguments")));
            }
        }

        if (record.getAttributes().containsKey("alternateExchange"))
        {
            // The record is registered unchanged here; only its id is remembered.
            _destinationsWithAlternateExchange.add(record.getId());
            getUpdateMap().put(record.getId(), record);
        }
    }
    else if("Queue".equals(record.getType()))
    {
        Map<String, Object> attributes = new HashMap<>(record.getAttributes());
        Object queueFlowControlSizeBytes = attributes.remove("queueFlowControlSizeBytes");
        Object queueFlowResumeSizeBytes = attributes.remove("queueFlowResumeSizeBytes");
        if (queueFlowControlSizeBytes != null)
        {
            long queueFlowControlSizeBytesValue = convertAttributeValueToLong("queueFlowControlSizeBytes",
                                                                              queueFlowControlSizeBytes);
            if (queueFlowControlSizeBytesValue > 0)
            {
                if (queueFlowResumeSizeBytes != null)
                {
                    long queueFlowResumeSizeBytesValue =
                            convertAttributeValueToLong("queueFlowResumeSizeBytes", queueFlowResumeSizeBytes);
                    double ratio = ((double) queueFlowResumeSizeBytesValue)
                                   / ((double) queueFlowControlSizeBytesValue);
                    // Format with a fixed locale: the default locale may use ',' as the decimal
                    // separator, which would store a value such as "80,00".
                    String flowResumeLimit = String.format(java.util.Locale.ROOT, "%.2f", ratio * 100.0);

                    // Copy any existing context instead of mutating the source record's map
                    // (which may be shared or unmodifiable).
                    Object context = attributes.get(CONTEXT);
                    Map<String, Object> contextMap;
                    if (context instanceof Map)
                    {
                        contextMap = new HashMap<>((Map<String, Object>) context);
                    }
                    else
                    {
                        contextMap = new HashMap<>();
                    }
                    contextMap.put("queue.queueFlowResumeLimit", flowResumeLimit);
                    attributes.put(CONTEXT, contextMap);
                }
                attributes.put("overflowPolicy", "PRODUCER_FLOW_CONTROL");
                attributes.put("maximumQueueDepthBytes", queueFlowControlSizeBytes);
            }
        }

        boolean addToUpdateMap = false;
        if (attributes.containsKey("alternateExchange"))
        {
            _destinationsWithAlternateExchange.add(record.getId());
            addToUpdateMap = true;
        }

        if(attributes.containsKey("bindings"))
        {
            // Inline bindings are remembered by queue name and removed from the queue record.
            _queueBindings.put(String.valueOf(attributes.get("name")),
                               (List<Map<String, Object>>) attributes.get("bindings"));
            attributes.remove("bindings");
        }

        if(attributes.containsKey("messageGroupKey"))
        {
            if(attributes.containsKey("messageGroupSharedGroups")
               && convertAttributeValueToBoolean("messageGroupSharedGroups",
                                                 attributes.remove("messageGroupSharedGroups")))
            {
                attributes.put("messageGroupType", "SHARED_GROUPS");
            }
            else
            {
                attributes.put("messageGroupType", "STANDARD");
            }

            // Only a key other than "JMSXGroupId" is carried over as an explicit override.
            Object oldMessageGroupKey = attributes.remove("messageGroupKey");
            if (!"JMSXGroupId".equals(oldMessageGroupKey))
            {
                attributes.put("messageGroupKeyOverride", oldMessageGroupKey);
            }
        }
        else
        {
            attributes.put("messageGroupType", "NONE");
        }

        _queues.put(record.getId(), (String) attributes.get("name"));

        if (!attributes.equals(new HashMap<>(record.getAttributes())) || addToUpdateMap)
        {
            getUpdateMap().put(record.getId(),
                               new ConfiguredObjectRecordImpl(record.getId(),
                                                              record.getType(),
                                                              attributes,
                                                              record.getParents()));
        }
    }
    else if ("VirtualHostLogger".equals(record.getType()))
    {
        // Add an inclusion rule record as a child of the logger; the logger record itself
        // is not changed.
        Map<String,Object> attributes = new HashMap<>();
        attributes.put("name", "statistics-" + record.getAttributes().get("name"));
        attributes.put("level", "INFO");
        attributes.put("loggerName", "qpid.statistics.*");
        attributes.put("type", "NameAndLevel");

        final ConfiguredObjectRecord filterRecord = new ConfiguredObjectRecordImpl(UUID.randomUUID(),
                                                                                   "VirtualHostLogInclusionRule",
                                                                                   attributes,
                                                                                   Collections.singletonMap("VirtualHostLogger",
                                                                                                            record.getId()));
        getUpdateMap().put(filterRecord.getId(), filterRecord);
    }
}