in aios/network/anet/directtcpconnection.cpp [62:175]
/**
 * Writes pending output for this connection.
 *
 * Packets are moved from the shared _outputQueue into the private _myQueue,
 * encoded one at a time into the _output buffer through _directStreamer, and
 * pushed out with sendBuffer(). A packet whose encode() yields a non-empty
 * _writingPayload has that payload sent separately with sendPayload(); the
 * number of payload bytes still owed is tracked in _payloadLeftToWrite, so a
 * later call can resume where this one stopped.
 *
 * @return false when a write error other than EWOULDBLOCK/EAGAIN was recorded,
 *         or when nothing is left to write and _writeFinishClose is set;
 *         true in every other case.
 */
bool DirectTCPConnection::writeData() {
ANET_LOG(DEBUG, "DirectTCPConnection writeData");
// to reduce the odds of blocking postPacket()
// The shared queue is emptied into _myQueue while holding _outputCond, so the
// encoding/sending below runs without the lock.
_outputCond.lock();
_outputQueue.moveTo(&_myQueue);
// Nothing queued, nothing buffered and no payload remainder: turn write
// notification off and report success.
if (_myQueue.size() == 0 && _output.getDataLen() == 0 && _payloadLeftToWrite == 0UL) {
ANET_LOG(DEBUG, "IOC(%p)->enableWrite(false)", _iocomponent);
_iocomponent->enableWrite(false);
_outputCond.unlock();
return true;
}
_outputCond.unlock();
// ret holds the result of the most recent sendBuffer()/sendPayload() call.
// writeCnt and error are handed to those helpers on every call; presumably
// they are updated by reference (their signatures are not visible here).
int ret = 0;
int writeCnt = 0;
// Local countdown of packets still waiting in _myQueue.
int myQueueSize = _myQueue.size();
int error = 0;
_lasttime = TimeUtil::getTime();
do {
// Keep encoding packets until _output holds at least DIRECT_READ_WRITE_SIZE
// bytes, the local queue runs dry, or a payload has to be written.
while (_output.getDataLen() < DIRECT_READ_WRITE_SIZE) {
// Snapshot taken before encode() so the per-packet growth can be computed.
int64_t oldDataLen = _output.getDataLen();
int64_t oldSpaceAllocated = _output.getSpaceUsed();
// A payload remainder is pending: skip dequeue/encode and resume writing it.
if (_payloadLeftToWrite > 0UL) {
assert(nullptr != _writingPacket);
goto CONTINUE_WRITE_PAYLOAD;
}
if (myQueueSize == 0) {
break;
}
// _writingPacket must be recycled on error
_writingPacket = static_cast<anet::DirectPacket *>(_myQueue.pop());
myQueueSize--;
_directStreamer->encode(_writingPacket, &_output, _writingPayload);
// A non-null, non-empty payload after encode() must be sent separately.
if (_writingPayload.getAddr() != nullptr && _writingPayload.getLen() > 0UL) {
assert(_payloadLeftToWrite == 0UL);
_payloadLeftToWrite = _writingPayload.getLen();
}
// The enclosing braces keep the locals below out of scope at
// CONTINUE_WRITE_PAYLOAD, which the goto above requires.
{
int64_t newDataLen = _output.getDataLen();
int64_t newSpaceAllocated = _output.getSpaceUsed();
// Bytes this packet added to _output.
int64_t packetSizeInBuffer = newDataLen - oldDataLen;
// Remember the largest per-packet buffer footprint seen so far.
if (packetSizeInBuffer > _maxSendPacketSize) {
_maxSendPacketSize = packetSizeInBuffer;
}
addOutputBufferSpaceAllocated(newSpaceAllocated - oldSpaceAllocated);
ANET_ADD_OUTPUT_BUFFER_SPACE_USED(packetSizeInBuffer);
Channel *channel = _writingPacket->getChannel();
if (channel) {
if (_defaultPacketHandler == NULL && channel->getHandler() == NULL) {
// free channel of packets not expecting reply
_channelPool.freeChannel(channel);
} else {
// Otherwise the channel takes the packet's expire time and is put on the
// channel pool's waiting list.
ANET_LOG(SPAM, "channel[%p] expire time[%ld]", channel, _writingPacket->getExpireTime());
channel->setExpireTime(_writingPacket->getExpireTime());
_channelPool.addToWaitingList(channel);
}
}
}
CONTINUE_WRITE_PAYLOAD:
if (_payloadLeftToWrite > 0) { // write direct payload:
// Buffered bytes go out before the payload; if the buffer cannot be emptied,
// leave the inner loop with the payload still pending.
if (_output.getDataLen() > 0) {
ret = sendBuffer(writeCnt, error);
if (_output.getDataLen() > 0) {
break;
}
}
ret = sendPayload(writeCnt, error);
if (0UL == _payloadLeftToWrite) { // write to socket success with payload
finishPacketWrite();
}
// The inner loop is left after a payload attempt whether or not the payload
// was written completely.
break;
} else { // write to buffer success without payload
finishPacketWrite();
}
}
// Send whatever is still sitting in _output.
if (_output.getDataLen() > 0) {
ret = sendBuffer(writeCnt, error);
}
// Go around again only if the last send returned a positive value, the buffer
// and payload are fully drained, packets remain locally, and writeCnt is
// below 10.
} while (ret > 0 && _output.getDataLen() == 0 && _payloadLeftToWrite == 0 && myQueueSize > 0 && writeCnt < 10);
_outputCond.lock();
// Hand packets that were not processed back to the shared queue (presumably
// ahead of newly posted ones; moveBack() is not visible here).
_outputQueue.moveBack(&_myQueue);
// Any recorded error other than EWOULDBLOCK/EAGAIN fails the call: buffered
// output and the in-flight packet are cleared after releasing the lock.
if (error != 0 && error != EWOULDBLOCK && error != EAGAIN) {
char spec[32];
ANET_LOG(WARN, "Connection (%s) write error: %d", _socket->getAddr(spec, 32), error);
_outputCond.unlock();
clearOutputBuffer();
clearWritingPacket(); // clear packet on error
return false;
}
// Pending work: packets in the shared queue, plus one if _output still holds
// data, plus one if a payload remainder is outstanding.
int queueSize = _outputQueue.size() + (_output.getDataLen() > 0 ? 1 : 0) + (_payloadLeftToWrite > 0 ? 1 : 0);
if (queueSize > 0) {
// when using level triggered mode, do NOT need to call enableWrite() any more.
} else if (_writeFinishShutdown) {
// Everything is written and _writeFinishShutdown is set: stop write
// notification and shut the socket down.
_outputCond.unlock();
_iocomponent->enableWrite(false);
_iocomponent->shutdownSocket();
return true;
} else if (_writeFinishClose) {
// Everything is written and _writeFinishClose is set: return false (presumably
// the caller closes the connection on false; not visible here).
ANET_LOG(DEBUG, "Initiative cut connect.");
_outputCond.unlock();
clearOutputBuffer();
return false;
} else {
// Nothing left to write: turn write notification off.
ANET_LOG(DEBUG, "IOC(%p)->enableWrite(false)", _iocomponent);
_iocomponent->enableWrite(false);
}
// When _isServer is false and a queue limit is configured, wake every thread
// waiting on _outputCond once the channel pool's use-list count is below the
// limit.
if (!_isServer && _queueLimit > 0) {
size_t queueTotalSize = _channelPool.getUseListCount();
if (queueTotalSize < _queueLimit && _waitingThreads > 0) {
_outputCond.broadcast();
}
}
_outputCond.unlock();
return true;
}