in client/src/main/java/org/apache/qpid/transport/Session.java [616:806]
public void invoke(Method m, Runnable postIdSettingAction)
{
if (m.getEncodedTrack() == Frame.L4)
{
if (m.hasPayload())
{
acquireCredit();
}
synchronized (commandsLock)
{
if (state == DETACHED && m.isUnreliable())
{
Thread current = Thread.currentThread();
if (!current.equals(resumer))
{
return;
}
}
if (state != OPEN && state != CLOSED && state != CLOSING)
{
Thread current = Thread.currentThread();
if (!current.equals(resumer) )
{
Waiter w = new Waiter(commandsLock, timeout);
while (w.hasTime() && (state != OPEN && state != CLOSED))
{
checkFailoverRequired("Command was interrupted because of failover, before being sent");
w.await();
}
}
}
switch (state)
{
case OPEN:
break;
case RESUMING:
Thread current = Thread.currentThread();
if (!current.equals(resumer))
{
throw new SessionException
("timed out waiting for resume to finish");
}
break;
case CLOSING:
case CLOSED:
ExecutionException exc = getException();
if (exc != null)
{
throw new SessionException(exc);
}
else
{
throw new SessionClosedException();
}
default:
throw new SessionException
(String.format
("timed out waiting for session to become open " +
"(state=%s)", state));
}
int next;
next = commandsOut++;
m.setId(next);
if(postIdSettingAction != null)
{
postIdSettingAction.run();
}
if (isFull(next))
{
Waiter w = new Waiter(commandsLock, timeout);
while (w.hasTime() && isFull(next) && state != CLOSED)
{
if (state == OPEN || state == RESUMING)
{
try
{
sessionFlush(COMPLETED);
}
catch (SenderException e)
{
if (!closing)
{
// if expiry is > 0 then this will
// happen again on resume
LOGGER.error("error sending flush (full replay buffer)", e);
}
else
{
e.rethrow();
}
}
}
checkFailoverRequired("Command was interrupted because of failover, before being sent");
w.await();
}
}
if (state == CLOSED)
{
ExecutionException exc = getException();
if (exc != null)
{
throw new SessionException(exc);
}
else
{
throw new SessionClosedException();
}
}
if (isFull(next))
{
throw new SessionException("timed out waiting for completion");
}
if (next == 0)
{
sessionCommandPoint(0, 0);
}
boolean replayTransfer = !_isNoReplay && !closing && !transacted &&
m instanceof MessageTransfer &&
! m.isUnreliable();
if ((replayTransfer) || m.hasCompletionListener())
{
setCommand(next, m);
commandBytes += m.getBodySize();
}
if (autoSync)
{
m.setSync(true);
}
needSync = !m.isSync();
try
{
send(m);
}
catch (SenderException e)
{
if (!closing)
{
// if we are not closing then this will happen
// again on resume
LOGGER.error("error sending command", e);
}
else
{
e.rethrow();
}
}
if (autoSync)
{
sync();
}
// flush every 64K commands to avoid ambiguity on
// wraparound
if (shouldIssueFlush(next))
{
try
{
sessionFlush(COMPLETED);
}
catch (SenderException e)
{
if (!closing)
{
// if expiry is > 0 then this will happen
// again on resume
LOGGER.error("error sending flush (periodic)", e);
}
else
{
e.rethrow();
}
}
}
}
}
else
{
send(m);
}
}