in src/Downloader.php [332:414]
/**
 * Downloads the requested byte range as concurrent ranged GET requests, each
 * writing directly into the target file at its own offset.
 *
 * Flow:
 *  1. HEAD the object and clamp $context['epos'] to the reported content length
 *     (a negative 'epos' is replaced by the content length itself).
 *  2. Reset $context['errors'] and $context['parts'].
 *  3. Lazily create one ranged getObjectAsync() promise per part produced by
 *     self::iterPart() and run them through an EachPromise limited to
 *     $context['parallel_num'] concurrent promises.
 *  4. If any part recorded a failure, throw the most recently recorded one.
 *
 * @param array $context Shared download state, passed by reference. Reads
 *                       'request', 'pos', 'epos', 'stream', 'parallel_num' and
 *                       (optionally) 'filepath'; overwrites 'epos', 'errors'
 *                       and 'parts'.
 *
 * @return GuzzleHttp\Promise\Promise Fulfilled with a Models\DownloadResult whose
 *                                    `written` is 'epos' - 'pos'; rejected with an
 *                                    Exception\DownloadException wrapping the reason.
 */
private function writeParallel(array &$context): GuzzleHttp\Promise\Promise
{
    $work = function () use (&$context) {
        // Step 1: look up the object's size and settle the effective end position.
        $headRequest = new Models\HeadObjectRequest();
        Utils::copyRequest($headRequest, $context['request']);
        $headPromise = $this->client->headObjectAsync($headRequest);
        yield $headPromise->then(
            function (Models\HeadObjectResult $meta) use (&$context) {
                $requestedEnd = $context['epos'];
                if ($requestedEnd < 0) {
                    // No explicit end requested: read up to the end of the object.
                    $context['epos'] = $meta->contentLength;
                } else {
                    $context['epos'] = min($requestedEnd, $meta->contentLength);
                }
            }
        );

        // Step 2: fresh bookkeeping for this run.
        $context['errors'] = [];
        $context['parts'] = [];

        // Step 3: generator yielding one download promise per part; it is only
        // advanced by the EachPromise below, so parts are created on demand.
        $partPromises = function () use (&$context) {
            /**
             * @var \Psr\Http\Message\StreamInterface $target
             * @var int $rangeStart
             */
            $target = $context['stream'];
            $rangeStart = $context['pos'];

            // Remember the failure in the shared context and keep the promise rejected.
            $recordFailure = function ($reason) use (&$context) {
                $context['errors'][] = $reason;
                return GuzzleHttp\Promise\Create::rejectionFor($reason);
            };

            foreach (self::iterPart($context) as $part) {
                /**
                 * @var Models\GetObjectRequest $partRequest
                 */
                $partRequest = clone $context['request'];

                // Each part is used as [first byte offset, byte count]; HTTP range
                // ends are inclusive, hence the "- 1".
                $firstByte = $part[0];
                $lastByte = $part[0] + $part[1] - 1;
                $partRequest->rangeHeader = sprintf('bytes=%d-%d', $firstByte, $lastByte);
                $partRequest->rangeBehavior = 'standard';
                $partRequest->progressFn = null;

                // Position of this part inside the file: the target stream's current
                // position plus the part's distance from the start of the requested range.
                $fileOffset = $target->tell() + $part[0] - $rangeStart;

                // Every part gets its own handle on the same file so that writes
                // of different parts do not share a file pointer.
                $partFile = new GuzzleHttp\Psr7\LazyOpenStream($target->getMetadata('uri'), 'rb+');
                $partSink = new GuzzleHttp\Psr7\LimitStream($partFile, -1, $fileOffset);

                $options = [
                    'request_options' => [
                        'sink' => $partSink,
                    ],
                ];
                yield $this->client->getObjectAsync($partRequest, $options)->otherwise($recordFailure);

                // Stop handing out further parts once any failure has been recorded.
                if (!empty($context['errors'])) {
                    break;
                }
            }
        };

        $pool = new GuzzleHttp\Promise\EachPromise(
            $partPromises(),
            [
                'concurrency' => $context['parallel_num'],
            ]
        );
        yield $pool->promise();

        // Step 4: failures are taken from $context['errors'] rather than from the
        // pool promise. NOTE(review): presumably because the pool's aggregate
        // promise does not reject on individual part failures — confirm.
        if (!empty($context['errors'])) {
            throw end($context['errors']);
        }
    };

    $coroutine = GuzzleHttp\Promise\Coroutine::of($work);

    return $coroutine->then(
        function ($result) use (&$context) {
            $summary = new Models\DownloadResult();
            $summary->written = $context['epos'] - $context['pos'];
            return $summary;
        },
        function ($reason) use (&$context) {
            $failure = new Exception\DownloadException(
                $context['filepath'] ?? '',
                $reason
            );
            return GuzzleHttp\Promise\Create::rejectionFor($failure);
        }
    );
}