in commons-jcs3-core/src/main/java/org/apache/commons/jcs3/utils/discovery/UDPDiscoveryReceiver.java [218:307]
public void run()
{
    // Receiver loop. Waits on the selector for ready channels, reads one datagram per readable
    // key, deserializes it and, if it is a UDPDiscoveryMessage, queues it and hands it to the
    // executor. The loop ends when the shutdown flag is set or when an IOException escapes
    // from select()/receive(); that exception is logged, not rethrown.
    try
    {
        log.debug( "Waiting for message." );

        while (!shutdown.get())
        {
            int activeKeys = selector.select();
            if (activeKeys == 0)
            {
                // Woken up without any newly ready key; re-check the shutdown flag.
                continue;
            }

            for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();)
            {
                if (shutdown.get())
                {
                    break;
                }

                SelectionKey key = i.next();
                // The selector never clears its selected-key set on its own, so every key must
                // be removed once it has been picked up - including invalid and non-readable
                // ones - otherwise it lingers in the set across later select() calls.
                i.remove();

                if (!key.isValid() || !key.isReadable())
                {
                    continue;
                }

                cnt.incrementAndGet();
                log.debug( "{0} messages received.", this::getCnt );

                DatagramChannel mc = (DatagramChannel) key.channel();

                ByteBuffer byteBuffer = ByteBuffer.allocate(65536);
                InetSocketAddress sourceAddress =
                        (InetSocketAddress) mc.receive(byteBuffer);
                if (sourceAddress == null)
                {
                    // receive() returns null when the channel is non-blocking and no datagram
                    // is immediately available; there is nothing to deserialize in that case.
                    log.debug( "Key was readable but no datagram was available." );
                    continue;
                }
                byteBuffer.flip();

                try
                {
                    log.debug("Received packet from address [{0}]", sourceAddress);
                    byte[] bytes = new byte[byteBuffer.limit()];
                    byteBuffer.get(bytes);
                    // NOTE(review): these bytes come straight from the network - confirm that
                    // the configured serializer is safe to use on untrusted input.
                    Object obj = serializer.deSerialize(bytes, null);

                    if (obj instanceof UDPDiscoveryMessage)
                    {
                        // Ensure that the address we're supposed to send to is, indeed, the address
                        // of the machine on the other end of this connection. This guards against
                        // instances where we don't exactly get the right local host address
                        final UDPDiscoveryMessage msg = (UDPDiscoveryMessage) obj;
                        msg.setHost(sourceAddress.getHostString());

                        log.debug( "Read object from address [{0}], object=[{1}]",
                                sourceAddress, obj );

                        // Just to keep the functionality of the deprecated waitForMessage method
                        synchronized (msgQueue)
                        {
                            // Queue is bounded: drop the oldest entry to make room for the new one.
                            if (msgQueue.remainingCapacity() == 0)
                            {
                                msgQueue.remove();
                            }

                            msgQueue.add(msg);
                        }

                        pooledExecutor.execute(() -> handleMessage(msg));
                        log.debug( "Passed handler to executor." );
                    }
                }
                catch ( final IOException | ClassNotFoundException e )
                {
                    // A single undecodable packet must not stop the receiver; log and move on.
                    log.error( "Error receiving multicast packet", e );
                }
            }
        } // end while
    }
    catch ( final IOException e )
    {
        log.error( "Unexpected exception in UDP receiver.", e );
    }
}