in server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java [185:517]
public static Set<URI> discoverURIs(final String forGroup, final Set<String> schemes, final String host, final int port, long timeout) throws Exception {
if (timeout < 50) {
timeout = 50;
}
if (null == forGroup || forGroup.isEmpty()) {
throw new Exception("Specify a valid group or *");
}
if (null == schemes || schemes.isEmpty()) {
throw new Exception("Specify at least one scheme, 'ejbd' for example");
}
if (null == host || host.isEmpty()) {
throw new Exception("Specify a valid host name");
}
if (port < 1 || port > 65535) {
throw new Exception("Specify a valid port between 1 and 65535");
}
final InetAddress ia = getAddress(host);
final byte[] bytes = (MulticastPulseClient.CLIENT + forGroup).getBytes(UTF8);
final DatagramPacket request = new DatagramPacket(bytes, bytes.length, new InetSocketAddress(ia, port));
final AtomicBoolean running = new AtomicBoolean(true);
final List<Future> futures = Collections.synchronizedList(new ArrayList<Future>());
MulticastSocket[] clientSockets = null;
try {
clientSockets = MulticastPulseClient.getSockets(ia, port);
final MulticastSocket[] clientSocketsFinal = clientSockets;
final Timer timer = new Timer(true);
final Set<URI> set = new TreeSet<URI>(new Comparator<URI>() {
@Override
public int compare(final URI uri1, final URI uri2) {
//Ignore server hostname
URI u1 = URI.create(uri1.getSchemeSpecificPart());
URI u2 = URI.create(uri2.getSchemeSpecificPart());
//Ignore scheme (ejb,ejbs,etc.)
u1 = URI.create(u1.getSchemeSpecificPart());
u2 = URI.create(u2.getSchemeSpecificPart());
//Compare URI hosts
int i = compare(u1.getHost(), u2.getHost());
if (i != 0) {
i = uri1.compareTo(uri2);
}
return i;
}
private int compare(final String h1, final String h2) {
//Sort by hostname, IPv4, IPv6
try {
InetAddress address1 = null;
InetAddress address2 = null;
try {
address1 = InetAddress.getByName(h1);
address2 = InetAddress.getByName(h2);
} catch(final UnknownHostException e) {
// no-op
}
if (isIPv4LiteralAddress(address1)) {
if (isIPv6LiteralAddress(address2)) {
return -1;
}
} else if (isIPv6LiteralAddress(address1)) {
if (isIPv4LiteralAddress(address2)) {
return 1;
}
} else if (0 != h1.compareTo(h2)) {
return -1;
}
} catch (Exception e) {
//Ignore
}
return h1.compareTo(h2);
}
private boolean isIPv4LiteralAddress(final InetAddress val) {
return Inet4Address.class.isInstance(val);
}
private boolean isIPv6LiteralAddress(final InetAddress val) {
return Inet6Address.class.isInstance(val);
}
});
final ReentrantLock setLock = new ReentrantLock();
//Start threads that listen for multicast packets on our channel.
//These need to start 'before' we pulse a request.
final CountDownLatch latchListeners = new CountDownLatch(clientSocketsFinal.length);
for (final MulticastSocket socket : clientSocketsFinal) {
futures.add(getExecutorService().submit(new Runnable() {
@Override
public void run() {
try {
final DatagramPacket response = new DatagramPacket(new byte[2048], 2048);
latchListeners.countDown();
while (running.get()) {
try {
socket.receive(response);
final SocketAddress sa = response.getSocketAddress();
if (null != sa && (sa instanceof InetSocketAddress)) {
int len = response.getLength();
if (len > 2048) {
if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "Truncating multipulse length {0} to 2048", new Object[]{len});
}
len = 2048;
}
String s = new String(response.getData(), 0, len);
if (s.startsWith(MulticastPulseClient.SERVER)) {
s = (s.replace(MulticastPulseClient.SERVER, ""));
final String group = s.substring(0, s.indexOf(':'));
s = s.substring(group.length() + 1);
if (!"*".equals(forGroup) && !forGroup.equals(group)) {
continue;
}
final String services = s.substring(0, s.lastIndexOf('|'));
s = s.substring(services.length() + 1);
final String[] serviceList = services.split("\\|");
final String[] hosts = s.split(",");
for (final String svc : serviceList) {
if (EMPTY.equals(svc)) {
continue;
}
final URI serviceUri;
try {
serviceUri = URI.create(svc);
} catch (Exception e) {
continue;
}
if (schemes.contains(serviceUri.getScheme())) {
//Just because multicast was received on this host is does not mean the service is on the same
//We can however use this to identify an individual machine and group
final String serverHost = ((InetSocketAddress) response.getSocketAddress()).getAddress().getHostAddress();
final String serviceHost = serviceUri.getHost();
if (MulticastPulseClient.isLocalAddress(serviceHost, false)) {
if (!MulticastPulseClient.isLocalAddress(serverHost, false)) {
//A local service is only available to a local client
continue;
}
}
final String svcfull = ("mp-" + serverHost + ":" + group + ":" + svc);
setLock.lock();
try {
if (svcfull.contains("0.0.0.0")) {
for (final String h : hosts) {
if (!h.replace("[", "").startsWith("2001:0:")) { //Filter Teredo
set.add(URI.create(svcfull.replace("0.0.0.0", ipFormat(h))));
}
}
} else if (svcfull.contains("[::]")) {
for (final String h : hosts) {
if (!h.replace("[", "").startsWith("2001:0:")) { //Filter Teredo
set.add(URI.create(svcfull.replace("[::]", ipFormat(h))));
}
}
} else {
//Just add as is
set.add(URI.create(svcfull));
}
} catch (Exception e) {
//Ignore
} finally {
setLock.unlock();
}
}
}
}
}
} catch (Exception e) {
//Ignore
}
}
} finally {
try {
socket.leaveGroup(ia);
} catch (Exception e) {
//Ignore
}
try {
socket.close();
} catch (Exception e) {
//Ignore
}
}
}
}));
}
try {
//Give listener threads a reasonable amount of time to start
if (latchListeners.await(clientSocketsFinal.length * 2, TimeUnit.SECONDS)) {
//Start pulsing client request every 10ms - This will ensure we have at least 4 client pulses within our minimum timeout
//This pulse is designed to tell a listening server to wake up and pulse back a response
futures.add(0, getExecutorService().submit(new Runnable() {
@Override
public void run() {
while (running.get()) {
//Pulse to listening servers - It is thread safe to use same sockets as send/receive synchronization is only on the packet
for (final MulticastSocket socket : clientSocketsFinal) {
if (running.get()) {
try {
socket.send(request);
} catch (Exception e) {
//Ignore
}
} else {
break;
}
}
if (running.get()) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
break;
}
}
}
}
}));
} else {
timeout = 1;
}
} catch (InterruptedException e) {
//Terminate as quickly as possible
timeout = 1;
}
//Kill the threads after timeout
timer.schedule(new TimerTask() {
@Override
public void run() {
running.set(false);
try {
for (final Future future : futures) {
future.cancel(true);
}
} catch (ConcurrentModificationException e) {
//Ignore
}
}
}, timeout);
//Wait for threads to complete
for (final Future future : futures) {
try {
future.get();
} catch (Exception e) {
//Ignore
}
}
setLock.lock();
try {
return new TreeSet<URI>(set);
} finally {
setLock.unlock();
}
} finally {
//Just to be sure we are clean
for (final Future future : futures) {
try {
future.cancel(true);
} catch (Exception e) {
//Ignore
}
}
futures.clear();
for (final MulticastSocket socket : clientSockets) {
try {
socket.leaveGroup(ia);
} catch (Exception e) {
//Ignore
}
try {
socket.close();
} catch (Exception e) {
//Ignore
}
}
}
}