public void start()

in core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java [241:292]


    /**
     * Starts this JMS flow. The body runs only when the {@code started} flag is
     * flipped from {@code false} to {@code true}; calling this method on an already
     * started flow does nothing.
     * <p>
     * On the first call the method:
     * <ol>
     *   <li>invokes {@code super.start()};</li>
     *   <li>creates a non-transacted, auto-acknowledge session and subscribes a
     *       consumer to the broadcast topic, dispatching endpoint events that
     *       originate from other containers;</li>
     *   <li>replays a {@code COMPONENT_STARTED} event for every component that is
     *       already started and an {@code INTERNAL_ENDPOINT_REGISTERED} event for
     *       every local internal endpoint in the registry;</li>
     *   <li>calls {@code startConsumerMonitor()}.</li>
     * </ol>
     *
     * @throws JBIException if a {@link JMSException} is raised while setting up the
     *         JMS resources; the original exception is attached as the cause
     */
    public void start() throws JBIException {
        if (started.compareAndSet(false, true)) {
            LOGGER.debug(broker.getContainer().getName() + ": Starting jms flow");
            super.start();
            try {
                Session broadcastSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                Topic broadcastTopic = broadcastSession.createTopic(broadcastDestinationName);
                // No message selector; noLocal = true so messages published through
                // this same connection are not delivered back to this consumer.
                broadcastConsumer = broadcastSession.createConsumer(broadcastTopic, null, true);
                broadcastConsumer.setMessageListener(new MessageListener() {
                    public void onMessage(Message message) {
                        try {
                            Object obj = ((ObjectMessage) message).getObject();
                            if (obj instanceof EndpointEvent) {
                                EndpointEvent event = (EndpointEvent) obj;
                                String container = ((InternalEndpoint) event.getEndpoint()).getComponentNameSpace()
                                        .getContainerName();
                                // Only react to events whose endpoint belongs to a different container.
                                if (!getBroker().getContainer().getName().equals(container)) {
                                    if (event.getEventType() == EndpointEvent.INTERNAL_ENDPOINT_REGISTERED) {
                                        onRemoteEndpointRegistered(event);
                                    } else if (event.getEventType() == EndpointEvent.INTERNAL_ENDPOINT_UNREGISTERED) {
                                        onRemoteEndpointUnregistered(event);
                                    }
                                }
                            }
                        } catch (Exception e) {
                            // Log and continue: nothing is rethrown from the listener, so a
                            // failing message does not propagate to the JMS provider.
                            LOGGER.error("Error processing incoming broadcast message", e);
                        }
                    }
                });

                // Start queue consumers for all components
                for (ComponentMBeanImpl cmp : broker.getContainer().getRegistry().getComponents()) {
                    if (cmp.isStarted()) {
                        onComponentStarted(new ComponentEvent(cmp, ComponentEvent.COMPONENT_STARTED));
                    }
                }
                // Start queue consumers for all endpoints
                ServiceEndpoint[] endpoints = broker.getContainer().getRegistry().getEndpointsForInterface(null);
                for (ServiceEndpoint endpoint : endpoints) {
                    if (endpoint instanceof InternalEndpoint && ((InternalEndpoint) endpoint).isLocal()) {
                        // NOTE(review): the trailing 'false' presumably suppresses re-broadcasting
                        // of the registration - confirm against onInternalEndpointRegistered.
                        onInternalEndpointRegistered(new EndpointEvent(endpoint,
                                EndpointEvent.INTERNAL_ENDPOINT_REGISTERED), false);
                    }
                }

                startConsumerMonitor();
            } catch (JMSException e) {
                // Keep the original JMSException as the cause so its stack trace and
                // any linked provider exception remain available to callers.
                throw new JBIException("JMSException caught in start: " + e.getMessage(), e);
            }
        }
    }