public static Set discoverURIs()

in server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java [185:517]


    public static Set<URI> discoverURIs(final String forGroup, final Set<String> schemes, final String host, final int port, long timeout) throws Exception {

        if (timeout < 50) {
            timeout = 50;
        }

        if (null == forGroup || forGroup.isEmpty()) {
            throw new Exception("Specify a valid group or *");
        }

        if (null == schemes || schemes.isEmpty()) {
            throw new Exception("Specify at least one scheme, 'ejbd' for example");
        }

        if (null == host || host.isEmpty()) {
            throw new Exception("Specify a valid host name");
        }

        if (port < 1 || port > 65535) {
            throw new Exception("Specify a valid port between 1 and 65535");
        }

        final InetAddress ia = getAddress(host);

        final byte[] bytes = (MulticastPulseClient.CLIENT + forGroup).getBytes(UTF8);
        final DatagramPacket request = new DatagramPacket(bytes, bytes.length, new InetSocketAddress(ia, port));

        final AtomicBoolean running = new AtomicBoolean(true);
        final List<Future> futures = Collections.synchronizedList(new ArrayList<Future>());

        MulticastSocket[] clientSockets = null;

        try {
            clientSockets = MulticastPulseClient.getSockets(ia, port);
            final MulticastSocket[] clientSocketsFinal = clientSockets;

            final Timer timer = new Timer(true);

            final Set<URI> set = new TreeSet<URI>(new Comparator<URI>() {
                @Override
                public int compare(final URI uri1, final URI uri2) {

                    //Ignore server hostname
                    URI u1 = URI.create(uri1.getSchemeSpecificPart());
                    URI u2 = URI.create(uri2.getSchemeSpecificPart());

                    //Ignore scheme (ejb,ejbs,etc.)
                    u1 = URI.create(u1.getSchemeSpecificPart());
                    u2 = URI.create(u2.getSchemeSpecificPart());

                    //Compare URI hosts
                    int i = compare(u1.getHost(), u2.getHost());
                    if (i != 0) {
                        i = uri1.compareTo(uri2);
                    }

                    return i;
                }

                private int compare(final String h1, final String h2) {

                    //Sort by hostname, IPv4, IPv6

                    try {
                        InetAddress address1 = null;
                        InetAddress address2 = null;
                        try {
                            address1 = InetAddress.getByName(h1);
                            address2 = InetAddress.getByName(h2);
                        } catch(final UnknownHostException e) {
                            // no-op
                        }

                        if (isIPv4LiteralAddress(address1)) {
                            if (isIPv6LiteralAddress(address2)) {
                                return -1;
                            }
                        } else if (isIPv6LiteralAddress(address1)) {
                            if (isIPv4LiteralAddress(address2)) {
                                return 1;
                            }
                        } else if (0 != h1.compareTo(h2)) {
                            return -1;
                        }
                    } catch (Exception e) {
                        //Ignore
                    }

                    return h1.compareTo(h2);
                }

                private boolean isIPv4LiteralAddress(final InetAddress val) {
                    return Inet4Address.class.isInstance(val);
                }

                private boolean isIPv6LiteralAddress(final InetAddress val) {
                    return Inet6Address.class.isInstance(val);
                }
            });

            final ReentrantLock setLock = new ReentrantLock();

            //Start threads that listen for multicast packets on our channel.
            //These need to start 'before' we pulse a request.
            final CountDownLatch latchListeners = new CountDownLatch(clientSocketsFinal.length);

            for (final MulticastSocket socket : clientSocketsFinal) {

                futures.add(getExecutorService().submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            final DatagramPacket response = new DatagramPacket(new byte[2048], 2048);
                            latchListeners.countDown();

                            while (running.get()) {
                                try {

                                    socket.receive(response);

                                    final SocketAddress sa = response.getSocketAddress();

                                    if (null != sa && (sa instanceof InetSocketAddress)) {

                                        int len = response.getLength();
                                        if (len > 2048) {

                                            if (log.isLoggable(Level.FINE)) {
                                                log.log(Level.FINE, "Truncating multipulse length {0} to 2048", new Object[]{len});
                                            }
                                            len = 2048;
                                        }

                                        String s = new String(response.getData(), 0, len);

                                        if (s.startsWith(MulticastPulseClient.SERVER)) {

                                            s = (s.replace(MulticastPulseClient.SERVER, ""));
                                            final String group = s.substring(0, s.indexOf(':'));
                                            s = s.substring(group.length() + 1);

                                            if (!"*".equals(forGroup) && !forGroup.equals(group)) {
                                                continue;
                                            }

                                            final String services = s.substring(0, s.lastIndexOf('|'));
                                            s = s.substring(services.length() + 1);

                                            final String[] serviceList = services.split("\\|");
                                            final String[] hosts = s.split(",");

                                            for (final String svc : serviceList) {

                                                if (EMPTY.equals(svc)) {
                                                    continue;
                                                }

                                                final URI serviceUri;
                                                try {
                                                    serviceUri = URI.create(svc);
                                                } catch (Exception e) {
                                                    continue;
                                                }

                                                if (schemes.contains(serviceUri.getScheme())) {

                                                    //Just because multicast was received on this host is does not mean the service is on the same
                                                    //We can however use this to identify an individual machine and group
                                                    final String serverHost = ((InetSocketAddress) response.getSocketAddress()).getAddress().getHostAddress();

                                                    final String serviceHost = serviceUri.getHost();
                                                    if (MulticastPulseClient.isLocalAddress(serviceHost, false)) {
                                                        if (!MulticastPulseClient.isLocalAddress(serverHost, false)) {
                                                            //A local service is only available to a local client
                                                            continue;
                                                        }
                                                    }

                                                    final String svcfull = ("mp-" + serverHost + ":" + group + ":" + svc);

                                                    setLock.lock();

                                                    try {
                                                        if (svcfull.contains("0.0.0.0")) {
                                                            for (final String h : hosts) {
                                                                if (!h.replace("[", "").startsWith("2001:0:")) { //Filter Teredo
                                                                    set.add(URI.create(svcfull.replace("0.0.0.0", ipFormat(h))));
                                                                }
                                                            }
                                                        } else if (svcfull.contains("[::]")) {
                                                            for (final String h : hosts) {
                                                                if (!h.replace("[", "").startsWith("2001:0:")) { //Filter Teredo
                                                                    set.add(URI.create(svcfull.replace("[::]", ipFormat(h))));
                                                                }
                                                            }
                                                        } else {
                                                            //Just add as is
                                                            set.add(URI.create(svcfull));
                                                        }
                                                    } catch (Exception e) {
                                                        //Ignore
                                                    } finally {
                                                        setLock.unlock();
                                                    }
                                                }
                                            }
                                        }
                                    }

                                } catch (Exception e) {
                                    //Ignore
                                }
                            }
                        } finally {
                            try {
                                socket.leaveGroup(ia);
                            } catch (Exception e) {
                                //Ignore
                            }
                            try {
                                socket.close();
                            } catch (Exception e) {
                                //Ignore
                            }
                        }
                    }
                }));
            }

            try {
                //Give listener threads a reasonable amount of time to start
                if (latchListeners.await(clientSocketsFinal.length * 2, TimeUnit.SECONDS)) {

                    //Start pulsing client request every 10ms - This will ensure we have at least 4 client pulses within our minimum timeout
                    //This pulse is designed to tell a listening server to wake up and pulse back a response
                    futures.add(0, getExecutorService().submit(new Runnable() {
                        @Override
                        public void run() {
                            while (running.get()) {
                                //Pulse to listening servers - It is thread safe to use same sockets as send/receive synchronization is only on the packet
                                for (final MulticastSocket socket : clientSocketsFinal) {

                                    if (running.get()) {
                                        try {
                                            socket.send(request);
                                        } catch (Exception e) {
                                            //Ignore
                                        }
                                    } else {
                                        break;
                                    }
                                }

                                if (running.get()) {
                                    try {
                                        Thread.sleep(10);
                                    } catch (InterruptedException e) {
                                        break;
                                    }
                                }
                            }
                        }
                    }));
                } else {
                    timeout = 1;
                }

            } catch (InterruptedException e) {
                //Terminate as quickly as possible
                timeout = 1;
            }

            //Kill the threads after timeout
            timer.schedule(new TimerTask() {
                @Override
                public void run() {

                    running.set(false);

                    try {
                        for (final Future future : futures) {
                            future.cancel(true);
                        }
                    } catch (ConcurrentModificationException e) {
                        //Ignore
                    }

                }
            }, timeout);

            //Wait for threads to complete
            for (final Future future : futures) {
                try {
                    future.get();
                } catch (Exception e) {
                    //Ignore
                }
            }

            setLock.lock();
            try {
                return new TreeSet<URI>(set);
            } finally {
                setLock.unlock();
            }
        } finally {

            //Just to be sure we are clean
            for (final Future future : futures) {
                try {
                    future.cancel(true);
                } catch (Exception e) {
                    //Ignore
                }
            }

            futures.clear();

            for (final MulticastSocket socket : clientSockets) {

                try {
                    socket.leaveGroup(ia);
                } catch (Exception e) {
                    //Ignore
                }
                try {
                    socket.close();
                } catch (Exception e) {
                    //Ignore
                }
            }
        }
    }