in broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCConfigurationStore.java [417:580]
/**
 * Upgrades the stored configuration to the layout that has an explicit VirtualHost record and a
 * parent/child hierarchy table (the "V7" in the name presumably refers to the source model
 * version - confirm against the caller).
 * <p>
 * All work happens on a single connection and is committed once at the end:
 * <ol>
 *   <li>a VirtualHost record is inserted, carrying the config version read from the version table
 *       when that table exists, otherwise {@code DEFAULT_CONFIG_VERSION};</li>
 *   <li>every stored object whose type does not end in "Binding" is linked to the virtual host in
 *       the hierarchy table;</li>
 *   <li>every binding is linked to its queue and its exchange, using the "queue" and "exchange"
 *       attributes, which are removed from the binding's attribute map;</li>
 *   <li>the default exchanges (amq.direct, amq.topic, amq.fanout, amq.match) that were not found
 *       among the stored exchanges are inserted;</li>
 *   <li>each binding record is rewritten with object type "Binding" and its reduced attributes;</li>
 *   <li>the config version table is dropped if it existed.</li>
 * </ol>
 *
 * @param parent the configured object whose name is used as the virtual host name
 * @throws SQLException if any database operation fails; the transaction is rolled back first
 * @throws StoreException if stored attributes cannot be parsed or serialised as JSON; the
 *                        transaction is rolled back first
 */
