bool DirectTCPConnection::writeData()

in aios/network/anet/directtcpconnection.cpp [62:175]


// Drains pending outgoing data for this connection.
//
// Packets waiting in _outputQueue are moved to _myQueue, encoded one by one
// into the _output buffer and pushed out with sendBuffer(). When encoding a
// packet leaves a non-empty _writingPayload, that payload is sent separately
// with sendPayload() after the buffered bytes; a partially sent payload is
// remembered in _payloadLeftToWrite and resumed on the next call.
//
// Returns true when the call ended without a hard write error (this includes
// "nothing to send" and the shutdown-after-last-write case). Returns false
// when a write error other than EWOULDBLOCK/EAGAIN was reported, or when
// _writeFinishClose is set and nothing is left to send.
// NOTE(review): presumably the caller tears the connection down on false --
// confirm against the call site.
bool DirectTCPConnection::writeData() {
    ANET_LOG(DEBUG, "DirectTCPConnection writeData");
    // to reduce the odds of blocking postPacket()
    // Everything queued so far is moved to _myQueue while holding _outputCond,
    // so the encoding/sending below runs without the lock.
    _outputCond.lock();
    _outputQueue.moveTo(&_myQueue);
    // Nothing pending at all: no queued packet, no buffered bytes and no
    // unfinished payload. Turn write notifications off and return.
    if (_myQueue.size() == 0 && _output.getDataLen() == 0 && _payloadLeftToWrite == 0UL) {
        ANET_LOG(DEBUG, "IOC(%p)->enableWrite(false)", _iocomponent);
        _iocomponent->enableWrite(false);
        _outputCond.unlock();
        return true;
    }
    _outputCond.unlock();
    // ret: result of the most recent sendBuffer()/sendPayload() call.
    // writeCnt / error: handed to sendBuffer()/sendPayload(); presumably
    // updated by them through reference parameters (write count and errno of
    // the last failed write) -- confirm against their declarations.
    int ret = 0;
    int writeCnt = 0;
    // Local count of packets still in _myQueue; decremented on every pop.
    int myQueueSize = _myQueue.size();
    int error = 0;
    _lasttime = TimeUtil::getTime();
    do {
        // Keep encoding packets into _output until it holds at least
        // DIRECT_READ_WRITE_SIZE bytes, the queue is empty, or a payload has
        // to be written.
        while (_output.getDataLen() < DIRECT_READ_WRITE_SIZE) {
            // Snapshot buffer sizes so the growth caused by this packet can be
            // accounted for after encode().
            int64_t oldDataLen = _output.getDataLen();
            int64_t oldSpaceAllocated = _output.getSpaceUsed();
            // A payload of a previously encoded packet is still (partly)
            // unsent: do not pop a new packet, resume sending that payload.
            if (_payloadLeftToWrite > 0UL) {
                assert(nullptr != _writingPacket);
                goto CONTINUE_WRITE_PAYLOAD;
            }
            if (myQueueSize == 0) {
                break;
            }
            // _writingPacket must be recycled on error
            _writingPacket = static_cast<anet::DirectPacket *>(_myQueue.pop());
            myQueueSize--;
            // encode() receives the packet, the output buffer and
            // _writingPayload; the check below treats a non-null, non-empty
            // _writingPayload as data that still has to be sent on its own.
            _directStreamer->encode(_writingPacket, &_output, _writingPayload);
            if (_writingPayload.getAddr() != nullptr && _writingPayload.getLen() > 0UL) {
                assert(_payloadLeftToWrite == 0UL);
                _payloadLeftToWrite = _writingPayload.getLen();
            }
            {
                // Statistics: bytes this packet added to the buffer and the
                // change in buffer space, plus the largest per-packet size
                // seen so far.
                int64_t newDataLen = _output.getDataLen();
                int64_t newSpaceAllocated = _output.getSpaceUsed();
                int64_t packetSizeInBuffer = newDataLen - oldDataLen;
                if (packetSizeInBuffer > _maxSendPacketSize) {
                    _maxSendPacketSize = packetSizeInBuffer;
                }
                addOutputBufferSpaceAllocated(newSpaceAllocated - oldSpaceAllocated);
                ANET_ADD_OUTPUT_BUFFER_SPACE_USED(packetSizeInBuffer);
                // Channel bookkeeping for the packet just encoded: with no
                // default handler and no per-channel handler the channel is
                // released right away; otherwise it gets the packet's expire
                // time and is put on the waiting list.
                Channel *channel = _writingPacket->getChannel();
                if (channel) {
                    if (_defaultPacketHandler == NULL && channel->getHandler() == NULL) {
                        // free channel of packets not expecting reply
                        _channelPool.freeChannel(channel);
                    } else {
                        ANET_LOG(SPAM, "channel[%p] expire time[%ld]", channel, _writingPacket->getExpireTime());
                        channel->setExpireTime(_writingPacket->getExpireTime());
                        _channelPool.addToWaitingList(channel);
                    }
                }
            }
        // Reached either by falling through after encoding a new packet or
        // via the goto above when resuming an unfinished payload.
        CONTINUE_WRITE_PAYLOAD:
            if (_payloadLeftToWrite > 0) { // write direct payload:
                // Bytes already in _output are sent before the payload. If
                // they cannot be flushed completely, stop here; the payload
                // stays pending in _payloadLeftToWrite.
                if (_output.getDataLen() > 0) {
                    ret = sendBuffer(writeCnt, error);
                    if (_output.getDataLen() > 0) {
                        break;
                    }
                }
                ret = sendPayload(writeCnt, error);
                if (0UL == _payloadLeftToWrite) { // write to socket success with payload
                    finishPacketWrite();
                }
                // After a payload send attempt the inner loop is always left,
                // whether or not the payload was sent completely.
                break;
            } else { // write to buffer success without payload
                finishPacketWrite();
            }
        }
        // Flush whatever is buffered in _output.
        if (_output.getDataLen() > 0) {
            ret = sendBuffer(writeCnt, error);
        }
        // Go around again only if the last send returned a positive value,
        // the buffer and payload are fully flushed, packets remain in
        // _myQueue, and fewer than 10 writes have been counted in writeCnt.
    } while (ret > 0 && _output.getDataLen() == 0 && _payloadLeftToWrite == 0 && myQueueSize > 0 && writeCnt < 10);
    // Hand the packets that were not processed back to _outputQueue under
    // the lock.
    _outputCond.lock();
    _outputQueue.moveBack(&_myQueue);
    // Hard write error (anything other than "would block"): drop buffered
    // output and the in-flight packet, then report failure.
    if (error != 0 && error != EWOULDBLOCK && error != EAGAIN) {
        char spec[32];
        ANET_LOG(WARN, "Connection (%s) write error: %d", _socket->getAddr(spec, 32), error);
        _outputCond.unlock();
        clearOutputBuffer();
        clearWritingPacket(); // clear packet on error
        return false;
    }
    // Remaining work: queued packets, plus one for a non-empty output buffer,
    // plus one for an unfinished payload.
    int queueSize = _outputQueue.size() + (_output.getDataLen() > 0 ? 1 : 0) + (_payloadLeftToWrite > 0 ? 1 : 0);
    if (queueSize > 0) {
        // when using level triggered mode, do NOT need to call enableWrite() any more.
    } else if (_writeFinishShutdown) {
        // Everything has been sent and _writeFinishShutdown is set: disable
        // write notifications and shut the socket down.
        _outputCond.unlock();
        _iocomponent->enableWrite(false);
        _iocomponent->shutdownSocket();
        return true;
    } else if (_writeFinishClose) {
        // Everything has been sent and _writeFinishClose is set: report false.
        ANET_LOG(DEBUG, "Initiative cut connect.");
        _outputCond.unlock();
        clearOutputBuffer();
        return false;
    } else {
        // Nothing left to send: disable write notifications.
        ANET_LOG(DEBUG, "IOC(%p)->enableWrite(false)", _iocomponent);
        _iocomponent->enableWrite(false);
    }
    // Non-server side with a queue limit: if the channel pool's use count is
    // below the limit and threads are recorded as waiting, wake them all.
    // NOTE(review): presumably these are senders blocked in postPacket() on
    // the queue limit -- confirm.
    if (!_isServer && _queueLimit > 0) {
        size_t queueTotalSize = _channelPool.getUseListCount();
        if (queueTotalSize < _queueLimit && _waitingThreads > 0) {
            _outputCond.broadcast();
        }
    }
    _outputCond.unlock();
    return true;
}