public void invoke()

in broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java [536:684]


    /**
     * Sends a method on this session.
     *
     * <p>A method whose encoded track is not {@code Frame.L4} is passed straight to
     * {@code send}. An L4 method is handled while holding {@code commandsLock}: the session
     * state is validated, the next id is taken from {@code commandsOut} and set on the method,
     * {@code postIdSettingAction} is run, capacity is checked with {@code isFull}, the command
     * is optionally stored via {@code setCommand}, the method is sent, and a flush is issued
     * when {@code shouldIssueFlush} requests one.
     *
     * @param m the method to send
     * @param postIdSettingAction callback run right after the id has been set on {@code m};
     *                            skipped when {@code null}
     * @throws SessionException if the session state is not acceptable for the calling thread,
     *                          if the session is closing/closed and {@code getException()}
     *                          returns a value, or if {@code isFull} reports the buffer full
     * @throws SessionClosedException if the session is closing/closed and
     *                                {@code getException()} returns {@code null}
     */
    public void invoke(Method m, Runnable postIdSettingAction)
    {
        if (m.getEncodedTrack() != Frame.L4)
        {
            // Non-L4 methods bypass command numbering and the commands lock entirely.
            send(m);
            return;
        }

        synchronized (commandsLock)
        {
            // An unreliable method on a detached session is silently dropped unless the
            // calling thread is the resumer.
            if (state == DETACHED && m.isUnreliable() && !Thread.currentThread().equals(resumer))
            {
                return;
            }

            // Outside OPEN/CLOSED/CLOSING only the resumer thread may proceed.
            if (!(state == OPEN || state == CLOSED || state == CLOSING)
                && !Thread.currentThread().equals(resumer))
            {
                // Should not happen
                throw new SessionException(String.format("Unexpected state %s", state));
            }

            switch (state)
            {
            case OPEN:
                break;
            case RESUMING:
                if (!Thread.currentThread().equals(resumer))
                {
                    throw new SessionException("timed out waiting for resume to finish");
                }
                break;
            case CLOSING:
            case CLOSED:
                org.apache.qpid.server.protocol.v0_10.transport.ExecutionException failure = getException();
                if (failure != null)
                {
                    throw new SessionException(failure);
                }
                throw new SessionClosedException();
            default:
                throw new SessionException(
                        String.format("timed out waiting for session to become open (state=%s)", state));
            }

            // Allocate the command id first; the callback sees the method with its id set.
            final int commandId = commandsOut++;
            m.setId(commandId);
            if (postIdSettingAction != null)
            {
                postIdSettingAction.run();
            }

            if (isFull(commandId))
            {
                // Should not happen
                throw new SessionException(String.format("Command buffer full next: %d", commandId));
            }

            // State is re-read here, after the callback has run.
            if (state == CLOSED)
            {
                org.apache.qpid.server.protocol.v0_10.transport.ExecutionException closeCause = getException();
                if (closeCause != null)
                {
                    throw new SessionException(closeCause);
                }
                throw new SessionClosedException();
            }

            if (isFull(commandId))
            {
                throw new SessionException("timed out waiting for completion");
            }

            // The very first command is preceded by a command point at (0, 0).
            if (commandId == 0)
            {
                sessionCommandPoint(0, 0);
            }

            // A reliable MessageTransfer is stored when replay is enabled and the session is
            // neither closing nor transacted.
            final boolean retainForReplay = !(_isNoReplay || closing || transacted)
                                            && m instanceof MessageTransfer
                                            && !m.isUnreliable();

            if (retainForReplay || m.hasCompletionListener())
            {
                setCommand(commandId, m);
                commandBytes += m.getBodySize();
            }

            try
            {
                send(m);
            }
            catch (SenderException e)
            {
                if (closing)
                {
                    e.rethrow();
                }
                else
                {
                    // if we are not closing then this will happen
                    // again on resume
                    LOGGER.error("error sending command", e);
                }
            }

            // flush every 64K commands to avoid ambiguity on
            // wraparound
            if (shouldIssueFlush(commandId))
            {
                try
                {
                    sessionFlush(COMPLETED);
                }
                catch (SenderException e)
                {
                    if (closing)
                    {
                        e.rethrow();
                    }
                    else
                    {
                        // if expiry is > 0 then this will happen
                        // again on resume
                        LOGGER.error("error sending flush (periodic)", e);
                    }
                }
            }
        }
    }