private void upgradeFromV7(ConfiguredObject<?> parent) throws SQLException
{
    // Must be a mutable copy: entries are removed below as matching stored exchanges are found.
    // Map.of(...) on its own is unmodifiable and its remove() always throws
    // UnsupportedOperationException.
    final Map<String, String> defaultExchanges = new HashMap<>(Map.of("amq.direct", "direct",
                                                                      "amq.topic", "topic",
                                                                      "amq.fanout", "fanout",
                                                                      "amq.match", "headers"));

    try (Connection connection = newConnection())
    {
        try
        {
            final String virtualHostName = parent.getName();
            final UUID virtualHostId = UUIDGenerator.generateVhostUUID(virtualHostName);

            String stringifiedConfigVersion = "0." + DEFAULT_CONFIG_VERSION;
            final boolean tableExists = tableExists(getConfigurationVersionTableName(), connection);
            if (tableExists)
            {
                int configVersion = getConfigVersion(connection);
                getLogger().debug("Upgrader read existing config version {}", configVersion);
                stringifiedConfigVersion = "0." + configVersion;
            }

            Map<String, Object> virtualHostAttributes = new HashMap<>();
            virtualHostAttributes.put("modelVersion", stringifiedConfigVersion);
            virtualHostAttributes.put("name", virtualHostName);
            ConfiguredObjectRecord virtualHostRecord = new ConfiguredObjectRecordImpl(virtualHostId, "VirtualHost", virtualHostAttributes);
            insertConfiguredObject(virtualHostRecord, connection);
            getLogger().debug("Upgrader created VirtualHost configuration entry with config version {}", stringifiedConfigVersion);

            final Map<UUID, Map<String, Object>> bindingsToUpdate = new HashMap<>();
            final List<UUID> others = new ArrayList<>();
            final ObjectMapper objectMapper = ConfiguredObjectJacksonModule.newObjectMapper(true);

            // Pass 1: classify the stored objects into bindings and everything else.
            try (PreparedStatement select = connection.prepareStatement("SELECT id, object_type, attributes FROM " + getConfiguredObjectsTableName());
                 ResultSet rs = select.executeQuery())
            {
                while (rs.next())
                {
                    UUID id = UUID.fromString(rs.getString(1));
                    String objectType = rs.getString(2);
                    if ("VirtualHost".equals(objectType))
                    {
                        // This is the record inserted above; it needs no parent.
                        continue;
                    }
                    Map<String, Object> attributes = objectMapper.readValue(getBlobAsString(rs, 3), Map.class);
                    if (objectType.endsWith("Binding"))
                    {
                        bindingsToUpdate.put(id, attributes);
                    }
                    else
                    {
                        if (objectType.equals("Exchange"))
                        {
                            // Already stored, so it must not be re-created as a default exchange.
                            defaultExchanges.remove((String) attributes.get("name"));
                        }
                        others.add(id);
                    }
                }
            }
            catch (IOException e)
            {
                throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
            }

            // Pass 2: populate the hierarchy table.
            try (PreparedStatement insertParent = connection.prepareStatement("INSERT INTO " + getConfiguredObjectHierarchyTableName()
                                                                              + " ( child_id, parent_type, parent_id) VALUES (?,?,?)"))
            {
                for (UUID id : others)
                {
                    insertParent.setString(1, id.toString());
                    insertParent.setString(2, "VirtualHost");
                    insertParent.setString(3, virtualHostId.toString());
                    insertParent.execute();
                }
                for (Map.Entry<UUID, Map<String, Object>> bindingEntry : bindingsToUpdate.entrySet())
                {
                    final String bindingId = bindingEntry.getKey().toString();
                    final Map<String, Object> bindingAttributes = bindingEntry.getValue();

                    // The parent references move from the attribute map into the hierarchy table,
                    // hence remove() rather than get().
                    insertParent.setString(1, bindingId);
                    insertParent.setString(2, "Queue");
                    insertParent.setString(3, bindingAttributes.remove("queue").toString());
                    insertParent.execute();

                    insertParent.setString(1, bindingId);
                    insertParent.setString(2, "Exchange");
                    insertParent.setString(3, bindingAttributes.remove("exchange").toString());
                    insertParent.execute();
                }
            }

            // Pass 3: create whichever default exchanges were not found in the store.
            for (Map.Entry<String, String> defaultExchangeEntry : defaultExchanges.entrySet())
            {
                UUID id = UUIDGenerator.generateExchangeUUID(defaultExchangeEntry.getKey(), virtualHostName);
                Map<String, Object> exchangeAttributes = new HashMap<>();
                exchangeAttributes.put("name", defaultExchangeEntry.getKey());
                exchangeAttributes.put("type", defaultExchangeEntry.getValue());
                exchangeAttributes.put("lifetimePolicy", "PERMANENT");
                Map<String, UUID> parents = Map.of("VirtualHost", virtualHostRecord.getId());
                ConfiguredObjectRecord exchangeRecord = new org.apache.qpid.server.store.ConfiguredObjectRecordImpl(id, "Exchange", exchangeAttributes, parents);
                insertConfiguredObject(exchangeRecord, connection);
            }

            // Pass 4: rewrite the binding records with the new type name and reduced attributes.
            try (PreparedStatement updateBinding = connection.prepareStatement("UPDATE " + getConfiguredObjectsTableName()
                                                                               + " set object_type =?, attributes = ? where id = ?"))
            {
                for (Map.Entry<UUID, Map<String, Object>> bindingEntry : bindingsToUpdate.entrySet())
                {
                    updateBinding.setString(1, "Binding");
                    byte[] attributesAsBytes = objectMapper.writeValueAsBytes(bindingEntry.getValue());
                    ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
                    updateBinding.setBinaryStream(2, bis, attributesAsBytes.length);
                    updateBinding.setString(3, bindingEntry.getKey().toString());
                    updateBinding.execute();
                }
            }
            catch (IOException e)
            {
                throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
            }

            if (tableExists)
            {
                dropConfigVersionTable(connection);
            }
            connection.commit();
        }
        catch (SQLException | RuntimeException e)
        {
            // Roll back on runtime failures as well (e.g. StoreException from the JSON handling),
            // so that a half-applied upgrade is never left pending on the connection.
            try
            {
                connection.rollback();
            }
            catch (SQLException re)
            {
                // Keep the original failure as the primary exception, but do not lose this one.
                e.addSuppressed(re);
            }
            throw e;
        }
    }
}