public function next()

in src/future/FutureIterator.php [171:259]


  /**
   * Advance the iterator to the next future that has resolved.
   *
   * Clears the current key, then repeatedly polls every future in the
   * working set until one of them either reports an exception or reports
   * that it is ready. While nothing has resolved, the method waits on the
   * futures' read/write sockets when every pending future exposes at least
   * one socket, and otherwise sleeps briefly before polling again.
   *
   * Selection rules within a single polling pass:
   *
   *   - A future with an exception always claims the slot, even if a ready
   *     future was seen earlier in the same pass.
   *   - A ready future claims the slot only if nothing has claimed it yet.
   *   - An exception thrown while polling a future is attached to that
   *     future with setException() and ends the pass immediately.
   *
   * If a timeout is configured and it elapses before anything resolves,
   * `isTimeout` is set and the method returns with the key left as null.
   *
   * @return void
   */
  public function next() {
    $this->key = null;
    if (!count($this->wait)) {
      // Nothing left to wait for.
      return;
    }

    $start = microtime(true);
    $timeout = $this->timeout;
    $this->isTimeout = false;

    $check = $this->getWorkingSet();
    $resolve = null;
    do {
      // Rebuilt on every pass: the set of sockets worth waiting on can
      // change as futures make progress.
      $read_sockets    = array();
      $write_sockets   = array();
      $can_use_sockets = true;
      $wait_time       = 1;
      foreach ($check as $wait => $key) {
        $future = $this->futures[$key];
        try {
          if ($future->getException()) {
            // Failed futures take priority over ready ones, so overwrite
            // any earlier selection and keep scanning.
            $resolve = $wait;
            continue;
          }
          if ($future->isReady()) {
            // First ready future wins unless a failure claims the slot.
            if ($resolve === null) {
              $resolve = $wait;
            }
            continue;
          }

          $got_sockets = false;
          $socks = $future->getReadSockets();
          if ($socks) {
            $got_sockets = true;
            foreach ($socks as $socket) {
              $read_sockets[] = $socket;
            }
          }

          $socks = $future->getWriteSockets();
          if ($socks) {
            $got_sockets = true;
            foreach ($socks as $socket) {
              $write_sockets[] = $socket;
            }
          }

          // If any currently active future had neither read nor write sockets,
          // we can't wait for the current batch of items using sockets.
          if (!$got_sockets) {
            $can_use_sockets = false;
          } else {
            $wait_time = min($wait_time, $future->getDefaultWait());
          }
        } catch (Exception $ex) {
          // Record the failure on the future itself and surface it as the
          // next result instead of letting it escape the iterator.
          $this->futures[$key]->setException($ex);
          $resolve = $wait;
          break;
        }
      }
      if ($resolve === null) {

        // Check for a setUpdateInterval() timeout.
        if ($timeout !== null) {
          $elapsed = microtime(true) - $start;
          if ($elapsed > $timeout) {
            $this->isTimeout = true;
            return;
          } else {
            // Never wait past the timeout, but also never wait longer than
            // the shortest wait the pending futures asked for. Using the
            // remaining timeout alone would discard the per-future waits
            // computed above.
            $wait_time = min($wait_time, $timeout - $elapsed);
          }
        }

        if ($can_use_sockets) {
          Future::waitForSockets($read_sockets, $write_sockets, $wait_time);
        } else {
          // At least one pending future has no sockets to select on, so
          // fall back to a short sleep (1ms) before polling again.
          usleep(1000);
        }
      }
    } while ($resolve === null);

    // Publish the resolved future's key and drop it from the wait list.
    $this->key = $this->wait[$resolve];
    unset($this->wait[$resolve]);
    $this->updateWorkingSet();
  }