in src/java/org/apache/cassandra/net/OutboundConnection.java [765:921]
boolean doRun(Established established)
{
if (!isWritable)
return false;
// pendingBytes is updated before queue.size() (which triggers notEmpty, and begins delivery),
// so it is safe to use it here to exit delivery
// this number is inaccurate for old versions, but we don't mind terribly - we'll send at least one message,
// and get round to it eventually (though we could add a fudge factor for some room for older versions)
int maxSendBytes = (int) min(pendingBytes() - flushingBytes, LARGE_MESSAGE_THRESHOLD);
if (maxSendBytes == 0)
return false;
OutboundConnectionSettings settings = established.settings;
int messagingVersion = established.messagingVersion;
FrameEncoder.Payload sending = null;
int canonicalSize = 0; // number of bytes we must use for our resource accounting
int sendingBytes = 0;
int sendingCount = 0;
try (OutboundMessageQueue.WithLock withLock = queue.lockOrCallback(approxTime.now(), this::execute))
{
if (withLock == null)
return false; // we failed to acquire the queue lock, so return; we will be scheduled again when the lock is available
sending = established.payloadAllocator.allocate(true, maxSendBytes);
DataOutputBufferFixed out = new DataOutputBufferFixed(sending.buffer);
Message<?> next;
while ( null != (next = withLock.peek()) )
{
try
{
int messageSize = next.serializedSize(messagingVersion);
// actual message size for this version is larger than permitted maximum
if (messageSize > DatabaseDescriptor.getInternodeMaxMessageSizeInBytes())
throw new Message.OversizedMessageException(messageSize);
if (messageSize > sending.remaining())
{
// if we don't have enough room to serialize the next message, we have either
// 1) run out of room after writing some messages successfully; this might mean that we are
// overflowing our highWaterMark, or that we have just filled our buffer
// 2) we have a message that is too large for this connection; this can happen if a message's
// size was calculated for the wrong messaging version when enqueued.
// In this case we want to write it anyway, so simply allocate a large enough buffer.
if (sendingBytes > 0)
break;
sending.release();
sending = null; // set to null to prevent double-release if we fail to allocate our new buffer
sending = established.payloadAllocator.allocate(true, messageSize);
//noinspection IOResourceOpenedButNotSafelyClosed
out = new DataOutputBufferFixed(sending.buffer);
}
Tracing.instance.traceOutgoingMessage(next, messageSize, settings.connectTo);
Message.serializer.serialize(next, out, messagingVersion);
if (sending.length() != sendingBytes + messageSize)
throw new InvalidSerializedSizeException(next.verb(), messageSize, sending.length() - sendingBytes);
canonicalSize += canonicalSize(next);
sendingCount += 1;
sendingBytes += messageSize;
}
catch (Throwable t)
{
onFailedSerialize(next, messagingVersion, 0, t);
assert sending != null;
// reset the buffer to ignore the message we failed to serialize
sending.trim(sendingBytes);
}
withLock.removeHead(next);
}
if (0 == sendingBytes)
return false;
sending.finish();
debug.onSendSmallFrame(sendingCount, sendingBytes);
ChannelFuture flushResult = AsyncChannelPromise.writeAndFlush(established.channel, sending);
sending = null;
if (flushResult.isSuccess())
{
sentCount += sendingCount;
sentBytes += sendingBytes;
debug.onSentSmallFrame(sendingCount, sendingBytes);
}
else
{
flushingBytes += canonicalSize;
setInProgress(true);
boolean hasOverflowed = flushingBytes >= settings.flushHighWaterMark;
if (hasOverflowed)
{
isWritable = false;
promiseToExecuteLater();
}
int releaseBytesFinal = canonicalSize;
int sendingBytesFinal = sendingBytes;
int sendingCountFinal = sendingCount;
flushResult.addListener(future -> {
releaseCapacity(sendingCountFinal, releaseBytesFinal);
flushingBytes -= releaseBytesFinal;
if (flushingBytes == 0)
setInProgress(false);
if (!isWritable && flushingBytes <= settings.flushLowWaterMark)
{
isWritable = true;
executeAgain();
}
if (future.isSuccess())
{
sentCount += sendingCountFinal;
sentBytes += sendingBytesFinal;
debug.onSentSmallFrame(sendingCountFinal, sendingBytesFinal);
}
else
{
errorCount += sendingCountFinal;
errorBytes += sendingBytesFinal;
invalidateChannel(established, future.cause());
debug.onFailedSmallFrame(sendingCountFinal, sendingBytesFinal);
}
});
canonicalSize = 0;
}
}
catch (Throwable t)
{
errorCount += sendingCount;
errorBytes += sendingBytes;
invalidateChannel(established, t);
}
finally
{
if (canonicalSize > 0)
releaseCapacity(sendingCount, canonicalSize);
if (sending != null)
sending.release();
if (pendingBytes() > flushingBytes && isWritable)
execute();
}
return false;
}