in app/rest/transport_curl.cc [716:921]
// Main loop of the curl worker: owns a curl multi handle for the duration of
// the call and repeatedly
//   1. decides how long to wait (optionally blocking in select() on curl's
//      file descriptors),
//   2. drains queued actions (perform / cancel / pause / resume / quit) via
//      GetNextAction(),
//   3. publishes transfer progress to each transfer's controller,
//   4. drives the transfers with curl_multi_perform() and reaps the ones that
//      completed.
// The loop only exits once a quit action has been seen AND
// expected_running_handles has dropped to zero, i.e. every transfer has either
// completed or been canceled. The multi handle is cleaned up before returning.
void CurlThread::ProcessRequests() {
  // Set up multi handle.
  CURLM* curl_multi = curl_multi_init();
  FIREBASE_ASSERT_MESSAGE(curl_multi != nullptr,
                          "curl multi handle failed to initialize");
  // Snapshot of expected_running_handles taken at the end of the previous
  // iteration; a difference means transfers were added or removed since.
  int previous_running_handles = 0;
  // Number of transfers this loop believes are still in flight.
  int expected_running_handles = 0;
  bool quit = false;
  // This will not quit until all transfers either complete or are canceled.
  while (!(quit && expected_running_handles == 0)) {
    int64_t polling_interval = kPollIntervalMilliseconds;
    if (quit || previous_running_handles != expected_running_handles) {
      // If we're quitting or the number of transfers has changed, don't wait.
      polling_interval = 0;
    } else if (expected_running_handles == 0) {
      // If no transfers are active wait indefinitely.
      polling_interval = -1;
    } else {
      // Curl defines the timeout argument as a long which can be a different
      // size per platform so we disable the lint warning about this.
      long timeout_ms = 0;  // NOLINT
      CURLMcode curl_code = curl_multi_timeout(curl_multi, &timeout_ms);
      if (curl_code == CURLM_OK) {
        // A negative value means curl has no timeout to suggest; fall back to
        // the default poll interval.
        if (timeout_ms < 0) timeout_ms = kPollIntervalMilliseconds;
        // Convert timeout in milliseconds to timeval.
        struct timeval timeout;
        timeout.tv_sec = timeout_ms / 1000;
        timeout.tv_usec = (timeout_ms % 1000) * 1000;
        // Wait for curl's file descriptors to signal that data is available.
        fd_set fdread;
        fd_set fdwrite;
        fd_set fdexcep;
        int maxfd = -1;
        FD_ZERO(&fdread);
        FD_ZERO(&fdwrite);
        FD_ZERO(&fdexcep);
        curl_code =
            curl_multi_fdset(curl_multi, &fdread, &fdwrite, &fdexcep, &maxfd);
        // maxfd == -1 means curl has no descriptors to wait on right now; in
        // that case polling_interval keeps its default and the wait happens
        // in GetNextAction() below instead.
        if (curl_code == CURLM_OK && maxfd != -1) {
          // The wait is done by select(), so don't wait again when polling
          // the action queue.
          polling_interval = 0;
          // TODO(smiles): This should probably also be woken up when items are
          // added to the queue.  If a long delay occurs due to exponential
          // backoff this will hang other transfers for the delay period.
          select(maxfd + 1, &fdread, &fdwrite, &fdexcep, &timeout);
        }
      }
    }

    // Consume new transfer requests.
    TransportCurlActionData action_data;
    while (GetNextAction(&action_data, polling_interval)) {
      // Only the first fetch may wait; drain the rest of the queue without
      // waiting.
      polling_interval = 0;
      // Act on the data.
      switch (action_data.action) {
        case kRequestedActionPerform: {
          BackgroundTransportCurl* transport;
          {
            MutexLock lock(mutex_);
            // The callback passed here removes the transfer from this
            // thread's bookkeeping, keyed by its response.
            transport = new BackgroundTransportCurl(
                curl_multi, action_data.curl, action_data.request,
                action_data.response, &mutex_, action_data.controller,
                action_data.transport,
                [](BackgroundTransportCurl* background_transport, void* data) {
                  reinterpret_cast<CurlThread*>(data)->RemoveTransfer(
                      background_transport->response());
                },
                this);
          }
          AddTransfer(transport);
          if (transport->PerformBackground(action_data.request)) {
            expected_running_handles++;
          } else {
            // The transfer could not be started, so it is never counted as
            // running.
            delete transport;
          }
          break;
        }
        case kRequestedActionCancel: {
          // RemoveTransfer() yields null when no transfer is registered for
          // this response; nothing to cancel in that case.
          BackgroundTransportCurl* transport =
              RemoveTransfer(action_data.response);
          if (transport) {
            transport->set_canceled(true);
            delete transport;
            expected_running_handles--;
          }
          break;
        }
        case kRequestedActionPause: {
          MutexLock lock(mutex_);
          auto it = transport_by_response_.find(action_data.response);
          if (it != transport_by_response_.end()) {
            curl_easy_pause(it->second->curl(), CURLPAUSE_ALL);
          }
          break;
        }
        case kRequestedActionResume: {
          MutexLock lock(mutex_);
          auto it = transport_by_response_.find(action_data.response);
          if (it != transport_by_response_.end()) {
            curl_easy_pause(it->second->curl(), CURLPAUSE_CONT);
          }
          break;
        }
        case kRequestedActionQuit: {
          // Only flags the request; the outer loop keeps running until all
          // transfers are gone.
          quit = true;
          break;
        }
      }
    }

    // Update controllers with transfer status.
    {
      MutexLock lock(mutex_);
      for (auto it = transport_by_response_.begin();
           it != transport_by_response_.end(); ++it) {
        BackgroundTransportCurl* transport = it->second;
        ControllerCurl* controller = transport->controller();
        if (!controller) continue;
        // Update transfer status.
        CURLINFO transferred_getter;
        CURLINFO size_getter;
        // According to the documentation, CURLINFO_CONTENT_LENGTH_(UP|DOWN)LOAD
        // is deprecated in favor of CURLINFO_CONTENT_LENGTH_(UP|DOWN)LOAD_T,
        // but it looks like our version of libcurl doesn't support the newer _T
        // versions.
        switch (controller->direction()) {
          case kTransferDirectionUpload:
            size_getter = CURLINFO_CONTENT_LENGTH_UPLOAD;
            transferred_getter = CURLINFO_SIZE_UPLOAD;
            static_assert((CURLINFO_CONTENT_LENGTH_UPLOAD &
                           CURLINFO_TYPEMASK) == CURLINFO_DOUBLE,
                          "Unexpected transfer_size type");
            static_assert(
                (CURLINFO_SIZE_UPLOAD & CURLINFO_TYPEMASK) == CURLINFO_DOUBLE,
                "Unexpected bytes_transferred type");
            break;
          case kTransferDirectionDownload:
            size_getter = CURLINFO_CONTENT_LENGTH_DOWNLOAD;
            transferred_getter = CURLINFO_SIZE_DOWNLOAD;
            static_assert((CURLINFO_CONTENT_LENGTH_DOWNLOAD &
                           CURLINFO_TYPEMASK) == CURLINFO_DOUBLE,
                          "Unexpected transfer_size type");
            static_assert(
                (CURLINFO_SIZE_DOWNLOAD & CURLINFO_TYPEMASK) == CURLINFO_DOUBLE,
                "Unexpected bytes_transferred type");
            break;
          default:
            // Unknown direction: skip this transfer rather than querying curl
            // with uninitialized CURLINFO selectors.
            continue;
        }
        // Both selectors are CURLINFO_DOUBLE typed (see the static_asserts
        // above), hence the double out-parameter.
        CURLcode code;
        double value = 0.0;
        code = curl_easy_getinfo(transport->curl(), size_getter, &value);
        if (code == CURLE_OK) {
          controller->set_transfer_size(static_cast<int64_t>(value));
        }
        value = 0.0;
        code = curl_easy_getinfo(transport->curl(), transferred_getter, &value);
        if (code == CURLE_OK) {
          controller->set_bytes_transferred(static_cast<int64_t>(value));
        }
      }
    }

    // Drive all transfers. Initialized so the comparison below is well defined
    // even if curl_multi_perform() fails without writing the count.
    int running_handles = 0;
    curl_multi_perform(curl_multi, &running_handles);
    if (expected_running_handles != running_handles) {
      // Some transfers completed.
      int message_count;
      CURLMsg* message;
      while ((message = curl_multi_info_read(curl_multi, &message_count))) {
        switch (message->msg) {
          case CURLMSG_DONE: {
            CURL* handle = message->easy_handle;
            // Copy the result now: the CURLMsg returned by
            // curl_multi_info_read() does not survive
            // curl_multi_remove_handle(), so it must not be read afterwards.
            const CURLcode result = message->data.result;
            // Get the response object and clean up the easy handle.
            char* char_pointer = nullptr;
            curl_easy_getinfo(handle, CURLINFO_PRIVATE, &char_pointer);
            curl_multi_remove_handle(curl_multi, handle);
            BackgroundTransportCurl* transport =
                reinterpret_cast<BackgroundTransportCurl*>(char_pointer);
            // Determine if the request timed out. The null check covers a
            // handle whose private pointer could not be retrieved.
            if (transport != nullptr && result == CURLE_OPERATION_TIMEDOUT) {
              transport->set_timed_out(true);
            }
            // Mark the response complete.
            delete transport;
            expected_running_handles--;
            break;
          }
          default: {
            // Should never happen.
            assert(0);
            break;
          }
        }
      }
    }
    previous_running_handles = expected_running_handles;
  }
  // Clean up multi handle before returning.
  curl_multi_cleanup(curl_multi);
}