boolean doRun()

in src/java/org/apache/cassandra/net/OutboundConnection.java [765:921]


        boolean doRun(Established established)
        {
            if (!isWritable)
                return false;

            // pendingBytes is updated before queue.size() (which triggers notEmpty, and begins delivery),
            // so it is safe to use it here to exit delivery
            // this number is inaccurate for old versions, but we don't mind terribly - we'll send at least one message,
            // and get round to it eventually (though we could add a fudge factor for some room for older versions)
            int maxSendBytes = (int) min(pendingBytes() - flushingBytes, LARGE_MESSAGE_THRESHOLD);
            if (maxSendBytes == 0)
                return false;

            OutboundConnectionSettings settings = established.settings;
            int messagingVersion = established.messagingVersion;

            FrameEncoder.Payload sending = null;
            int canonicalSize = 0; // number of bytes we must use for our resource accounting
            int sendingBytes = 0;
            int sendingCount = 0;
            try (OutboundMessageQueue.WithLock withLock = queue.lockOrCallback(approxTime.now(), this::execute))
            {
                if (withLock == null)
                    return false; // we failed to acquire the queue lock, so return; we will be scheduled again when the lock is available

                sending = established.payloadAllocator.allocate(true, maxSendBytes);
                DataOutputBufferFixed out = new DataOutputBufferFixed(sending.buffer);

                Message<?> next;
                while ( null != (next = withLock.peek()) )
                {
                    try
                    {
                        int messageSize = next.serializedSize(messagingVersion);

                        // actual message size for this version is larger than permitted maximum
                        if (messageSize > DatabaseDescriptor.getInternodeMaxMessageSizeInBytes())
                            throw new Message.OversizedMessageException(messageSize);

                        if (messageSize > sending.remaining())
                        {
                            // if we don't have enough room to serialize the next message, we have either
                            //  1) run out of room after writing some messages successfully; this might mean that we are
                            //     overflowing our highWaterMark, or that we have just filled our buffer
                            //  2) we have a message that is too large for this connection; this can happen if a message's
                            //     size was calculated for the wrong messaging version when enqueued.
                            //     In this case we want to write it anyway, so simply allocate a large enough buffer.

                            if (sendingBytes > 0)
                                break;

                            sending.release();
                            sending = null; // set to null to prevent double-release if we fail to allocate our new buffer
                            sending = established.payloadAllocator.allocate(true, messageSize);
                            //noinspection IOResourceOpenedButNotSafelyClosed
                            out = new DataOutputBufferFixed(sending.buffer);
                        }

                        Tracing.instance.traceOutgoingMessage(next, messageSize, settings.connectTo);
                        Message.serializer.serialize(next, out, messagingVersion);

                        if (sending.length() != sendingBytes + messageSize)
                            throw new InvalidSerializedSizeException(next.verb(), messageSize, sending.length() - sendingBytes);

                        canonicalSize += canonicalSize(next);
                        sendingCount += 1;
                        sendingBytes += messageSize;
                    }
                    catch (Throwable t)
                    {
                        onFailedSerialize(next, messagingVersion, 0, t);

                        assert sending != null;
                        // reset the buffer to ignore the message we failed to serialize
                        sending.trim(sendingBytes);
                    }
                    withLock.removeHead(next);
                }
                if (0 == sendingBytes)
                    return false;

                sending.finish();
                debug.onSendSmallFrame(sendingCount, sendingBytes);
                ChannelFuture flushResult = AsyncChannelPromise.writeAndFlush(established.channel, sending);
                sending = null;

                if (flushResult.isSuccess())
                {
                    sentCount += sendingCount;
                    sentBytes += sendingBytes;
                    debug.onSentSmallFrame(sendingCount, sendingBytes);
                }
                else
                {
                    flushingBytes += canonicalSize;
                    setInProgress(true);

                    boolean hasOverflowed = flushingBytes >= settings.flushHighWaterMark;
                    if (hasOverflowed)
                    {
                        isWritable = false;
                        promiseToExecuteLater();
                    }

                    int releaseBytesFinal = canonicalSize;
                    int sendingBytesFinal = sendingBytes;
                    int sendingCountFinal = sendingCount;
                    flushResult.addListener(future -> {

                        releaseCapacity(sendingCountFinal, releaseBytesFinal);
                        flushingBytes -= releaseBytesFinal;
                        if (flushingBytes == 0)
                            setInProgress(false);

                        if (!isWritable && flushingBytes <= settings.flushLowWaterMark)
                        {
                            isWritable = true;
                            executeAgain();
                        }

                        if (future.isSuccess())
                        {
                            sentCount += sendingCountFinal;
                            sentBytes += sendingBytesFinal;
                            debug.onSentSmallFrame(sendingCountFinal, sendingBytesFinal);
                        }
                        else
                        {
                            errorCount += sendingCountFinal;
                            errorBytes += sendingBytesFinal;
                            invalidateChannel(established, future.cause());
                            debug.onFailedSmallFrame(sendingCountFinal, sendingBytesFinal);
                        }
                    });
                    canonicalSize = 0;
                }
            }
            catch (Throwable t)
            {
                errorCount += sendingCount;
                errorBytes += sendingBytes;
                invalidateChannel(established, t);
            }
            finally
            {
                if (canonicalSize > 0)
                    releaseCapacity(sendingCount, canonicalSize);

                if (sending != null)
                    sending.release();

                if (pendingBytes() > flushingBytes && isWritable)
                    execute();
            }

            return false;
        }