void CurlThread::ProcessRequests()

in app/rest/transport_curl.cc [716:921]


void CurlThread::ProcessRequests() {
  // Set up multi handle.
  CURLM* curl_multi = curl_multi_init();
  FIREBASE_ASSERT_MESSAGE(curl_multi != nullptr,
                          "curl multi handle failed to initialize");

  int previous_running_handles = 0;
  int expected_running_handles = 0;
  bool quit = false;
  // This will not quit until all transfers either complete or are canceled.
  while (!(quit && expected_running_handles == 0)) {
    int64_t polling_interval = kPollIntervalMilliseconds;
    if (quit || previous_running_handles != expected_running_handles) {
      // If we're quitting or the number of transfers has changed, don't wait.
      polling_interval = 0;
    } else if (expected_running_handles == 0) {
      // If no transfers are active wait indefinitely.
      polling_interval = -1;
    } else {
      // Curl defines the timeout argument as a long which can be a different
      // size per platform so we disable the lint warning about this.
      long timeout_ms = 0;  // NOLINT
      CURLMcode curl_code = curl_multi_timeout(curl_multi, &timeout_ms);
      if (curl_code == CURLM_OK) {
        if (timeout_ms < 0) timeout_ms = kPollIntervalMilliseconds;

        // Convert timeout in milliseconds to timeval.
        struct timeval timeout;
        timeout.tv_sec = timeout_ms / 1000;
        timeout.tv_usec = (timeout_ms % 1000) * 1000;

        // Wait for curl's file descriptors to signal that data is available.
        fd_set fdread;
        fd_set fdwrite;
        fd_set fdexcep;
        int maxfd = -1;
        FD_ZERO(&fdread);
        FD_ZERO(&fdwrite);
        FD_ZERO(&fdexcep);
        curl_code =
            curl_multi_fdset(curl_multi, &fdread, &fdwrite, &fdexcep, &maxfd);
        if (curl_code == CURLM_OK && maxfd != -1) {
          polling_interval = 0;
          // TODO(smiles): This should probably also be woken up when items are
          // added to the queue.  If a long delay occurs due to exponential
          // backoff this will hang other transfers for the delay period.
          select(maxfd + 1, &fdread, &fdwrite, &fdexcep, &timeout);
        }
      }
    }

    // Consume new transfer requests.
    TransportCurlActionData action_data;
    while (GetNextAction(&action_data, polling_interval)) {
      polling_interval = 0;
      // Act on the data.
      switch (action_data.action) {
        case kRequestedActionPerform: {
          BackgroundTransportCurl* transport;
          {
            MutexLock lock(mutex_);
            transport = new BackgroundTransportCurl(
                curl_multi, action_data.curl, action_data.request,
                action_data.response, &mutex_, action_data.controller,
                action_data.transport,
                [](BackgroundTransportCurl* background_transport, void* data) {
                  reinterpret_cast<CurlThread*>(data)->RemoveTransfer(
                      background_transport->response());
                },
                this);
          }
          AddTransfer(transport);
          if (transport->PerformBackground(action_data.request)) {
            expected_running_handles++;
          } else {
            delete transport;
          }
          break;
        }
        case kRequestedActionCancel: {
          BackgroundTransportCurl* transport =
              RemoveTransfer(action_data.response);
          if (transport) {
            transport->set_canceled(true);
            delete transport;
            expected_running_handles--;
          }
          break;
        }
        case kRequestedActionPause: {
          MutexLock lock(mutex_);
          auto it = transport_by_response_.find(action_data.response);
          if (it != transport_by_response_.end()) {
            curl_easy_pause(it->second->curl(), CURLPAUSE_ALL);
          }
          break;
        }
        case kRequestedActionResume: {
          MutexLock lock(mutex_);
          auto it = transport_by_response_.find(action_data.response);
          if (it != transport_by_response_.end()) {
            curl_easy_pause(it->second->curl(), CURLPAUSE_CONT);
          }
          break;
        }
        case kRequestedActionQuit: {
          quit = true;
          break;
        }
      }
    }

    // Update controllers with transfer status.
    {
      MutexLock lock(mutex_);
      for (auto it = transport_by_response_.begin();
           it != transport_by_response_.end(); ++it) {
        BackgroundTransportCurl* transport = it->second;
        ControllerCurl* controller = transport->controller();
        if (!controller) continue;
        // Update transfer status.
        CURLINFO transferred_getter;
        CURLINFO size_getter;
        // According to the documentation, CURLINFO_CONTENT_LENGTH_(UP|DOWN)LOAD
        // is deprecated in favor of CURLINFO_CONTENT_LENGTH_(UP|DOWN)LOAD_T,
        // but it looks like our version of libcurl doesn't support the newer _T
        // versions.
        switch (controller->direction()) {
          case kTransferDirectionUpload:
            size_getter = CURLINFO_CONTENT_LENGTH_UPLOAD;
            transferred_getter = CURLINFO_SIZE_UPLOAD;
            static_assert((CURLINFO_CONTENT_LENGTH_UPLOAD &
                           CURLINFO_TYPEMASK) == CURLINFO_DOUBLE,
                          "Unexpected transfer_size type");
            static_assert(
                (CURLINFO_SIZE_UPLOAD & CURLINFO_TYPEMASK) == CURLINFO_DOUBLE,
                "Unexpected bytes_transferred type");
            break;
          case kTransferDirectionDownload:
            size_getter = CURLINFO_CONTENT_LENGTH_DOWNLOAD;
            transferred_getter = CURLINFO_SIZE_DOWNLOAD;
            static_assert((CURLINFO_CONTENT_LENGTH_DOWNLOAD &
                           CURLINFO_TYPEMASK) == CURLINFO_DOUBLE,
                          "Unexpected transfer_size type");
            static_assert(
                (CURLINFO_SIZE_DOWNLOAD & CURLINFO_TYPEMASK) == CURLINFO_DOUBLE,
                "Unexpected bytes_transferred type");
            break;
        }

        CURLcode code;
        double value = 0.0;
        code = curl_easy_getinfo(transport->curl(), size_getter, &value);
        if (code == CURLE_OK) {
          controller->set_transfer_size(static_cast<int64_t>(value));
        }
        value = 0.0;
        code = curl_easy_getinfo(transport->curl(), transferred_getter, &value);
        if (code == CURLE_OK) {
          controller->set_bytes_transferred(static_cast<int64_t>(value));
        }
      }
    }

    int running_handles;
    curl_multi_perform(curl_multi, &running_handles);

    if (expected_running_handles != running_handles) {
      // Some transfers completed.
      int message_count;
      CURLMsg* message;
      while ((message = curl_multi_info_read(curl_multi, &message_count))) {
        switch (message->msg) {
          case CURLMSG_DONE: {
            CURL* handle = message->easy_handle;

            // Get the response object and clean up the easy handle.
            char* char_pointer;
            curl_easy_getinfo(handle, CURLINFO_PRIVATE, &char_pointer);
            curl_multi_remove_handle(curl_multi, handle);
            BackgroundTransportCurl* transport =
                reinterpret_cast<BackgroundTransportCurl*>(char_pointer);

            // Determine if the request timed out.
            if (message->data.result == CURLE_OPERATION_TIMEDOUT) {
              transport->set_timed_out(true);
            }

            // Mark the response complete.
            delete transport;
            expected_running_handles--;
            break;
          }
          default: {
            // Should never happen.
            assert(0);
          }
        }
      }
    }
    previous_running_handles = expected_running_handles;
  }

  // Clean up multi handle before returning.
  curl_multi_cleanup(curl_multi);
}