private function fanOutAndAggregateResponses()

in web_ui/src/applications/bistro/multicurl/clients/BistroBaseMultiCurlClient.php [65:138]


  /**
   * Fans a query out to a set of hostports via several parallel sub-curls
   * and merges the per-hostport responses into one map.
   *
   * The hostports are shuffled, split into roughly equal chunks, and each
   * chunk is sent as one CurlWorkload inside a single CurlMultiWorkload.
   * A chunk whose curl reports an error is logged and skipped, and a hostport
   * whose response raises BistroMultiCurlQueryError while being unserialized
   * is logged and omitted, so the result may cover only a subset of the
   * requested hostports.
   *
   * @param BistroCommonPrefs $prefs  Supplies the current request (host and
   *                                  user) plus the chunking and timeout
   *                                  preferences.
   * @param array $fqdn_hostports     Hostports to query; may be empty.
   * @param string $query             Query forwarded verbatim in every
   *                                  sub-request.
   * @return array  Map of hostport => unserialized response, containing only
   *                the hostports that were fetched and decoded successfully.
   */
  private function fanOutAndAggregateResponses(
      BistroCommonPrefs $prefs, array $fqdn_hostports, /* string */ $query) {

    // With no hostports the chunk-size computation below would divide by
    // zero and array_chunk() would be given an invalid size, so bail early.
    $num_hostports = count($fqdn_hostports);
    if ($num_hostports === 0) {
      return array();
    }

    $request = $prefs->getRequest();
    $request_hostname = $request->getHost();
    $username = $request->getUser()->getUserName();

    // Prevent replays of responses
    $secret = base64_encode(bistro_read_random_bytes(
      3 * BistroBaseMultiCurlController::SECRET_BYTES / 4
    ));

    $multi_workload = new CurlMultiWorkload();
    $workloads = array();

    // Best not to be too deterministic with the query grouping
    shuffle($fqdn_hostports);

    // Clamp both preferences to at least 1 so that a misconfigured value
    // cannot produce a zero divisor or a zero chunk size.
    $hostports_per_curl = max(
      1, $prefs->get(BistroCommonPrefs::PREF_HOSTPORTS_PER_CURL));
    $max_curl_count = max(
      1, $prefs->get(BistroCommonPrefs::PREF_MAX_CURL_COUNT));
    $curl_count = min(
      intval(ceil($num_hostports / $hostports_per_curl)),
      $max_curl_count
    );

    // Instead of splitting 6 hostports 5:1, split them 3:3, etc.
    //
    // Round the chunk size UP: rounding to nearest would round down whenever
    // the fractional part is below one half, which makes array_chunk() emit
    // one more chunk than $curl_count (e.g. 13 hostports over 4 curls would
    // become 3,3,3,3,1 instead of 4,4,4,1).
    $chunk_size = intval(ceil($num_hostports / $curl_count));
    $hostport_chunks = array_chunk($fqdn_hostports, $chunk_size);

    // The timeouts are the same for every chunk, so read them only once.
    $send_timeout_ms = $prefs->get(BistroCommonPrefs::PREF_SEND_TIMEOUT_MS);
    $recv_timeout_ms = $prefs->get(BistroCommonPrefs::PREF_RECV_TIMEOUT_MS);

    foreach ($hostport_chunks as $i => $hps) {
      $payload = json_encode(array(
        BistroBaseMultiCurlController::REQUEST_HOSTPORTS => $hps,
        BistroBaseMultiCurlController::REQUEST_SECRET => $secret,
        BistroBaseMultiCurlController::REQUEST_QUERY => $query,
        BistroBaseMultiCurlController::REQUEST_SEND_TIMEOUT =>
          $send_timeout_ms,
        BistroBaseMultiCurlController::REQUEST_RECV_TIMEOUT =>
          $recv_timeout_ms));
      $workloads[$i] = new CurlWorkload(
        $request_hostname,
        // NOTE(review): this passes the literal string
        // "<ClassName>::CURL_ENDPOINT_PREFIX", not the constant's value --
        // confirm that CurlWorkload resolves it (e.g. via constant()).
        get_class($this).'::CURL_ENDPOINT_PREFIX',
        array(
          $username,
          BistroCurlProtection::protectRequest($username, $payload))
      );
      $multi_workload->addWorkload($workloads[$i]);
    }
    $multi_workload->exec();

    $hostport_responses = array();
    foreach ($hostport_chunks as $i => $hps) {
      $error = $workloads[$i]->getError();
      if ($error !== null) {
        // Not an exception, because even when some sub-curls time out,
        // we can still expect to show some data.
        bistro_monitor_log()->error(
          'While fetching hostports: '.implode(', ', $hps).', got '.$error);
        continue;
      }
      // The same secret that was sent with the request is handed to
      // extractResults() together with the hostports of this chunk.
      $results = $this->extractResults(
        $workloads[$i]->getResult(),
        $username,
        $secret,
        $hps
      );
      foreach ($results as $hp => $serialized_response) {
        // Uncomment to debug on-the-wire data size
        // error_log($hp.' '.strlen($serialized_response));
        try {
          $hostport_responses[$hp] = $this->unserializeResponse(
            $hp, base64_decode($serialized_response));
        } catch (BistroMultiCurlQueryError $err) {
          // One bad hostport should not discard the others' data.
          $this->logError($hp, $err);
        }
      }
    }
    return $hostport_responses;
  }