ResultCode enqueueEventsToSendToApmServer()

in agent/native/ext/backend_comm.cpp [1165:1229]


ResultCode enqueueEventsToSendToApmServer( StringView userAgentHttpHeader, StringView serializedEvents )
{
    char txtOutStreamBuf[ ELASTIC_APM_TEXT_OUTPUT_STREAM_ON_STACK_BUFFER_SIZE ];
    TextOutputStream txtOutStream = ELASTIC_APM_TEXT_OUTPUT_STREAM_FROM_STATIC_BUFFER( txtOutStreamBuf );

    ELASTIC_APM_LOG_DEBUG(
            "Queueing events to send asynchronously..."
            "; userAgentHttpHeader [length: %" PRIu64 "]: `%.*s'"
            "; serializedEvents [length: %" PRIu64 "]:\n%.*s"
            , (UInt64) userAgentHttpHeader.length, (int) userAgentHttpHeader.length, userAgentHttpHeader.begin
            , (UInt64) serializedEvents.length, (int) serializedEvents.length, serializedEvents.begin );
    textOutputStreamRewind( &txtOutStream );

    ResultCode resultCode;
    bool shouldUnlockMutex = false;
    UInt64 id;
    BackgroundBackendComm* backgroundBackendComm = g_backgroundBackendComm;

    ELASTIC_APM_CALL_IF_FAILED_GOTO( lockMutex( backgroundBackendComm->mutex, &shouldUnlockMutex, __FUNCTION__ ) );

    if ( backgroundBackendComm->dataToSendTotalSize >= ELASTIC_APM_MAX_QUEUE_SIZE_IN_BYTES )
    {
        ELASTIC_APM_LOG_ERROR(
                "Already queued events are above max queue size - dropping these events"
                "; size of already queued events: %" PRIu64 
                , (UInt64) backgroundBackendComm->dataToSendTotalSize );
        ELASTIC_APM_SET_RESULT_CODE_AND_GOTO_FAILURE();
    }

    id = backgroundBackendComm->nextEventsBatchId;
    ELASTIC_APM_CALL_IF_FAILED_GOTO(
            addCopyToDataToSendQueue( &( backgroundBackendComm->dataToSendQueue )
                                      , id
                                      , userAgentHttpHeader
                                      , serializedEvents ) );

    backgroundBackendComm->dataToSendTotalSize += serializedEvents.length;
    ++backgroundBackendComm->nextEventsBatchId;

    ELASTIC_APM_LOG_DEBUG(
            "Queued a batch of events"
            "; batch ID: %" PRIu64 
            "; batch size: %" PRIu64 
            "; total size of queued events: %" PRIu64 
            , (UInt64) id
            , (UInt64) serializedEvents.length
            , (UInt64) backgroundBackendComm->dataToSendTotalSize );

    ELASTIC_APM_CALL_IF_FAILED_GOTO( signalConditionVariable( backgroundBackendComm->condVar, __FUNCTION__ ) );

    resultCode = resultSuccess;

    finally:
    unlockMutex( backgroundBackendComm->mutex, &shouldUnlockMutex, __FUNCTION__ );

    ELASTIC_APM_LOG_DEBUG_RESULT_CODE_FUNCTION_EXIT_MSG(
            "Finished queueing events to send asynchronously"
            "; serializedEvents [length: %" PRIu64 "]:\n%.*s"
            , (UInt64) serializedEvents.length, (int) serializedEvents.length, serializedEvents.begin );

    return resultCode;

    failure:
    goto finally;
}