in broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java [536:684]
/**
 * Sends a method on this session, assigning it a command id first when it
 * travels on the {@code Frame.L4} track.
 * <p>
 * Methods on any other track are handed directly to {@code send(Method)}.
 * For L4 methods all work is done while holding {@code commandsLock}:
 * the session state is validated, the next value of {@code commandsOut}
 * becomes the method id, {@code postIdSettingAction} is run, the method is
 * optionally recorded with {@code setCommand}, it is sent, and finally a
 * {@code sessionFlush(COMPLETED)} is issued when {@code shouldIssueFlush}
 * asks for one.
 * <p>
 * An unreliable method invoked while the session is {@code DETACHED} from a
 * thread other than {@code resumer} is dropped silently.
 *
 * @param m                   the method to send
 * @param postIdSettingAction callback run right after the id has been set on
 *                            {@code m}; may be {@code null}
 * @throws SessionException       if the session state does not permit sending
 *                                from the calling thread, if the session is
 *                                closing/closed and {@code getException()}
 *                                returns a value, or if {@code isFull}
 *                                reports the command buffer as full
 * @throws SessionClosedException if the session is closing/closed and
 *                                {@code getException()} returns {@code null}
 */
public void invoke(Method m, Runnable postIdSettingAction)
{
    // Methods outside the L4 track bypass id assignment and are sent as-is.
    if (m.getEncodedTrack() != Frame.L4)
    {
        send(m);
        return;
    }

    synchronized (commandsLock)
    {
        // Several state checks below are relaxed for the resumer thread.
        final boolean onResumerThread = Thread.currentThread().equals(resumer);

        // Unreliable methods are discarded while detached, unless the
        // resumer thread itself is the sender.
        if (state == DETACHED && m.isUnreliable() && !onResumerThread)
        {
            return;
        }

        // Only the resumer thread may get past this point in a state other
        // than OPEN, CLOSED or CLOSING.
        if (!(state == OPEN || state == CLOSED || state == CLOSING) && !onResumerThread)
        {
            // Should not happen
            throw new SessionException(String.format("Unexpected state %s", state));
        }

        switch (state)
        {
            case CLOSING:
            case CLOSED:
                org.apache.qpid.server.protocol.v0_10.transport.ExecutionException closeCause = getException();
                if (closeCause == null)
                {
                    throw new SessionClosedException();
                }
                throw new SessionException(closeCause);
            case RESUMING:
                // NOTE(review): the guard above already rejects non-resumer
                // threads in this state, so this throw looks unreachable.
                if (!onResumerThread)
                {
                    throw new SessionException("timed out waiting for resume to finish");
                }
                break;
            case OPEN:
                break;
            default:
                throw new SessionException(
                        String.format("timed out waiting for session to become open (state=%s)", state));
        }

        // The id is consumed and the callback runs before the capacity check.
        final int commandId = commandsOut++;
        m.setId(commandId);
        if (postIdSettingAction != null)
        {
            postIdSettingAction.run();
        }

        if (isFull(commandId))
        {
            // Should not happen
            throw new SessionException(String.format("Command buffer full next: %d", commandId));
        }

        // NOTE(review): this CLOSED check and the second isFull check below
        // repeat conditions already tested above; they can only fire if the
        // state or buffer changed in between - confirm before removing.
        if (state == CLOSED)
        {
            org.apache.qpid.server.protocol.v0_10.transport.ExecutionException closeCause = getException();
            if (closeCause == null)
            {
                throw new SessionClosedException();
            }
            throw new SessionException(closeCause);
        }

        if (isFull(commandId))
        {
            throw new SessionException("timed out waiting for completion");
        }

        // The very first command is preceded by a command point at (0, 0).
        if (commandId == 0)
        {
            sessionCommandPoint(0, 0);
        }

        // Record the method when it is a reliable message transfer on a
        // replaying, non-transacted, non-closing session, or when something
        // is listening for its completion.
        final boolean replayableTransfer = !(_isNoReplay || closing || transacted)
                                           && m instanceof MessageTransfer
                                           && !m.isUnreliable();
        if (replayableTransfer || m.hasCompletionListener())
        {
            setCommand(commandId, m);
            commandBytes += m.getBodySize();
        }

        try
        {
            send(m);
        }
        catch (SenderException e)
        {
            if (closing)
            {
                e.rethrow();
            }
            else
            {
                // if we are not closing then this will happen
                // again on resume
                LOGGER.error("error sending command", e);
            }
        }

        // Per the original comment: flush every 64K commands to avoid
        // ambiguity on wraparound; shouldIssueFlush decides the cadence.
        if (shouldIssueFlush(commandId))
        {
            try
            {
                sessionFlush(COMPLETED);
            }
            catch (SenderException e)
            {
                if (closing)
                {
                    e.rethrow();
                }
                else
                {
                    // if expiry is > 0 then this will happen
                    // again on resume
                    LOGGER.error("error sending flush (periodic)", e);
                }
            }
        }
    }
}