in src/future/FutureIterator.php [171:259]
public function next() {
$this->key = null;
if (!count($this->wait)) {
return;
}
$read_sockets = array();
$write_sockets = array();
$start = microtime(true);
$timeout = $this->timeout;
$this->isTimeout = false;
$check = $this->getWorkingSet();
$resolve = null;
do {
$read_sockets = array();
$write_sockets = array();
$can_use_sockets = true;
$wait_time = 1;
foreach ($check as $wait => $key) {
$future = $this->futures[$key];
try {
if ($future->getException()) {
$resolve = $wait;
continue;
}
if ($future->isReady()) {
if ($resolve === null) {
$resolve = $wait;
}
continue;
}
$got_sockets = false;
$socks = $future->getReadSockets();
if ($socks) {
$got_sockets = true;
foreach ($socks as $socket) {
$read_sockets[] = $socket;
}
}
$socks = $future->getWriteSockets();
if ($socks) {
$got_sockets = true;
foreach ($socks as $socket) {
$write_sockets[] = $socket;
}
}
// If any currently active future had neither read nor write sockets,
// we can't wait for the current batch of items using sockets.
if (!$got_sockets) {
$can_use_sockets = false;
} else {
$wait_time = min($wait_time, $future->getDefaultWait());
}
} catch (Exception $ex) {
$this->futures[$key]->setException($ex);
$resolve = $wait;
break;
}
}
if ($resolve === null) {
// Check for a setUpdateInterval() timeout.
if ($timeout !== null) {
$elapsed = microtime(true) - $start;
if ($elapsed > $timeout) {
$this->isTimeout = true;
return;
} else {
$wait_time = $timeout - $elapsed;
}
}
if ($can_use_sockets) {
Future::waitForSockets($read_sockets, $write_sockets, $wait_time);
} else {
usleep(1000);
}
}
} while ($resolve === null);
$this->key = $this->wait[$resolve];
unset($this->wait[$resolve]);
$this->updateWorkingSet();
}