core/runner/sink/http/HttpSink.cpp (293 lines of code) (raw):

// Copyright 2024 iLogtail Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #include "runner/sink/http/HttpSink.h" #include <optional> #include "app_config/AppConfig.h" #include "collection_pipeline/plugin/interface/HttpFlusher.h" #include "collection_pipeline/queue/QueueKeyManager.h" #include "collection_pipeline/queue/SenderQueueItem.h" #include "common/Flags.h" #include "common/StringTools.h" #include "common/http/Curl.h" #include "logger/Logger.h" #include "monitor/metric_constants/MetricConstants.h" #include "runner/FlusherRunner.h" #ifdef APSARA_UNIT_TEST_MAIN #include "unittest/pipeline/HttpSinkMock.h" #endif DEFINE_FLAG_INT32(http_sink_exit_timeout_sec, "", 5); using namespace std; namespace logtail { HttpSink* HttpSink::GetInstance() { #ifndef APSARA_UNIT_TEST_MAIN static HttpSink instance; return &instance; #else return HttpSinkMock::GetInstance(); #endif } bool HttpSink::Init() { mClient = curl_multi_init(); if (mClient == nullptr) { LOG_ERROR(sLogger, ("failed to init http sink", "failed to init curl multi client")); return false; } WriteMetrics::GetInstance()->PrepareMetricsRecordRef( mMetricsRecordRef, MetricCategory::METRIC_CATEGORY_RUNNER, {{METRIC_LABEL_KEY_RUNNER_NAME, METRIC_LABEL_VALUE_RUNNER_NAME_HTTP_SINK}}); mInItemsTotal = mMetricsRecordRef.CreateCounter(METRIC_RUNNER_IN_ITEMS_TOTAL); mLastRunTime = mMetricsRecordRef.CreateIntGauge(METRIC_RUNNER_LAST_RUN_TIME); mOutSuccessfulItemsTotal = mMetricsRecordRef.CreateCounter(METRIC_RUNNER_SINK_OUT_SUCCESSFUL_ITEMS_TOTAL); mOutFailedItemsTotal = mMetricsRecordRef.CreateCounter(METRIC_RUNNER_SINK_OUT_FAILED_ITEMS_TOTAL); mSuccessfulItemTotalResponseTimeMs = mMetricsRecordRef.CreateTimeCounter(METRIC_RUNNER_SINK_SUCCESSFUL_ITEM_TOTAL_RESPONSE_TIME_MS); mFailedItemTotalResponseTimeMs = mMetricsRecordRef.CreateTimeCounter(METRIC_RUNNER_SINK_FAILED_ITEM_TOTAL_RESPONSE_TIME_MS); mSendingItemsTotal = mMetricsRecordRef.CreateIntGauge(METRIC_RUNNER_SINK_SENDING_ITEMS_TOTAL); mSendConcurrency = mMetricsRecordRef.CreateIntGauge(METRIC_RUNNER_SINK_SEND_CONCURRENCY); // TODO: should be dynamic SET_GAUGE(mSendConcurrency, AppConfig::GetInstance()->GetSendRequestGlobalConcurrency()); mThreadRes = async(launch::async, &HttpSink::Run, this); return true; } void HttpSink::Stop() { mIsFlush = true; if (!mThreadRes.valid()) { return; } future_status s = mThreadRes.wait_for(chrono::seconds(INT32_FLAG(http_sink_exit_timeout_sec))); if (s == future_status::ready) { LOG_INFO(sLogger, ("http sink", "stopped successfully")); } else { LOG_WARNING(sLogger, ("http sink", "forced to stopped")); } } void HttpSink::Run() { LOG_INFO(sLogger, ("http sink", "started")); while (true) { SET_GAUGE(mLastRunTime, chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count()); unique_ptr<HttpSinkRequest> request; if (mQueue.WaitAndPop(request, 500)) { ADD_COUNTER(mInItemsTotal, 1); LOG_TRACE(sLogger, ("got item from flusher runner, item address", request->mItem)( "config-flusher-dst", QueueKeyManager::GetInstance()->GetName(request->mItem->mQueueKey))( "wait time", ToString(chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now() - request->mEnqueTime) .count()))("try cnt", ToString(request->mTryCnt))); if (!AddRequestToClient(std::move(request))) { continue; } ADD_GAUGE(mSendingItemsTotal, 1); } else if (mIsFlush && mQueue.Empty()) { break; } else { continue; } DoRun(); } auto mc = curl_multi_cleanup(mClient); if (mc != CURLM_OK) { LOG_ERROR(sLogger, ("failed to cleanup curl multi handle", "exit anyway")("errMsg", curl_multi_strerror(mc))); } } bool HttpSink::AddRequestToClient(unique_ptr<HttpSinkRequest>&& request) { curl_slist* headers = nullptr; CURL* curl = CreateCurlHandler(request->mMethod, request->mHTTPSFlag, request->mHost, request->mPort, request->mUrl, request->mQueryString, request->mHeader, request->mBody, request->mResponse, headers, request->mTimeout, AppConfig::GetInstance()->IsHostIPReplacePolicyEnabled(), AppConfig::GetInstance()->GetBindInterface(), false, std::nullopt, std::move(request->mSocket)); if (curl == nullptr) { request->mItem->mStatus = SendingStatus::IDLE; request->mResponse.SetNetworkStatus(NetworkCode::Other, "failed to init curl handler"); FlusherRunner::GetInstance()->DecreaseHttpSendingCnt(); ADD_COUNTER(mOutFailedItemsTotal, 1); LOG_ERROR(sLogger, ("failed to send request", "failed to init curl handler")( "action", "put sender queue item back to sender queue")("item address", request->mItem)( "config-flusher-dst", QueueKeyManager::GetInstance()->GetName(request->mItem->mQueueKey))( "sending cnt", ToString(FlusherRunner::GetInstance()->GetSendingBufferCount()))); return false; } request->mPrivateData = headers; curl_easy_setopt(curl, CURLOPT_PRIVATE, request.get()); request->mLastSendTime = chrono::system_clock::now(); auto res = curl_multi_add_handle(mClient, curl); if (res != CURLM_OK) { request->mItem->mStatus = SendingStatus::IDLE; request->mResponse.SetNetworkStatus(NetworkCode::Other, "failed to add the easy curl handle to multi_handle"); FlusherRunner::GetInstance()->DecreaseHttpSendingCnt(); curl_easy_cleanup(curl); ADD_COUNTER(mOutFailedItemsTotal, 1); LOG_ERROR(sLogger, ("failed to send request", "failed to add the easy curl handle to multi_handle")("errMsg", curl_multi_strerror(res))( "action", "put sender queue item back to sender queue")("item address", request->mItem)( "config-flusher-dst", QueueKeyManager::GetInstance()->GetName(request->mItem->mQueueKey))( "sending cnt", ToString(FlusherRunner::GetInstance()->GetSendingBufferCount()))); return false; } // let sink destruct the request request.release(); return true; } void HttpSink::DoRun() { CURLMcode mc; int runningHandlers = 1; while (runningHandlers) { auto curTime = chrono::system_clock::now(); SET_GAUGE(mLastRunTime, chrono::duration_cast<chrono::seconds>(curTime.time_since_epoch()).count()); if ((mc = curl_multi_perform(mClient, &runningHandlers)) != CURLM_OK) { LOG_ERROR( sLogger, ("failed to call curl_multi_perform", "sleep 100ms and retry")("errMsg", curl_multi_strerror(mc))); this_thread::sleep_for(chrono::milliseconds(100)); continue; } HandleCompletedRequests(runningHandlers); unique_ptr<HttpSinkRequest> request; bool hasRequest = false; while (mQueue.TryPop(request)) { ADD_COUNTER(mInItemsTotal, 1); LOG_TRACE(sLogger, ("got item from flusher runner, item address", request->mItem)( "config-flusher-dst", QueueKeyManager::GetInstance()->GetName(request->mItem->mQueueKey))( "wait time", ToString(chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now() - request->mEnqueTime) .count()))("try cnt", ToString(request->mTryCnt))); if (AddRequestToClient(std::move(request))) { ++runningHandlers; ADD_GAUGE(mSendingItemsTotal, 1); hasRequest = true; } } if (hasRequest) { continue; } struct timeval timeout { 1, 0 }; long curlTimeout = -1; if ((mc = curl_multi_timeout(mClient, &curlTimeout)) != CURLM_OK) { LOG_WARNING( sLogger, ("failed to call curl_multi_timeout", "use default timeout 1s")("errMsg", curl_multi_strerror(mc))); } if (curlTimeout >= 0) { auto sec = curlTimeout / 1000; // to avoid waiting too long so that adding new request is delayed if (sec <= 1) { timeout.tv_sec = sec; timeout.tv_usec = (curlTimeout % 1000) * 1000; } } int maxfd = -1; fd_set fdread; fd_set fdwrite; fd_set fdexcep; FD_ZERO(&fdread); FD_ZERO(&fdwrite); FD_ZERO(&fdexcep); if ((mc = curl_multi_fdset(mClient, &fdread, &fdwrite, &fdexcep, &maxfd)) != CURLM_OK) { LOG_ERROR(sLogger, ("failed to call curl_multi_fdset", "sleep 100ms")("errMsg", curl_multi_strerror(mc))); } if (maxfd == -1) { // sleep min(timeout, 100ms) according to libcurl int64_t sleepMs = (curlTimeout >= 0 && curlTimeout < 100) ? curlTimeout : 100; this_thread::sleep_for(chrono::milliseconds(sleepMs)); } else { select(maxfd + 1, &fdread, &fdwrite, &fdexcep, &timeout); } } } void HttpSink::HandleCompletedRequests(int& runningHandlers) { int msgsLeft = 0; CURLMsg* msg = curl_multi_info_read(mClient, &msgsLeft); while (msg) { if (msg->msg == CURLMSG_DONE) { bool requestReused = false; CURL* handler = msg->easy_handle; HttpSinkRequest* request = nullptr; curl_easy_getinfo(handler, CURLINFO_PRIVATE, &request); auto pipelinePlaceHolder = request->mItem->mPipeline; // keep pipeline alive auto responseTime = chrono::system_clock::now() - request->mLastSendTime; auto responseTimeMs = chrono::duration_cast<chrono::milliseconds>(responseTime); switch (msg->data.result) { case CURLE_OK: { long statusCode = 0; curl_easy_getinfo(handler, CURLINFO_RESPONSE_CODE, &statusCode); request->mResponse.SetNetworkStatus(NetworkCode::Ok, ""); request->mResponse.SetStatusCode(statusCode); request->mResponse.SetResponseTime(responseTimeMs); LOG_TRACE(sLogger, ("send http request succeeded, item address", request->mItem)("config-flusher-dst", QueueKeyManager::GetInstance()->GetName(request->mItem->mQueueKey))( "response time", ToString(responseTimeMs.count()) + "ms")("try cnt", ToString(request->mTryCnt))( "sending cnt", ToString(FlusherRunner::GetInstance()->GetSendingBufferCount()))); static_cast<HttpFlusher*>(request->mItem->mFlusher)->OnSendDone(request->mResponse, request->mItem); FlusherRunner::GetInstance()->DecreaseHttpSendingCnt(); ADD_COUNTER(mOutSuccessfulItemsTotal, 1); ADD_COUNTER(mSuccessfulItemTotalResponseTimeMs, responseTime); SUB_GAUGE(mSendingItemsTotal, 1); break; } default: // considered as network error if (request->mTryCnt <= request->mMaxTryCnt) { LOG_DEBUG(sLogger, ("failed to send http request", "retry immediately")("item address", request->mItem)( "config-flusher-dst", QueueKeyManager::GetInstance()->GetName(request->mItem->mFlusher->GetQueueKey()))( "try cnt", request->mTryCnt)("errMsg", curl_easy_strerror(msg->data.result))); // free first,becase mPrivateData will be reset in AddRequestToClient if (request->mPrivateData) { curl_slist_free_all((curl_slist*)request->mPrivateData); request->mPrivateData = nullptr; } ++request->mTryCnt; AddRequestToClient(unique_ptr<HttpSinkRequest>(request)); ++runningHandlers; ADD_GAUGE(mSendingItemsTotal, 1); requestReused = true; } else { auto errMsg = curl_easy_strerror(msg->data.result); request->mResponse.SetNetworkStatus(GetNetworkStatus(msg->data.result), errMsg); LOG_DEBUG(sLogger, ("failed to send http request", "abort")("item address", request->mItem)( "config-flusher-dst", QueueKeyManager::GetInstance()->GetName(request->mItem->mQueueKey))( "response time", ToString(responseTimeMs.count()) + "ms")( "try cnt", ToString(request->mTryCnt))("errMsg", errMsg)( "sending cnt", ToString(FlusherRunner::GetInstance()->GetSendingBufferCount()))); static_cast<HttpFlusher*>(request->mItem->mFlusher) ->OnSendDone(request->mResponse, request->mItem); FlusherRunner::GetInstance()->DecreaseHttpSendingCnt(); } ADD_COUNTER(mOutFailedItemsTotal, 1); ADD_COUNTER(mFailedItemTotalResponseTimeMs, responseTime); SUB_GAUGE(mSendingItemsTotal, 1); break; } curl_multi_remove_handle(mClient, handler); curl_easy_cleanup(handler); if (!requestReused) { if (request->mPrivateData) { curl_slist_free_all((curl_slist*)request->mPrivateData); } delete request; } } msg = curl_multi_info_read(mClient, &msgsLeft); } } } // namespace logtail