public Transport doCompositeConnect()

in activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java [60:163]


    /**
     * Opens a VM transport to the broker addressed by {@code location},
     * creating and starting that broker (and a VM transport server/connector
     * for it) when none is registered yet.
     * <p>
     * Two URI shapes are handled:
     * <ul>
     * <li>a composite URI whose only component has the {@code broker} scheme;
     * that component is used as the broker URI, and the host is the broker
     * URI's path, else its {@code brokerName} parameter, else
     * {@code "localhost"};</li>
     * <li>the simple form, e.g. {@code vm://localhost?broker.persistent=true}.
     * Here {@code brokerConfig} supplies the broker URI directly; otherwise the
     * options prefixed with {@code broker.} are turned into a
     * {@code broker://()/host?...} URI. {@code create=false} forbids creating a
     * missing broker, and {@code waitForStart} is parsed as an int and handed
     * to {@code lookupBroker}.</li>
     * </ul>
     * Options that remain once the transport (and, if marshalling is enabled,
     * the wire formats) have been configured are rejected.
     *
     * @param location the VM transport URI to connect to
     * @return the connected transport, wrapped in a
     *         {@code MarshallingTransportFilter} when the VM transport reports
     *         {@code isMarshal()}
     * @throws IOException if the broker does not exist and {@code create} is
     *         {@code false}, or if a broker URI is syntactically invalid
     * @throws IllegalArgumentException if unconsumed connect options remain
     * @throws Exception for any other failure raised while creating, starting
     *         or binding to the broker
     */
    public Transport doCompositeConnect(URI location) throws Exception {
        URI brokerURI;
        String host;
        Map<String, String> options;
        boolean create = true;
        int waitForStart = -1;

        CompositeData composite = URISupport.parseComposite(location);
        URI[] components = composite.getComponents();
        if (components.length == 1 && "broker".equals(components[0].getScheme())) {
            // Composite form: the single nested component is the broker URI.
            brokerURI = components[0];
            CompositeData brokerComposite = URISupport.parseComposite(brokerURI);
            String namedBroker = brokerComposite.getParameters().get("brokerName");
            String brokerPath = brokerComposite.getPath();
            // Precedence: explicit path, then brokerName parameter, then default.
            if (brokerPath != null) {
                host = brokerPath;
            } else if (namedBroker != null) {
                host = namedBroker;
            } else {
                host = "localhost";
            }
            options = composite.getParameters();
        } else {
            // Simple form, e.g. vm://localhost?broker.persistent=true
            try {
                host = extractHost(location);
                options = URISupport.parseParameters(location);
                String brokerConfig = options.remove("brokerConfig");
                if (brokerConfig == null) {
                    // No explicit broker URI: build one from the broker.* options.
                    Map<String, Object> brokerOptions = IntrospectionSupport.extractProperties(options, "broker.");
                    brokerURI = new URI("broker://()/" + host + "?"
                                        + URISupport.createQueryString(brokerOptions));
                } else {
                    brokerURI = new URI(brokerConfig);
                }
                // Only the literal "false" disables broker creation.
                create = !"false".equals(options.remove("create"));
                String waitForStartValue = options.remove("waitForStart");
                if (waitForStartValue != null) {
                    waitForStart = Integer.parseInt(waitForStartValue);
                }
            } catch (URISyntaxException syntaxError) {
                throw IOExceptionSupport.create(syntaxError);
            }
        }

        // Both URI forms are normalised to a plain vm://host location.
        location = new URI("vm://" + host);
        // NOTE(review): this fallback runs after host was already used to build
        // location above; presumably host is never null here -- confirm
        // against extractHost.
        if (host == null) {
            host = "localhost";
        }

        VMTransportServer transportServer = SERVERS.get(host);
        // validateBroker is always evaluated first; the server null check is
        // only reached when the broker validated successfully.
        if (!(validateBroker(host) && transportServer != null)) {
            // Hold the registry mutex so that concurrent callers do not each
            // conclude the broker is missing and start several brokers.
            synchronized (BrokerRegistry.getInstance().getRegistryMutext()) {
                BrokerService brokerService = lookupBroker(BrokerRegistry.getInstance(), host, waitForStart);
                if (brokerService == null) {
                    if (!create) {
                        throw new IOException("Broker named '" + host + "' does not exist.");
                    }
                    try {
                        brokerService = (brokerFactoryHandler != null)
                            ? brokerFactoryHandler.createBroker(brokerURI)
                            : BrokerFactory.createBroker(brokerURI);
                        brokerService.start();
                        MDC.put("activemq.broker", brokerService.getBrokerName());
                    } catch (URISyntaxException syntaxError) {
                        throw IOExceptionSupport.create(syntaxError);
                    }
                    BROKERS.put(host, brokerService);
                    // Wake every thread currently waiting on the registry mutex.
                    BrokerRegistry.getInstance().getRegistryMutext().notifyAll();
                }

                // Re-read under the lock before deciding to bind a new server.
                transportServer = SERVERS.get(host);
                if (transportServer == null) {
                    transportServer = (VMTransportServer)bind(location, true);
                    TransportConnector vmConnector = new TransportConnector(transportServer);
                    vmConnector.setBrokerService(brokerService);
                    vmConnector.setUri(location);
                    vmConnector.setTaskRunnerFactory(brokerService.getTaskRunnerFactory());
                    vmConnector.start();
                    CONNECTORS.put(host, vmConnector);
                }
            }
        }

        VMTransport clientSide = transportServer.connect();
        // The peer is configured from a copy of the options; the client side
        // is configured from the original map.
        IntrospectionSupport.setProperties(clientSide.peer, new HashMap<String,String>(options));
        IntrospectionSupport.setProperties(clientSide, options);

        Transport result = clientSide;
        if (clientSide.isMarshal()) {
            // Take the copy before the first createWireFormat call sees the
            // original map, so each wire format gets its own option map.
            Map<String, String> secondFormatOptions = new HashMap<String, String>(options);
            result = new MarshallingTransportFilter(result, createWireFormat(options),
                                                    createWireFormat(secondFormatOptions));
        }

        if (options.isEmpty()) {
            return result;
        }
        // Whatever is still in the map at this point is reported as invalid.
        throw new IllegalArgumentException("Invalid connect parameters: " + options);
    }