in yoko-core/src/main/java/org/apache/yoko/orb/OB/OrbAsyncHandler.java [88:256]
/**
 * Worker loop for asynchronous messages.
 *
 * Each pass takes the first entry of handler_.uncompletedMsgList_ (waiting
 * on handler_.sendMonitor_ when the list is empty), issues the request if
 * the downcall is still unsent, applies the request/reply start and end
 * time policies, and finally either invokes a reply handler or appends the
 * message to handler_.completedMsgList_. The loop ends once shutdown_ is
 * observed to be set.
 */
public void run() {
    while (true) {
        AsyncMessage current = null;

        synchronized (handler_.sendMonitor_) {
            // A shutdown request takes precedence over any queued work.
            synchronized (this) {
                if (shutdown_) {
                    break;
                }
            }

            // Block only while the queue is empty.
            if (handler_.uncompletedMsgList_.size() == 0) {
                try {
                    handler_.sendMonitor_.wait();
                } catch (InterruptedException ignored) {
                    // Not treated as an error: the queue is simply
                    // re-examined below.
                }
            }

            // The wait may have ended without anything being queued
            // (interruption or shutdown), so look at the queue again.
            if (handler_.uncompletedMsgList_.size() != 0) {
                current = (AsyncMessage) handler_.uncompletedMsgList_
                        .removeFirst();
            }
        }

        // Nothing was dequeued: go back to the shutdown check.
        if (current == null) {
            continue;
        }

        // A message is either new (unsent) or one that was put back
        // earlier because of a start-time policy.
        if (current.downcall.unsent()) {
            // RequestStartTime still ahead: put the message back.
            // NOTE(review): the queue is non-empty after this, so the next
            // pass does not wait; looks like this spins until the start
            // time arrives when no other message is queued - confirm.
            if (isStillAhead(current.downcall.policies().requestStartTime)) {
                requeueUncompleted(current);
                continue;
            }

            // RequestEndTime already passed: the message is dropped.
            // NOTE(review): nothing is reported to the reply handler or
            // poller on this path - confirm that is intended.
            if (hasGoneBy(current.downcall.policies().requestEndTime)) {
                continue;
            }

            try {
                current.downcall.request();
            } catch (org.apache.yoko.orb.OB.LocationForward forward) {
                //
                // TODO: A REBIND can also be thrown if the policy
                // has a value of NO_REBIND and returned IORs
                // policy requirements are incompatible with
                // effective policies currently in use.
                //
                if (current.downcall.policies().rebindMode == org.omg.Messaging.NO_RECONNECT.value) {
                    current.downcall
                            .setSystemException(new org.omg.CORBA.REBIND());
                }
            } catch (org.apache.yoko.orb.OB.FailureException failure) {
                // The message is abandoned when the request fails.
                continue;
            }
        }

        // ReplyStartTime still ahead: put the message back for a later pass.
        if (isStillAhead(current.downcall.policies().replyStartTime)) {
            requeueUncompleted(current);
            continue;
        }

        // ReplyEndTime already passed: the reply is not delivered.
        if (hasGoneBy(current.downcall.policies().replyEndTime)) {
            continue;
        }

        if (current.reply != null) {
            // A reply handler is attached directly to the message.
            org.apache.yoko.orb.OBMessaging.ReplyHandler_impl direct = (org.apache.yoko.orb.OBMessaging.ReplyHandler_impl) current.reply;
            direct._OB_invoke(current.downcall);
        } else {
            // Without a reply handler the message must carry a poller.
            Assert._OB_assert(current.poller != null);

            org.omg.Messaging.ReplyHandler viaPoller = current.poller
                    .associated_handler();
            if (viaPoller != null) {
                // The poller has its own handler: deliver through it.
                org.apache.yoko.orb.OBMessaging.ReplyHandler_impl associated = (org.apache.yoko.orb.OBMessaging.ReplyHandler_impl) viaPoller;
                associated._OB_invoke(current.downcall);
            } else {
                // No handler anywhere: file the message as completed and
                // wake every thread waiting on the receive monitor.
                synchronized (handler_.recvMonitor_) {
                    handler_.completedMsgList_.addLast(current);
                    handler_.recvMonitor_.notifyAll();
                }
            }
        }
    }
}

// True when the given time differs from TimeHelper.utcMin() and is greater
// than TimeHelper.utcNow(0).
private boolean isStillAhead(org.omg.TimeBase.UtcT when) {
    return TimeHelper.notEqual(when, TimeHelper.utcMin())
            && TimeHelper.greaterThan(when, TimeHelper.utcNow(0));
}

// True when the given time differs from TimeHelper.utcMin() and is less
// than TimeHelper.utcNow(0).
private boolean hasGoneBy(org.omg.TimeBase.UtcT when) {
    return TimeHelper.notEqual(when, TimeHelper.utcMin())
            && TimeHelper.lessThan(when, TimeHelper.utcNow(0));
}

// Appends the message to the end of the uncompleted list and notifies all
// threads waiting on the send monitor.
private void requeueUncompleted(AsyncMessage delayed) {
    synchronized (handler_.sendMonitor_) {
        handler_.uncompletedMsgList_.addLast(delayed);
        handler_.sendMonitor_.notifyAll();
    }
}