agent/native/ext/backend_comm.cpp (1,003 lines of code) (raw):

/* * Licensed to Elasticsearch B.V. under one or more contributor * license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright * ownership. Elasticsearch B.V. licenses this file to you under * the Apache License, Version 2.0 (the "License"); you may * not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #include "backend_comm.h" #include "elastic_apm_version.h" #if defined(PHP_WIN32) && ! defined(CURL_STATICLIB) # define CURL_STATICLIB #endif #include <curl/curl.h> #include "platform.h" #include "elastic_apm_alloc.h" #include "Tracer.h" #include "ConfigSnapshot.h" #include "util.h" #include "util_for_PHP.h" #include "basic_macros.h" #include "backend_comm_backoff.h" #include <string_view> #define ELASTIC_APM_CURRENT_LOG_CATEGORY ELASTIC_APM_LOG_CATEGORY_BACKEND_COMM struct LibCurlInfo { String version; String ssl_version; String libz_version; String host; const String* protocols; }; typedef struct LibCurlInfo LibCurlInfo; static LibCurlInfo g_cachedLibCurlInfo; static bool g_isCachedLibCurlInfoInited = false; void ensureCachedLibCurlInfoInited() { if ( g_isCachedLibCurlInfoInited ) { return; } curl_version_info_data* data = curl_version_info( CURLVERSION_NOW ); g_cachedLibCurlInfo.version = data->version; g_cachedLibCurlInfo.ssl_version = data->ssl_version; g_cachedLibCurlInfo.libz_version = data->libz_version; g_cachedLibCurlInfo.host = data->host; g_cachedLibCurlInfo.protocols = (const String*)( data->protocols ); g_isCachedLibCurlInfoInited = true; } String streamLibCurlInfo( TextOutputStream* txtOutStream ) { ensureCachedLibCurlInfoInited(); TextOutputStreamState txtOutStreamStateOnEntryStart; if ( ! textOutputStreamStartEntry( txtOutStream, &txtOutStreamStateOnEntryStart ) ) return ELASTIC_APM_TEXT_OUTPUT_STREAM_NOT_ENOUGH_SPACE_MARKER; streamPrintf( txtOutStream, "{" ); streamPrintf( txtOutStream, "version: %s", g_cachedLibCurlInfo.version ); streamPrintf( txtOutStream, ", ssl_version: %s", g_cachedLibCurlInfo.ssl_version ); streamPrintf( txtOutStream, ", libz_version: %s", g_cachedLibCurlInfo.libz_version ); streamPrintf( txtOutStream, ", host: %s", g_cachedLibCurlInfo.host ); /** * protocols is a pointer to an array of char * pointers, containing the names protocols that libcurl supports (using lowercase letters). * The protocol names are the same as would be used in URLs. The array is terminated by a NULL entry. * * @link https://curl.se/libcurl/c/curl_version_info.html */ streamPrintf( txtOutStream, ", protocols: [" ); for ( size_t index = 0 ; ; ++index ) { if ( g_cachedLibCurlInfo.protocols[ index ] == NULL ) { break; } if ( index != 0 ) { streamPrintf( txtOutStream, ", " ); } streamPrintf( txtOutStream, "%s", g_cachedLibCurlInfo.protocols[ index ] ); } streamPrintf( txtOutStream, "]" ); streamPrintf( txtOutStream, "}" ); return textOutputStreamEndEntry( &txtOutStreamStateOnEntryStart, txtOutStream ); } static ResultCode dupMallocStringView( StringView src, StringBuffer* dst ) { ELASTIC_APM_ASSERT_VALID_PTR( src.begin ); ELASTIC_APM_ASSERT_VALID_PTR( dst ); ELASTIC_APM_ASSERT_PTR_IS_NULL( dst->begin ); ELASTIC_APM_ASSERT( dst->size == 0, "" ); ResultCode resultCode; char* memBlockForDup = NULL; ELASTIC_APM_MALLOC_STRING_IF_FAILED_GOTO( /* length */ src.length, /* out */ memBlockForDup ); ELASTIC_APM_CALL_IF_FAILED_GOTO( safeStringCopy( src, /* dstBuf */ memBlockForDup, /* dstBufCapacity */ src.length + 1 ) ); dst->begin = memBlockForDup; memBlockForDup = NULL; dst->size = src.length + 1; resultCode = resultSuccess; finally: return resultCode; failure: ELASTIC_APM_FREE_STRING_AND_SET_TO_NULL( /* length */ src.length, /* out */ memBlockForDup ); goto finally; } static void freeMallocedStringBuffer( /* in,out */ StringBuffer* strBuf ) { ELASTIC_APM_ASSERT_VALID_PTR( strBuf ); if ( strBuf->begin != NULL ) { ELASTIC_APM_FREE_AND_SET_TO_NULL( char, strBuf->size, /* in,out */ strBuf->begin ); ELASTIC_APM_ZERO_STRUCT( strBuf ); } else { ELASTIC_APM_ASSERT( strBuf->size == 0, "" ); } } // Log response static size_t logResponse( void* data, size_t unusedSizeParam, size_t dataSize, void* unusedUserDataParam ) { // https://curl.haxx.se/libcurl/c/CURLOPT_WRITEFUNCTION.html // size (unusedSizeParam) is always 1 ELASTIC_APM_UNUSED( unusedSizeParam ); ELASTIC_APM_UNUSED( unusedUserDataParam ); ELASTIC_APM_LOG_DEBUG( "APM Server's response body [length: %" PRIu64 "]: %.*s", (UInt64) dataSize, (int) dataSize, (const char*) data ); return dataSize; } #define ELASTIC_APM_CURL_EASY_SETOPT( curlHandle, curlOptionId, ... ) \ do { \ CURLcode curl_easy_setopt_ret_val = curl_easy_setopt( curlHandle, curlOptionId, __VA_ARGS__ ); \ if ( curl_easy_setopt_ret_val != CURLE_OK ) \ { \ ELASTIC_APM_LOG_ERROR( "Failed to set cUrl option; curlOptionId: %d (used constant: %s); curl info: %s", curlOptionId, #curlOptionId, streamLibCurlInfo( &txtOutStream ) ); \ textOutputStreamRewind( &txtOutStream ); \ ELASTIC_APM_SET_RESULT_CODE_AND_GOTO_FAILURE_EX( resultCurlFailure ); \ } \ } while ( false ) \ /**/ ResultCode addToCurlStringList( /* in,out */ struct curl_slist** pList, const char* strToAdd ) { ELASTIC_APM_ASSERT_VALID_PTR( pList ); ELASTIC_APM_ASSERT_VALID_PTR( strToAdd ); struct curl_slist* newList = curl_slist_append( *pList, strToAdd ); if ( newList == NULL ) { char txtOutStreamBuf[ ELASTIC_APM_TEXT_OUTPUT_STREAM_ON_STACK_BUFFER_SIZE ]; TextOutputStream txtOutStream = ELASTIC_APM_TEXT_OUTPUT_STREAM_FROM_STATIC_BUFFER( txtOutStreamBuf ); ELASTIC_APM_LOG_ERROR( "Failed to curl_slist_append(); strToAdd: %s; curl info: %s", strToAdd, streamLibCurlInfo( &txtOutStream ) ); return resultCurlFailure; } *pList = newList; return resultSuccess; } struct ConnectionData { CURL* curlHandle; struct curl_slist* requestHeaders; BackendCommBackoff backoff; }; typedef struct ConnectionData ConnectionData; ConnectionData g_connectionData = { .curlHandle = NULL, .requestHeaders = NULL, .backoff = ELASTIC_APM_DEFAULT_BACKEND_COMM_BACKOFF }; void cleanupConnectionData( ConnectionData* connectionData ) { ELASTIC_APM_ASSERT_VALID_PTR( connectionData ); if ( connectionData->requestHeaders != NULL ) { curl_slist_free_all( connectionData->requestHeaders ); connectionData->requestHeaders = NULL; } if ( connectionData->curlHandle != NULL ) { curl_easy_cleanup( connectionData->curlHandle ); connectionData->curlHandle = NULL; } } String streamCurlInfoType( curl_infotype value, TextOutputStream* txtOutStream ) { switch ( value ) { #define ELASTIC_APM_CURL_INFO_SWITCH_CASE( enumItem ) case enumItem: return ELASTIC_APM_PP_STRINGIZE( enumItem ) ELASTIC_APM_CURL_INFO_SWITCH_CASE( CURLINFO_TEXT ); ELASTIC_APM_CURL_INFO_SWITCH_CASE( CURLINFO_HEADER_IN ); ELASTIC_APM_CURL_INFO_SWITCH_CASE( CURLINFO_HEADER_OUT ); ELASTIC_APM_CURL_INFO_SWITCH_CASE( CURLINFO_DATA_IN ); ELASTIC_APM_CURL_INFO_SWITCH_CASE( CURLINFO_DATA_OUT ); ELASTIC_APM_CURL_INFO_SWITCH_CASE( CURLINFO_SSL_DATA_IN ); ELASTIC_APM_CURL_INFO_SWITCH_CASE( CURLINFO_SSL_DATA_OUT ); ELASTIC_APM_CURL_INFO_SWITCH_CASE( CURLINFO_END ); #undef ELASTIC_APM_CURL_INFO_SWITCH_CASE default: return streamPrintf( txtOutStream, "<UNKNOWN curl_infotype value: %d>", (int)value ); } } String streamCurlData( const char* dataViewBegin, size_t dataViewLength, TextOutputStream* txtOutStream ) { TextOutputStreamState txtOutStreamStateOnEntryStart; if ( ! textOutputStreamStartEntry( txtOutStream, &txtOutStreamStateOnEntryStart ) ) { return ELASTIC_APM_TEXT_OUTPUT_STREAM_NOT_ENOUGH_SPACE_MARKER; } txtOutStream->autoTermZero = false; ELASTIC_APM_FOR_EACH_INDEX( i, dataViewLength ) { if ( textOutputStreamIsOverflowed( txtOutStream ) ) { break; } char currentChar = dataViewBegin[ i ]; // According to https://en.wikipedia.org/wiki/ASCII#Printable_characters // Codes 20 (hex) to 7E (hex), known as the printable characters if ( ELASTIC_APM_IS_IN_INCLUSIVE_RANGE( '\x20', currentChar, '\x7E' ) ) { streamChar( currentChar, txtOutStream ); } else { String asSymbol = spacialInvisibleCharToSymbol( currentChar ); if ( asSymbol != NULL ) { streamString( asSymbol, txtOutStream ); } else { streamPrintf( txtOutStream, "\\x%02X", (UInt)((unsigned char)currentChar) ); } } } return textOutputStreamEndEntry( &txtOutStreamStateOnEntryStart, txtOutStream ); } /** * @link https://curl.se/libcurl/c/CURLOPT_DEBUGFUNCTION.html */ void curlDebugCallback( CURL* curlHandle, curl_infotype type, char* dataViewBegin, size_t dataViewLength, void* ctx ) { ELASTIC_APM_UNUSED( curlHandle ); ELASTIC_APM_UNUSED( ctx ); char txtOutStreamBuf[ ELASTIC_APM_TEXT_OUTPUT_STREAM_ON_STACK_BUFFER_SIZE ]; TextOutputStream txtOutStream = ELASTIC_APM_TEXT_OUTPUT_STREAM_FROM_STATIC_BUFFER( txtOutStreamBuf ); ELASTIC_APM_LOG_INFO( "type: %s, data [length: %" PRIu64 "]: %s", streamCurlInfoType( type, &txtOutStream ), (UInt64)dataViewLength, streamCurlData( dataViewBegin, dataViewLength, &txtOutStream ) ); } void enableCurlVerboseMode( CURL* curlHandle ) { ELASTIC_APM_LOG_DEBUG_FUNCTION_ENTRY(); ResultCode resultCode; char txtOutStreamBuf[ ELASTIC_APM_TEXT_OUTPUT_STREAM_ON_STACK_BUFFER_SIZE ]; TextOutputStream txtOutStream = ELASTIC_APM_TEXT_OUTPUT_STREAM_FROM_STATIC_BUFFER( txtOutStreamBuf ); ELASTIC_APM_CURL_EASY_SETOPT( curlHandle, CURLOPT_DEBUGFUNCTION, &curlDebugCallback ); ELASTIC_APM_CURL_EASY_SETOPT( curlHandle, CURLOPT_VERBOSE, 1L ); resultCode = resultSuccess; finally: ELASTIC_APM_LOG_DEBUG_RESULT_CODE_FUNCTION_EXIT(); ELASTIC_APM_UNUSED( resultCode ); return; failure: goto finally; } ResultCode initConnectionData( const ConfigSnapshot* config, ConnectionData* connectionData, StringView userAgentHttpHeader ) { ResultCode resultCode; enum { authBufferSize = 256 }; char auth[authBufferSize]; const char* authKind = NULL; const char* authValue = NULL; int snprintfRetVal; char txtOutStreamBuf[ ELASTIC_APM_TEXT_OUTPUT_STREAM_ON_STACK_BUFFER_SIZE ]; TextOutputStream txtOutStream = ELASTIC_APM_TEXT_OUTPUT_STREAM_FROM_STATIC_BUFFER( txtOutStreamBuf ); ELASTIC_APM_ASSERT_VALID_PTR( connectionData ); ELASTIC_APM_ASSERT( connectionData->curlHandle == NULL, "" ); ELASTIC_APM_ASSERT( connectionData->requestHeaders == NULL, "" ); ELASTIC_APM_LOG_DEBUG_FUNCTION_ENTRY_MSG( "config: {serverUrl: %s, disableSend: %s, serverTimeout: %s, devInternalBackendCommLogVerbose: %s}" "; userAgentHttpHeader: `%s'" "; curl info: %s" , config->serverUrl, boolToString( config->disableSend ), streamDuration( config->serverTimeout, &txtOutStream ), boolToString( config->devInternalBackendCommLogVerbose ) , streamStringView( userAgentHttpHeader, &txtOutStream ) , streamLibCurlInfo( &txtOutStream ) ); textOutputStreamRewind( &txtOutStream ); connectionData->curlHandle = curl_easy_init(); if ( connectionData->curlHandle == NULL ) { ELASTIC_APM_LOG_ERROR( "curl_easy_init() returned NULL; curl info: %s", streamLibCurlInfo( &txtOutStream ) ); ELASTIC_APM_SET_RESULT_CODE_AND_GOTO_FAILURE_EX( resultCurlFailure ); } ELASTIC_APM_CURL_EASY_SETOPT( connectionData->curlHandle, CURLOPT_WRITEFUNCTION, logResponse ); if ( config->devInternalBackendCommLogVerbose ) { enableCurlVerboseMode( connectionData->curlHandle ); } if ( config->serverTimeout.valueInUnits == 0 ) { ELASTIC_APM_LOG_DEBUG_FUNCTION_ENTRY_MSG( "Timeout is disabled. %s (serverTimeout): %s" , ELASTIC_APM_CFG_OPT_NAME_SERVER_TIMEOUT, streamDuration( config->serverTimeout, &txtOutStream ) ); textOutputStreamRewind( &txtOutStream ); } else { long serverTimeoutInMilliseconds = (long)durationToMilliseconds( config->serverTimeout ); ELASTIC_APM_CURL_EASY_SETOPT( connectionData->curlHandle, CURLOPT_TIMEOUT_MS, serverTimeoutInMilliseconds ); } if ( ! config->verifyServerCert ) { ELASTIC_APM_LOG_DEBUG( "verify_server_cert configuration option is set to false - disabling SSL/TLS certificate verification for communication with APM Server..." ); /** * This option determines whether libcurl verifies that the server cert is for the server it is known as. * When negotiating TLS and SSL connections, the server sends a certificate indicating its identity. * When CURLOPT_SSL_VERIFYHOST is 2, that certificate must indicate that the server is the server to which you meant to connect, or the connection fails. * Simply put, it means it has to have the same name in the certificate as is in the URL you operate against. * When the verify value is 0, the connection succeeds regardless of the names in the certificate. * * @link https://curl.se/libcurl/c/CURLOPT_SSL_VERIFYHOST.html */ ELASTIC_APM_CURL_EASY_SETOPT( connectionData->curlHandle, CURLOPT_SSL_VERIFYHOST, 0L ); /** * This option determines whether curl verifies the authenticity of the peer's certificate. A value of 1 means curl verifies; 0 (zero) means it does not. * Authenticating the certificate is not enough to be sure about the server. You typically also want to ensure that the server is the server you mean to be talking to. * Use CURLOPT_SSL_VERIFYHOST for that. * The check that the host name in the certificate is valid for the host name you are connecting to is done independently of the CURLOPT_SSL_VERIFYPEER option. * * @link https://curl.se/libcurl/c/CURLOPT_SSL_VERIFYPEER.html */ ELASTIC_APM_CURL_EASY_SETOPT( connectionData->curlHandle, CURLOPT_SSL_VERIFYPEER, 0L ); } // Authorization with API key or secret token if present if ( ! isNullOrEmtpyString( config->apiKey ) ) { authKind = "ApiKey"; authValue = config->apiKey; } else if ( ! isNullOrEmtpyString( config->secretToken ) ) { authKind = "Bearer"; authValue = config->secretToken; } if ( authValue != NULL ) { snprintfRetVal = snprintf( auth, authBufferSize, "Authorization: %s %s", authKind, authValue ); if ( snprintfRetVal < 0 || snprintfRetVal >= authBufferSize ) { ELASTIC_APM_LOG_ERROR( "Failed to build Authorization header." " snprintfRetVal: %d. authKind: %s. authValue: %s.", snprintfRetVal, authKind, authValue ); ELASTIC_APM_SET_RESULT_CODE_AND_GOTO_FAILURE(); } ELASTIC_APM_LOG_TRACE( "Adding header: %s", auth ); ELASTIC_APM_CALL_IF_FAILED_GOTO( addToCurlStringList( /* in,out */ &connectionData->requestHeaders, auth ) ); } ELASTIC_APM_CALL_IF_FAILED_GOTO( addToCurlStringList( /* in,out */ &connectionData->requestHeaders, "Content-Type: application/x-ndjson" ) ); ELASTIC_APM_CURL_EASY_SETOPT( connectionData->curlHandle, CURLOPT_HTTPHEADER, connectionData->requestHeaders ); ELASTIC_APM_CURL_EASY_SETOPT( connectionData->curlHandle, CURLOPT_USERAGENT, userAgentHttpHeader ); resultCode = resultSuccess; finally: ELASTIC_APM_LOG_DEBUG_RESULT_CODE_FUNCTION_EXIT(); return resultCode; failure: goto finally; } ResultCode syncSendEventsToApmServerWithConn( const ConfigSnapshot* config, ConnectionData* connectionData, StringView serializedEvents ) { ResultCode resultCode; CURLcode curlResult; enum { urlBufferSize = 256 }; char url[urlBufferSize]; int snprintfRetVal; char txtOutStreamBuf[ ELASTIC_APM_TEXT_OUTPUT_STREAM_ON_STACK_BUFFER_SIZE ]; TextOutputStream txtOutStream = ELASTIC_APM_TEXT_OUTPUT_STREAM_FROM_STATIC_BUFFER( txtOutStreamBuf ); long responseCode = 0; bool isFailed = true; const char *serverUrlAndQuerySeparator = std::string_view(config->serverUrl).ends_with('/') ? "" : "/"; ELASTIC_APM_ASSERT_VALID_PTR( connectionData ); ELASTIC_APM_ASSERT( connectionData->curlHandle != NULL, "" ); ELASTIC_APM_LOG_DEBUG_FUNCTION_ENTRY(); ELASTIC_APM_CURL_EASY_SETOPT( connectionData->curlHandle, CURLOPT_POST, 1L ); ELASTIC_APM_CURL_EASY_SETOPT( connectionData->curlHandle, CURLOPT_POSTFIELDS, serializedEvents.begin ); ELASTIC_APM_CURL_EASY_SETOPT( connectionData->curlHandle, CURLOPT_POSTFIELDSIZE, serializedEvents.length ); snprintfRetVal = snprintf( url, urlBufferSize, "%s%sintake/v2/events", config->serverUrl, serverUrlAndQuerySeparator); if ( snprintfRetVal < 0 || snprintfRetVal >= urlBufferSize ) { ELASTIC_APM_LOG_ERROR( "Failed to build full URL to APM Server's intake API. snprintfRetVal: %d", snprintfRetVal ); ELASTIC_APM_SET_RESULT_CODE_AND_GOTO_FAILURE(); } ELASTIC_APM_CURL_EASY_SETOPT( connectionData->curlHandle, CURLOPT_URL, url ); curlResult = curl_easy_perform( connectionData->curlHandle ); if ( curlResult != CURLE_OK ) { ELASTIC_APM_LOG_ERROR( "Sending events to APM Server failed" "; URL: `%s'" "; error message: `%s'" "; curl info: %s" "; current process command line: `%s'" , url , curl_easy_strerror( curlResult ) , streamLibCurlInfo( &txtOutStream ) , streamCurrentProcessCommandLine( &txtOutStream, /* maxLength */ 200 ) ); ELASTIC_APM_SET_RESULT_CODE_AND_GOTO_FAILURE(); } curl_easy_getinfo( connectionData->curlHandle, CURLINFO_RESPONSE_CODE, &responseCode ); /** * If the HTTP response status code isn’t 2xx or if a request is prematurely closed (either on the TCP or HTTP level) the request MUST be considered failed. * * @see https://github.com/elastic/apm/blob/d8cb5607dbfffea819ab5efc9b0743044772fb23/specs/agents/transport.md#transport-errors */ isFailed = ( responseCode / 100 ) != 2; ELASTIC_APM_LOG_WITH_LEVEL( isFailed ? logLevel_error : logLevel_debug, "Sent events to APM Server. Response HTTP code: %ld. URL: `%s'.", responseCode, url ); resultCode = isFailed ? resultFailure : resultSuccess; finally: ELASTIC_APM_LOG_DEBUG_RESULT_CODE_FUNCTION_EXIT(); return resultCode; failure: goto finally; } ResultCode syncSendEventsToApmServer( const ConfigSnapshot* config, StringView userAgentHttpHeader, StringView serializedEvents ) { char txtOutStreamBuf[ ELASTIC_APM_TEXT_OUTPUT_STREAM_ON_STACK_BUFFER_SIZE ]; TextOutputStream txtOutStream = ELASTIC_APM_TEXT_OUTPUT_STREAM_FROM_STATIC_BUFFER( txtOutStreamBuf ); ResultCode resultCode; ConnectionData* connectionData = &g_connectionData; ELASTIC_APM_ASSERT_VALID_PTR( connectionData ); ELASTIC_APM_LOG_DEBUG_FUNCTION_ENTRY_MSG( "Sending events to APM Server..." "; config: { serverUrl: %s, disableSend: %s, serverTimeout: %s }" "; userAgentHttpHeader: `%s'" "; serializedEvents [length: %" PRIu64 "]:\n%.*s" , config->serverUrl , boolToString( config->disableSend ) , streamDuration( config->serverTimeout, &txtOutStream ) , streamStringView( userAgentHttpHeader, &txtOutStream ) , (UInt64) serializedEvents.length, (int) serializedEvents.length, serializedEvents.begin ); textOutputStreamRewind( &txtOutStream ); if ( config->disableSend ) { ELASTIC_APM_LOG_DEBUG( "disable_send (disableSend) configuration option is set to true - discarding events instead of sending" ); ELASTIC_APM_SET_RESULT_CODE_TO_SUCCESS_AND_GOTO_FINALLY(); } if ( backendCommBackoff_shouldWait( &connectionData->backoff ) ) { ELASTIC_APM_LOG_DEBUG( "Backoff wait time has not elapsed yet - discarding events instead of sending" ); ELASTIC_APM_SET_RESULT_CODE_TO_SUCCESS_AND_GOTO_FINALLY(); } if ( connectionData->curlHandle == NULL ) { ELASTIC_APM_CALL_IF_FAILED_GOTO( initConnectionData( config, connectionData, userAgentHttpHeader ) ); } ELASTIC_APM_CALL_IF_FAILED_GOTO( syncSendEventsToApmServerWithConn( config, connectionData, serializedEvents ) ); backendCommBackoff_onSuccess( &connectionData->backoff ); resultCode = resultSuccess; finally: ELASTIC_APM_LOG_DEBUG_RESULT_CODE_FUNCTION_EXIT(); return resultCode; failure: backendCommBackoff_onError( &connectionData->backoff ); cleanupConnectionData( connectionData ); goto finally; } #undef ELASTIC_APM_CURL_EASY_SETOPT struct DataToSendNode; typedef struct DataToSendNode DataToSendNode; struct DataToSendNode { UInt64 id; DataToSendNode* prev; DataToSendNode* next; StringBuffer userAgentHttpHeader; StringBuffer serializedEvents; }; static void freeDataToSendNode( DataToSendNode** nodeOutPtr ) { ELASTIC_APM_ASSERT_VALID_IN_PTR_TO_PTR( nodeOutPtr ); freeMallocedStringBuffer( /* in,out */ &( (*nodeOutPtr)->userAgentHttpHeader ) ); freeMallocedStringBuffer( /* in,out */ &( (*nodeOutPtr)->serializedEvents ) ); ELASTIC_APM_ZERO_STRUCT( *nodeOutPtr ); ELASTIC_APM_FREE_INSTANCE_AND_SET_TO_NULL( DataToSendNode, /* in,out */ *nodeOutPtr ); } struct DataToSendQueue { DataToSendNode head; DataToSendNode tail; }; typedef struct DataToSendQueue DataToSendQueue; static void initDataToSendQueue( DataToSendQueue* dataQueue ) { ELASTIC_APM_ASSERT_VALID_PTR( dataQueue ); dataQueue->head.prev = NULL; dataQueue->head.next = &dataQueue->tail; dataQueue->tail.prev = &dataQueue->head; dataQueue->tail.next = NULL; } static ResultCode addCopyToDataToSendQueue( DataToSendQueue* dataQueue , UInt64 id , StringView userAgentHttpHeader , StringView serializedEvents ) { ELASTIC_APM_ASSERT_VALID_PTR( dataQueue ); ResultCode resultCode; DataToSendNode* newNode = NULL; ELASTIC_APM_MALLOC_INSTANCE_IF_FAILED_GOTO( DataToSendNode, /* out */ newNode ); ELASTIC_APM_ZERO_STRUCT( newNode ); ELASTIC_APM_CALL_IF_FAILED_GOTO( dupMallocStringView( userAgentHttpHeader, /* out */ &( newNode->userAgentHttpHeader ) ) ); ELASTIC_APM_CALL_IF_FAILED_GOTO( dupMallocStringView( serializedEvents, /* out */ &( newNode->serializedEvents ) ) ); resultCode = resultSuccess; newNode->id = id; newNode->next = &( dataQueue->tail ); newNode->prev = dataQueue->tail.prev; dataQueue->tail.prev->next = newNode; dataQueue->tail.prev = newNode; finally: return resultCode; failure: freeDataToSendNode( &newNode ); goto finally; } static bool isDataToSendQueueEmpty( const DataToSendQueue* dataQueue ) { ELASTIC_APM_ASSERT_VALID_PTR( dataQueue ); return dataQueue->head.next == &( dataQueue->tail ); } DataToSendNode* getFirstNodeInDataToSendQueue( DataToSendQueue* dataQueue ) { ELASTIC_APM_ASSERT_VALID_PTR( dataQueue ); return isDataToSendQueueEmpty( dataQueue ) ? NULL : dataQueue->head.next; } size_t removeFirstNodeInDataToSendQueue( DataToSendQueue* dataQueue ) { ELASTIC_APM_ASSERT_VALID_PTR( dataQueue ); ELASTIC_APM_ASSERT( ! isDataToSendQueueEmpty( dataQueue ), "" ); DataToSendNode* firstNode = dataQueue->head.next; // -1 since terminating '\0' is counted in buffer's size but not in string's length size_t firstNodeDataSize = firstNode->serializedEvents.size - 1; DataToSendNode* newFirstNode = firstNode->next; dataQueue->head.next = newFirstNode; newFirstNode->prev = &( dataQueue->head ); freeDataToSendNode( &firstNode ); return firstNodeDataSize; } static void freeDataToSendQueue( DataToSendQueue* dataQueue ) { ELASTIC_APM_ASSERT_VALID_PTR( dataQueue ); while ( ! isDataToSendQueueEmpty( dataQueue ) ) { removeFirstNodeInDataToSendQueue( dataQueue ); } } #define ELASTIC_APM_MAX_QUEUE_SIZE_IN_BYTES (10 * 1024 * 1024) struct BackgroundBackendComm { Mutex* mutex; ConditionVariable* condVar; Thread* thread; DataToSendQueue dataToSendQueue; size_t dataToSendTotalSize; size_t nextEventsBatchId; bool shouldExit; TimeSpec shouldExitBy; }; typedef struct BackgroundBackendComm BackgroundBackendComm; struct BackgroundBackendCommSharedStateSnapshot { const DataToSendNode* firstDataToSendNode; size_t dataToSendTotalSize; bool shouldExit; TimeSpec shouldExitBy; }; typedef struct BackgroundBackendCommSharedStateSnapshot BackgroundBackendCommSharedStateSnapshot; static inline bool isDataToSendQueueEmptyInSnapshot( const BackgroundBackendCommSharedStateSnapshot* sharedStateSnapshot ) { return sharedStateSnapshot->firstDataToSendNode == NULL; } String streamSharedStateSnapshot( const BackgroundBackendCommSharedStateSnapshot* sharedStateSnapshot, TextOutputStream* txtOutStream ) { StringView serializedEvents = { nullptr, 0 }; if ( ! isDataToSendQueueEmptyInSnapshot( sharedStateSnapshot ) ) { serializedEvents = stringBufferToView( sharedStateSnapshot->firstDataToSendNode->serializedEvents ); } return streamPrintf( txtOutStream ,"{" "total size of queued events: %" PRIu64 ", firstDataToSendNode %s NULL" " (serializedEvents.length: %" PRIu64 ")" ", shouldExit: %s" ", shouldExitBy: %s" "}" , (UInt64) sharedStateSnapshot->dataToSendTotalSize , sharedStateSnapshot->firstDataToSendNode == NULL ? "==" : "!=" , (UInt64) serializedEvents.length , boolToString( sharedStateSnapshot->shouldExit ) , sharedStateSnapshot->shouldExit ? streamUtcTimeSpecAsLocal( &(sharedStateSnapshot->shouldExitBy), txtOutStream ) : "N/A" ); } #define ELASTIC_APM_BACKGROUND_BACKEND_COMM_DO_UNDER_LOCK_PROLOG() \ ELASTIC_APM_LOG_DEBUG_FUNCTION_ENTRY(); \ ELASTIC_APM_ASSERT_VALID_PTR( backgroundBackendComm ); \ ResultCode resultCode; \ bool shouldUnlockMutex = false; \ ELASTIC_APM_CALL_IF_FAILED_GOTO( lockMutex( backgroundBackendComm->mutex, &shouldUnlockMutex, __FUNCTION__ ) ); #define ELASTIC_APM_BACKGROUND_BACKEND_COMM_DO_UNDER_LOCK_EPILOG() \ finally: \ unlockMutex( backgroundBackendComm->mutex, &shouldUnlockMutex, __FUNCTION__ ); \ ELASTIC_APM_LOG_DEBUG_RESULT_CODE_FUNCTION_EXIT(); \ return resultCode; \ failure: \ goto finally; void backgroundBackendCommThreadFunc_underLockCopySharedStateToSnapshot( BackgroundBackendComm* backgroundBackendComm , /* out */ BackgroundBackendCommSharedStateSnapshot* sharedStateSnapshot ) { ELASTIC_APM_ASSERT_VALID_PTR( sharedStateSnapshot ); sharedStateSnapshot->firstDataToSendNode = getFirstNodeInDataToSendQueue( &( backgroundBackendComm->dataToSendQueue ) ); sharedStateSnapshot->dataToSendTotalSize = backgroundBackendComm->dataToSendTotalSize; sharedStateSnapshot->shouldExit = backgroundBackendComm->shouldExit; sharedStateSnapshot->shouldExitBy = backgroundBackendComm->shouldExitBy; } static inline bool areEqualSharedSnapshots( const BackgroundBackendCommSharedStateSnapshot* val1, const BackgroundBackendCommSharedStateSnapshot* val2 ) { if ( isDataToSendQueueEmptyInSnapshot( val1 ) != isDataToSendQueueEmptyInSnapshot( val2 ) ) { return false; } if ( val1->shouldExit != val2->shouldExit ) { return false; } if ( val1->shouldExit ) { if ( compareAbsTimeSpecs( &( val1->shouldExitBy ), &( val2->shouldExitBy ) ) != 0 ) { return false; } } return true; } ResultCode backgroundBackendCommThreadFunc_getSharedStateSnapshot( BackgroundBackendComm* backgroundBackendComm , /* out */ BackgroundBackendCommSharedStateSnapshot* sharedStateSnapshot ) { ELASTIC_APM_ASSERT_VALID_PTR( sharedStateSnapshot ); ELASTIC_APM_BACKGROUND_BACKEND_COMM_DO_UNDER_LOCK_PROLOG() backgroundBackendCommThreadFunc_underLockCopySharedStateToSnapshot( backgroundBackendComm, /* out */ sharedStateSnapshot ); ELASTIC_APM_BACKGROUND_BACKEND_COMM_DO_UNDER_LOCK_EPILOG() } ResultCode backgroundBackendCommThreadFunc_shouldBreakLoop( const BackgroundBackendCommSharedStateSnapshot* sharedStateSnapshot , bool* shouldBreakLoop ) { ResultCode resultCode; if ( sharedStateSnapshot->shouldExit ) { if ( isDataToSendQueueEmptyInSnapshot( sharedStateSnapshot ) ) { *shouldBreakLoop = true; goto success; } TimeSpec now; ELASTIC_APM_CALL_IF_FAILED_GOTO( getCurrentAbsTimeSpec( /* out */ &now ) ); if ( compareAbsTimeSpecs( &sharedStateSnapshot->shouldExitBy, &now ) < 0 ) { *shouldBreakLoop = true; goto success; } } *shouldBreakLoop = false; success: resultCode = resultSuccess; finally: return resultCode; failure: goto finally; } ResultCode backgroundBackendCommThreadFunc_removeFirstEventsBatchAndUpdateSnapshot( BackgroundBackendComm* backgroundBackendComm , /* out */ BackgroundBackendCommSharedStateSnapshot* sharedStateSnapshot ) { size_t firstNodeDataSize = 0; ELASTIC_APM_BACKGROUND_BACKEND_COMM_DO_UNDER_LOCK_PROLOG() firstNodeDataSize = removeFirstNodeInDataToSendQueue( &( backgroundBackendComm->dataToSendQueue ) ); backgroundBackendComm->dataToSendTotalSize -= firstNodeDataSize; backgroundBackendCommThreadFunc_underLockCopySharedStateToSnapshot( backgroundBackendComm, /* out */ sharedStateSnapshot ); ELASTIC_APM_BACKGROUND_BACKEND_COMM_DO_UNDER_LOCK_EPILOG() } ResultCode backgroundBackendCommThreadFunc_waitForChangesInSharedState( BackgroundBackendComm* backgroundBackendComm , /* in,out */ BackgroundBackendCommSharedStateSnapshot* sharedStateSnapshot ) { char txtOutStreamBuf[ ELASTIC_APM_TEXT_OUTPUT_STREAM_ON_STACK_BUFFER_SIZE ]; TextOutputStream txtOutStream = ELASTIC_APM_TEXT_OUTPUT_STREAM_FROM_STATIC_BUFFER( txtOutStreamBuf ); ELASTIC_APM_BACKGROUND_BACKEND_COMM_DO_UNDER_LOCK_PROLOG() BackgroundBackendCommSharedStateSnapshot localSharedStateSnapshot; backgroundBackendCommThreadFunc_underLockCopySharedStateToSnapshot( backgroundBackendComm, /* out */ &localSharedStateSnapshot ); if ( areEqualSharedSnapshots( sharedStateSnapshot, &localSharedStateSnapshot ) ) { ELASTIC_APM_LOG_DEBUG( "Shared state is the same - we need to wait; shared state snapshots: before lock: %s, after lock: %s" , streamSharedStateSnapshot( sharedStateSnapshot, &txtOutStream ) , streamSharedStateSnapshot( &localSharedStateSnapshot, &txtOutStream ) ); textOutputStreamRewind( &txtOutStream ); ELASTIC_APM_CALL_IF_FAILED_GOTO( waitConditionVariable( backgroundBackendComm->condVar, backgroundBackendComm->mutex, __FUNCTION__ ) ); backgroundBackendCommThreadFunc_underLockCopySharedStateToSnapshot( backgroundBackendComm, /* out */ sharedStateSnapshot ); ELASTIC_APM_LOG_DEBUG( "Waiting exited; shared state snapshots: after lock: %s, after wait: %s" , streamSharedStateSnapshot( &localSharedStateSnapshot, &txtOutStream ) , streamSharedStateSnapshot( sharedStateSnapshot, &txtOutStream ) ); } else { ELASTIC_APM_LOG_DEBUG( "Shared state is not the same - there is no need to wait; shared state snapshots: before lock: %s, after lock: %s" , streamSharedStateSnapshot( sharedStateSnapshot, &txtOutStream ) , streamSharedStateSnapshot( &localSharedStateSnapshot, &txtOutStream ) ); *sharedStateSnapshot = localSharedStateSnapshot; } ELASTIC_APM_BACKGROUND_BACKEND_COMM_DO_UNDER_LOCK_EPILOG() } ResultCode backgroundBackendCommThreadFunc_sendFirstEventsBatch( const ConfigSnapshot* config , const BackgroundBackendCommSharedStateSnapshot* sharedStateSnapshot ) { // This function is called only when data-queue-to-send is not empty // so firstDataToSendNode is not NULL StringView serializedEvents = stringBufferToView( sharedStateSnapshot->firstDataToSendNode->serializedEvents ); ELASTIC_APM_LOG_DEBUG( "About to send batch of events" "; batch ID: %" PRIu64 "; batch size: %" PRIu64 "; total size of queued events: %" PRIu64 , (UInt64) sharedStateSnapshot->firstDataToSendNode->id , (UInt64) serializedEvents.length , (UInt64) sharedStateSnapshot->dataToSendTotalSize ); ResultCode resultCode; resultCode = syncSendEventsToApmServer( config , stringBufferToView( sharedStateSnapshot->firstDataToSendNode->userAgentHttpHeader ) , serializedEvents ); // If we failed to send the currently first batch we return success nevertheless // it means that this batch will be removed, and we will continue on to sending the rest of the queued events if ( resultCode != resultSuccess ) { ELASTIC_APM_LOG_ERROR( "Failed to send batch of events - the batch will be dequeued and dropped" "; batch ID: %" PRIu64 "; batch size: %" PRIu64 "; total size of queued events: %" PRIu64 , (UInt64) sharedStateSnapshot->firstDataToSendNode->id , (UInt64) serializedEvents.length , (UInt64) sharedStateSnapshot->dataToSendTotalSize ); } return resultSuccess; } #undef ELASTIC_APM_BACKGROUND_BACKEND_COMM_DO_UNDER_LOCK_EPILOG #undef ELASTIC_APM_BACKGROUND_BACKEND_COMM_DO_UNDER_LOCK_PROLOG void backgroundBackendCommThreadFunc_logSharedStateSnapshot( const BackgroundBackendCommSharedStateSnapshot* sharedStateSnapshot ) { StringView serializedEvents = { nullptr, 0 }; char txtOutStreamBuf[ ELASTIC_APM_TEXT_OUTPUT_STREAM_ON_STACK_BUFFER_SIZE ]; TextOutputStream txtOutStream = ELASTIC_APM_TEXT_OUTPUT_STREAM_FROM_STATIC_BUFFER( txtOutStreamBuf ); ELASTIC_APM_LOG_TRACE( "Shared state snapshot: %s", streamSharedStateSnapshot( sharedStateSnapshot, &txtOutStream ) ); if ( ! isDataToSendQueueEmptyInSnapshot( sharedStateSnapshot ) ) { serializedEvents = stringBufferToView( sharedStateSnapshot->firstDataToSendNode->serializedEvents ); } ELASTIC_APM_ASSERT( (sharedStateSnapshot->dataToSendTotalSize == 0) == ( sharedStateSnapshot->firstDataToSendNode == NULL ) , "dataToSendTotalSize: %" PRIu64 ", firstDataToSendNode: %p (serializedEvents.length: %" PRIu64 ")" , (UInt64) sharedStateSnapshot->dataToSendTotalSize , sharedStateSnapshot->firstDataToSendNode , (UInt64) serializedEvents.length ); } void* backgroundBackendCommThreadFunc( void* arg ) { ELASTIC_APM_LOG_DEBUG_FUNCTION_ENTRY(); ELASTIC_APM_ASSERT_VALID_PTR( arg ); ResultCode resultCode; BackgroundBackendComm* backgroundBackendComm = (BackgroundBackendComm*)arg; const ConfigSnapshot* config = getTracerCurrentConfigSnapshot( getGlobalTracer() ); BackgroundBackendCommSharedStateSnapshot sharedStateSnapshot; ELASTIC_APM_CALL_IF_FAILED_GOTO( backgroundBackendCommThreadFunc_getSharedStateSnapshot( backgroundBackendComm, /* out */ &sharedStateSnapshot ) ); while ( true ) { backgroundBackendCommThreadFunc_logSharedStateSnapshot( &sharedStateSnapshot ); bool shouldBreakLoop; ELASTIC_APM_CALL_IF_FAILED_GOTO( backgroundBackendCommThreadFunc_shouldBreakLoop( /* in */ &sharedStateSnapshot, /* out */ &shouldBreakLoop ) ); if ( shouldBreakLoop ) { break; } if ( isDataToSendQueueEmptyInSnapshot( &sharedStateSnapshot ) ) { ELASTIC_APM_CALL_IF_FAILED_GOTO( backgroundBackendCommThreadFunc_waitForChangesInSharedState( backgroundBackendComm, /* out */ &sharedStateSnapshot ) ); continue; } ELASTIC_APM_CALL_IF_FAILED_GOTO( backgroundBackendCommThreadFunc_sendFirstEventsBatch( config, /* in */ &sharedStateSnapshot ) ); ELASTIC_APM_CALL_IF_FAILED_GOTO( backgroundBackendCommThreadFunc_removeFirstEventsBatchAndUpdateSnapshot( backgroundBackendComm, /* out */ &sharedStateSnapshot ) ); } resultCode = resultSuccess; finally: ELASTIC_APM_LOG_DEBUG_RESULT_CODE_FUNCTION_EXIT(); return NULL; failure: goto finally; } ResultCode unwindBackgroundBackendComm( BackgroundBackendComm** backgroundBackendCommOutPtr, const TimeSpec* timeoutAbsUtc, bool isCreatedByThisProcess ) { ELASTIC_APM_LOG_DEBUG_FUNCTION_ENTRY_MSG( "isCreatedByThisProcess: %s", boolToString( isCreatedByThisProcess ) ); ELASTIC_APM_ASSERT_VALID_PTR( backgroundBackendCommOutPtr ); // ELASTIC_APM_ASSERT_VALID_PTR( timeoutAbsUtc ); <- timeoutAbsUtc can be NULL ResultCode resultCode; BackgroundBackendComm* backgroundBackendComm = *backgroundBackendCommOutPtr; if ( backgroundBackendComm == NULL ) { resultCode = resultSuccess; goto finally; } if ( ! isCreatedByThisProcess ) { ELASTIC_APM_LOG_DEBUG( "Deallocating memory related to background communication data structures inherited from parent process after fork" " without actually properly destroying synchronization primitives since it's impossible to do in child process" "; parent PID: %d" , (int)getParentProcessId() ); } if ( backgroundBackendComm->thread != NULL ) { void* backgroundBackendCommThreadFuncRetVal = NULL; bool hasTimedOut; ELASTIC_APM_CALL_IF_FAILED_GOTO( timedJoinAndDeleteThread( &( backgroundBackendComm->thread ), &backgroundBackendCommThreadFuncRetVal, timeoutAbsUtc, isCreatedByThisProcess, &hasTimedOut, __FUNCTION__ ) ); if ( hasTimedOut ) { ELASTIC_APM_LOG_ERROR( "Join to thread for background backend communications timed out - skipping the rest of cleanup and exiting" ); ELASTIC_APM_SET_RESULT_CODE_AND_GOTO_FAILURE(); } } if ( backgroundBackendComm->condVar != NULL ) { ELASTIC_APM_CALL_IF_FAILED_GOTO( deleteConditionVariable( &( backgroundBackendComm->condVar ), isCreatedByThisProcess ) ); } if ( backgroundBackendComm->mutex != NULL ) { ELASTIC_APM_CALL_IF_FAILED_GOTO( deleteMutex( &( backgroundBackendComm->mutex ) ) ); } resultCode = resultSuccess; freeDataToSendQueue( &( backgroundBackendComm->dataToSendQueue ) ); ELASTIC_APM_FREE_INSTANCE_AND_SET_TO_NULL( BackgroundBackendComm, *backgroundBackendCommOutPtr ); finally: ELASTIC_APM_LOG_DEBUG_RESULT_CODE_FUNCTION_EXIT(); return resultCode; failure: goto finally; } static BackgroundBackendComm* g_backgroundBackendComm = NULL; static bool deriveAsyncBackendComm( const ConfigSnapshot* config, String* dbgReason ) { if ( config->asyncBackendComm.isSet ) { *dbgReason = config->asyncBackendComm.value ? "explicitly set to true" : "explicitly set to false"; return config->asyncBackendComm.value; } *dbgReason = "implicitly set to true"; return true; } ResultCode newBackgroundBackendComm( const ConfigSnapshot* config, BackgroundBackendComm** backgroundBackendCommOut ) { ELASTIC_APM_LOG_DEBUG_FUNCTION_ENTRY(); ResultCode resultCode; BackgroundBackendComm* backgroundBackendComm = NULL; ELASTIC_APM_MALLOC_INSTANCE_IF_FAILED_GOTO( BackgroundBackendComm, /* out */ backgroundBackendComm ); backgroundBackendComm->condVar = NULL; backgroundBackendComm->mutex = NULL; backgroundBackendComm->thread = NULL; initDataToSendQueue( &( backgroundBackendComm->dataToSendQueue ) ); backgroundBackendComm->dataToSendTotalSize = 0; backgroundBackendComm->nextEventsBatchId = 1; backgroundBackendComm->shouldExit = false; ELASTIC_APM_CALL_IF_FAILED_GOTO( newMutex( &( backgroundBackendComm->mutex ), /* dbgDesc */ "Background backend communications" ) ); ELASTIC_APM_CALL_IF_FAILED_GOTO( newConditionVariable( &( backgroundBackendComm->condVar ), /* dbgDesc */ "Background backend communications" ) ); resultCode = newThread( &( backgroundBackendComm->thread ) , &backgroundBackendCommThreadFunc , /* threadFuncArg: */ backgroundBackendComm , /* thread's dbgDesc */ "Background backend communications" ); if ( resultCode == resultSuccess ) { ELASTIC_APM_LOG_DEBUG( "Started thread for background backend communications; thread ID: %" PRIu64 , getThreadId( backgroundBackendComm->thread ) ); } resultCode = resultSuccess; *backgroundBackendCommOut = backgroundBackendComm; finally: ELASTIC_APM_LOG_DEBUG_RESULT_CODE_FUNCTION_EXIT(); return resultCode; failure: unwindBackgroundBackendComm( &backgroundBackendComm, /* timeoutAbsUtc: */ NULL, /* isCreatedByThisProcess */ true ); goto finally; } ResultCode backgroundBackendCommEnsureInited( const ConfigSnapshot* config ) { ELASTIC_APM_LOG_DEBUG_FUNCTION_ENTRY(); ResultCode resultCode; if ( g_backgroundBackendComm == NULL ) { ELASTIC_APM_CALL_IF_FAILED_GOTO( newBackgroundBackendComm( config, &g_backgroundBackendComm ) ); } resultCode = resultSuccess; finally: ELASTIC_APM_LOG_DEBUG_RESULT_CODE_FUNCTION_EXIT(); return resultCode; failure: goto finally; } static ResultCode signalBackgroundBackendCommThreadToExit( const ConfigSnapshot* config , BackgroundBackendComm* backgroundBackendComm , /* out */ TimeSpec* shouldExitBy ) { ELASTIC_APM_LOG_DEBUG_FUNCTION_ENTRY(); ResultCode resultCode; bool shouldUnlockMutex = false; char txtOutStreamBuf[ ELASTIC_APM_TEXT_OUTPUT_STREAM_ON_STACK_BUFFER_SIZE ]; TextOutputStream txtOutStream = ELASTIC_APM_TEXT_OUTPUT_STREAM_FROM_STATIC_BUFFER( txtOutStreamBuf ); ELASTIC_APM_CALL_IF_FAILED_GOTO( lockMutex( backgroundBackendComm->mutex, &shouldUnlockMutex, __FUNCTION__ ) ); backgroundBackendComm->shouldExit = true; ELASTIC_APM_CALL_IF_FAILED_GOTO( getCurrentAbsTimeSpec( /* out */ shouldExitBy ) ); addDelayToAbsTimeSpec( /* in, out */ shouldExitBy, (long)durationToMilliseconds( config->serverTimeout ) * ELASTIC_APM_NUMBER_OF_NANOSECONDS_IN_MILLISECOND ); backgroundBackendComm->shouldExitBy = *shouldExitBy; ELASTIC_APM_CALL_IF_FAILED_GOTO( signalConditionVariable( backgroundBackendComm->condVar, __FUNCTION__ ) ); resultCode = resultSuccess; finally: unlockMutex( backgroundBackendComm->mutex, &shouldUnlockMutex, __FUNCTION__ ); ELASTIC_APM_LOG_DEBUG_RESULT_CODE_FUNCTION_EXIT_MSG( "shouldExitBy: %s, serverTimeout: %s" , streamUtcTimeSpecAsLocal( shouldExitBy, &txtOutStream ), streamDuration( config->serverTimeout, &txtOutStream ) ); textOutputStreamRewind( &txtOutStream ); return resultCode; failure: goto finally; } void backgroundBackendCommOnModuleShutdown( const ConfigSnapshot* config ) { BackgroundBackendComm* backgroundBackendComm = g_backgroundBackendComm; ResultCode resultCode; if ( backgroundBackendComm != NULL ) { TimeSpec shouldExitBy; ELASTIC_APM_CALL_IF_FAILED_GOTO( signalBackgroundBackendCommThreadToExit( config, backgroundBackendComm, /* out */ &shouldExitBy ) ); ELASTIC_APM_CALL_IF_FAILED_GOTO( unwindBackgroundBackendComm( &backgroundBackendComm, &shouldExitBy, /* isCreatedByThisProcess */ true ) ); } resultCode = resultSuccess; finally: cleanupConnectionData( &g_connectionData ); g_backgroundBackendComm = NULL; return; failure: goto finally; } static ResultCode enqueueEventsToSendToApmServer( StringView userAgentHttpHeader, StringView serializedEvents ) { char txtOutStreamBuf[ ELASTIC_APM_TEXT_OUTPUT_STREAM_ON_STACK_BUFFER_SIZE ]; TextOutputStream txtOutStream = ELASTIC_APM_TEXT_OUTPUT_STREAM_FROM_STATIC_BUFFER( txtOutStreamBuf ); ELASTIC_APM_LOG_DEBUG( "Queueing events to send asynchronously..." "; userAgentHttpHeader [length: %" PRIu64 "]: `%.*s'" "; serializedEvents [length: %" PRIu64 "]:\n%.*s" , (UInt64) userAgentHttpHeader.length, (int) userAgentHttpHeader.length, userAgentHttpHeader.begin , (UInt64) serializedEvents.length, (int) serializedEvents.length, serializedEvents.begin ); textOutputStreamRewind( &txtOutStream ); ResultCode resultCode; bool shouldUnlockMutex = false; UInt64 id; BackgroundBackendComm* backgroundBackendComm = g_backgroundBackendComm; ELASTIC_APM_CALL_IF_FAILED_GOTO( lockMutex( backgroundBackendComm->mutex, &shouldUnlockMutex, __FUNCTION__ ) ); if ( backgroundBackendComm->dataToSendTotalSize >= ELASTIC_APM_MAX_QUEUE_SIZE_IN_BYTES ) { ELASTIC_APM_LOG_ERROR( "Already queued events are above max queue size - dropping these events" "; size of already queued events: %" PRIu64 , (UInt64) backgroundBackendComm->dataToSendTotalSize ); ELASTIC_APM_SET_RESULT_CODE_AND_GOTO_FAILURE(); } id = backgroundBackendComm->nextEventsBatchId; ELASTIC_APM_CALL_IF_FAILED_GOTO( addCopyToDataToSendQueue( &( backgroundBackendComm->dataToSendQueue ) , id , userAgentHttpHeader , serializedEvents ) ); backgroundBackendComm->dataToSendTotalSize += serializedEvents.length; ++backgroundBackendComm->nextEventsBatchId; ELASTIC_APM_LOG_DEBUG( "Queued a batch of events" "; batch ID: %" PRIu64 "; batch size: %" PRIu64 "; total size of queued events: %" PRIu64 , (UInt64) id , (UInt64) serializedEvents.length , (UInt64) backgroundBackendComm->dataToSendTotalSize ); ELASTIC_APM_CALL_IF_FAILED_GOTO( signalConditionVariable( backgroundBackendComm->condVar, __FUNCTION__ ) ); resultCode = resultSuccess; finally: unlockMutex( backgroundBackendComm->mutex, &shouldUnlockMutex, __FUNCTION__ ); ELASTIC_APM_LOG_DEBUG_RESULT_CODE_FUNCTION_EXIT_MSG( "Finished queueing events to send asynchronously" "; serializedEvents [length: %" PRIu64 "]:\n%.*s" , (UInt64) serializedEvents.length, (int) serializedEvents.length, serializedEvents.begin ); return resultCode; failure: goto finally; } ResultCode sendEventsToApmServer( const ConfigSnapshot* config, StringView userAgentHttpHeader, StringView serializedEvents ) { ResultCode resultCode; char txtOutStreamBuf[ ELASTIC_APM_TEXT_OUTPUT_STREAM_ON_STACK_BUFFER_SIZE ]; TextOutputStream txtOutStream = ELASTIC_APM_TEXT_OUTPUT_STREAM_FROM_STATIC_BUFFER( txtOutStreamBuf ); ELASTIC_APM_LOG_DEBUG( "Handling request to send events..." "; config: { serverUrl: %s, disableSend: %s, serverTimeout: %s }" "; userAgentHttpHeader [length: %" PRIu64 "]: `%.*s'" "; serializedEvents [length: %" PRIu64 "]:\n%.*s" , config->serverUrl , boolToString( config->disableSend ) , streamDuration( config->serverTimeout, &txtOutStream ) , (UInt64) userAgentHttpHeader.length, (int) userAgentHttpHeader.length, userAgentHttpHeader.begin , (UInt64) serializedEvents.length, (int) serializedEvents.length, serializedEvents.begin ); textOutputStreamRewind( &txtOutStream ); String dbgAsyncBackendCommReason = NULL; bool shouldSendAsync = deriveAsyncBackendComm( config, &dbgAsyncBackendCommReason ); ELASTIC_APM_LOG_DEBUG( "async_backend_comm (asyncBackendComm) configuration option is %s - sending events %s" , dbgAsyncBackendCommReason, ( shouldSendAsync ? "asynchronously" : "synchronously" ) ); if ( shouldSendAsync ) { ELASTIC_APM_CALL_IF_FAILED_GOTO( backgroundBackendCommEnsureInited( config ) ); ELASTIC_APM_CALL_IF_FAILED_GOTO( enqueueEventsToSendToApmServer( userAgentHttpHeader, serializedEvents ) ); } else { ELASTIC_APM_CALL_IF_FAILED_GOTO( syncSendEventsToApmServer( config, userAgentHttpHeader, serializedEvents ) ); } resultCode = resultSuccess; finally: ELASTIC_APM_LOG_DEBUG_RESULT_CODE_FUNCTION_EXIT(); return resultCode; failure: goto finally; } ResultCode resetBackgroundBackendCommStateInForkedChild() { ELASTIC_APM_LOG_DEBUG_FUNCTION_ENTRY_MSG( "g_backgroundBackendComm %s NULL", (g_backgroundBackendComm == NULL) ? "==" : "!=" ); ResultCode resultCode; if ( g_backgroundBackendComm != NULL ) { ELASTIC_APM_CALL_IF_FAILED_GOTO( unwindBackgroundBackendComm( &g_backgroundBackendComm, /* timeoutAbsUtc: */ NULL, /* isCreatedByThisProcess */ false ) ); } resultCode = resultSuccess; finally: ELASTIC_APM_LOG_DEBUG_RESULT_CODE_FUNCTION_EXIT(); return resultCode; failure: goto finally; }