in activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java [60:163]
public Transport doCompositeConnect(URI location) throws Exception {
URI brokerURI;
String host;
Map<String, String> options;
boolean create = true;
int waitForStart = -1;
CompositeData data = URISupport.parseComposite(location);
if (data.getComponents().length == 1 && "broker".equals(data.getComponents()[0].getScheme())) {
brokerURI = data.getComponents()[0];
CompositeData brokerData = URISupport.parseComposite(brokerURI);
host = brokerData.getParameters().get("brokerName");
if (host == null) {
host = "localhost";
}
if (brokerData.getPath() != null) {
host = brokerData.getPath();
}
options = data.getParameters();
location = new URI("vm://" + host);
} else {
// If using the less complex vm://localhost?broker.persistent=true
// form
try {
host = extractHost(location);
options = URISupport.parseParameters(location);
String config = options.remove("brokerConfig");
if (config != null) {
brokerURI = new URI(config);
} else {
Map<String, Object> brokerOptions = IntrospectionSupport.extractProperties(options, "broker.");
brokerURI = new URI("broker://()/" + host + "?"
+ URISupport.createQueryString(brokerOptions));
}
if ("false".equals(options.remove("create"))) {
create = false;
}
String waitForStartString = options.remove("waitForStart");
if (waitForStartString != null) {
waitForStart = Integer.parseInt(waitForStartString);
}
} catch (URISyntaxException e1) {
throw IOExceptionSupport.create(e1);
}
location = new URI("vm://" + host);
}
if (host == null) {
host = "localhost";
}
VMTransportServer server = SERVERS.get(host);
// validate the broker is still active
if (!validateBroker(host) || server == null) {
BrokerService broker = null;
// Synchronize on the registry so that multiple concurrent threads
// doing this do not think that the broker has not been created and
// cause multiple brokers to be started.
synchronized (BrokerRegistry.getInstance().getRegistryMutext()) {
broker = lookupBroker(BrokerRegistry.getInstance(), host, waitForStart);
if (broker == null) {
if (!create) {
throw new IOException("Broker named '" + host + "' does not exist.");
}
try {
if (brokerFactoryHandler != null) {
broker = brokerFactoryHandler.createBroker(brokerURI);
} else {
broker = BrokerFactory.createBroker(brokerURI);
}
broker.start();
MDC.put("activemq.broker", broker.getBrokerName());
} catch (URISyntaxException e) {
throw IOExceptionSupport.create(e);
}
BROKERS.put(host, broker);
BrokerRegistry.getInstance().getRegistryMutext().notifyAll();
}
server = SERVERS.get(host);
if (server == null) {
server = (VMTransportServer)bind(location, true);
TransportConnector connector = new TransportConnector(server);
connector.setBrokerService(broker);
connector.setUri(location);
connector.setTaskRunnerFactory(broker.getTaskRunnerFactory());
connector.start();
CONNECTORS.put(host, connector);
}
}
}
VMTransport vmtransport = server.connect();
IntrospectionSupport.setProperties(vmtransport.peer, new HashMap<String,String>(options));
IntrospectionSupport.setProperties(vmtransport, options);
Transport transport = vmtransport;
if (vmtransport.isMarshal()) {
Map<String, String> optionsCopy = new HashMap<String, String>(options);
transport = new MarshallingTransportFilter(transport, createWireFormat(options),
createWireFormat(optionsCopy));
}
if (!options.isEmpty()) {
throw new IllegalArgumentException("Invalid connect parameters: " + options);
}
return transport;
}