in yoko-core/src/main/java/org/apache/yoko/orb/OB/GIOPConnectionThreaded.java [515:709]
public boolean send(Downcall down, boolean block) {
Assert._OB_assert(transport_.mode() != org.apache.yoko.orb.OCI.SendReceiveMode.ReceiveOnly);
Assert._OB_assert(down.unsent() == true);
logger.fine("Sending a request with Downcall of type " + down.getClass().getName() + " for operation " + down.operation() + " on transport " + transport_);
//
// if we send off a message in the loop, this var might help us
// to prevent a further locking to check the status
//
boolean msgSentMarked = false;
//
// if we don't have writing turned on then we must throw a
// TRANSIENT to the caller indicating this
//
synchronized (this) {
if ((enabledOps_ & AccessOp.Write) == 0) {
logger.fine("writing not enabled for this connection");
down.setFailureException(new org.omg.CORBA.TRANSIENT());
return true;
}
//
// make the downcall thread-safe
//
if (down.responseExpected()) {
down.allowWaiting();
}
//
// buffer the request
//
messageQueue_.add(orbInstance_, down);
//
// check the sent status while we're locked
//
if ((properties_ & Property.RequestSent) != 0) {
msgSentMarked = true;
}
}
//
// now prepare to send it either blocking or non-blocking
// depending on the call mode param
//
if (block) {
//
// Get the request timeout
//
int t = down.policies().requestTimeout;
int msgcount = 0;
//
// now we can start sending off the messages
//
while (true) {
//
// Get a message to send from the unsent queue
//
org.apache.yoko.orb.OCI.Buffer buf;
Downcall nextDown;
synchronized (this) {
if (!down.unsent()) {
break;
}
Assert._OB_assert(messageQueue_.hasUnsent());
buf = messageQueue_.getFirstUnsentBuffer();
nextDown = messageQueue_.moveFirstUnsentToPending();
}
//
// Send the message
//
try {
synchronized (sendMutex_) {
if (t <= 0) {
//
// Send buffer, blocking
//
transport_.send(buf, true);
Assert._OB_assert(buf.is_full());
} else {
//
// Send buffer, with timeout
//
transport_.send_timeout(buf, t);
//
// Timeout?
//
if (!buf.is_full()) {
throw new org.omg.CORBA.NO_RESPONSE();
}
}
}
} catch (org.omg.CORBA.SystemException ex) {
processException(State.Closed, ex, false);
return true;
}
//
// a message should be sent by now so we have to
// mark it as sent for the GIOPClient
//
if (!msgSentMarked && (nextDown != null)
&& !nextDown.operation().equals("_locate")) {
msgSentMarked = true;
properties_ |= Property.RequestSent;
// debug
if (logger.isLoggable(Level.FINE)) {
int currentpos = buf.pos_;
buf.pos_ = 0;
logger.fine("Sent message in blocking at msgcount="
+ msgcount + ", size=" + buf.len_
+ ", the message piece is: \n" + buf.dumpData());
buf.pos_ = currentpos;
msgcount++;
}
}
}
} else // Non blocking
{
synchronized (this) {
int msgcount = 0;
while (true) {
if (!down.unsent())
break;
Assert._OB_assert(messageQueue_.hasUnsent());
//
// get the first message to send
//
org.apache.yoko.orb.OCI.Buffer buf = messageQueue_
.getFirstUnsentBuffer();
//
// send this buffer, non-blocking
//
try {
synchronized (sendMutex_) {
transport_.send(buf, false);
}
} catch (org.omg.CORBA.SystemException ex) {
processException(State.Closed, ex, false);
return true;
}
//
// if the buffer isn't full, it hasn't been sent because
// the call would have blocked.
//
if (!buf.is_full())
return false;
//
// now move to the pending pile
//
Downcall dummy = messageQueue_.moveFirstUnsentToPending();
//
// update the message sent property
//
if (!msgSentMarked && dummy != null) {
if (dummy.responseExpected()
&& dummy.operation().equals("_locate")) {
msgSentMarked = true;
properties_ |= Property.RequestSent;
// debug
if (logger.isLoggable(Level.FINE)) {
int currentpos = buf.pos_;
buf.pos_ = 0;
logger.fine("Sent message in non-blocking at msgcount="
+ msgcount
+ ", size="
+ buf.len_
+ ", the message piece is: \n"
+ buf.dumpData());
buf.pos_ = currentpos;
msgcount++;
}
}
}
}
}
}
logger.fine(" Request send completed with Downcall of type " + down.getClass().getName());
return !down.responseExpected();
}