private function writeParallel()

in src/Downloader.php [332:414]


    private function writeParallel(array &$context): GuzzleHttp\Promise\Promise
    {
        return GuzzleHttp\Promise\Coroutine::of(function () use (&$context) {

            // get object meta
            $request = new Models\HeadObjectRequest();
            Utils::copyRequest($request, $context['request']);
            yield $this->client->headObjectAsync($request)->then(
                function (Models\HeadObjectResult $result) use (&$context) {
                    $pos = $context['epos'];
                    $context['epos'] = $pos < 0 ? $result->contentLength : min($pos, $result->contentLength);
                },
            );

            // download & write parallel
            $context['errors'] = [];
            $context['parts'] = [];

            $downloadFns = function () use (&$context) {
                /**
                 * @var \Psr\Http\Message\StreamInterface $stream
                 * @var int $rstart
                 */
                $stream = $context['stream'];
                $rstart = $context['pos'];
                foreach (self::iterPart($context) as $args) {
                    /**
                     * @var Models\GetObjectRequest $request
                     */
                    $request = clone $context['request'];
                    $request->rangeHeader = sprintf('bytes=%d-%d', $args[0], $args[0] + $args[1] - 1);
                    $request->rangeBehavior = 'standard';
                    $request->progressFn = null;
                    $offset = $stream->tell() + $args[0] - $rstart;
                    $sink = new GuzzleHttp\Psr7\LimitStream(
                        new GuzzleHttp\Psr7\LazyOpenStream($stream->getMetadata('uri'), 'rb+'),
                        -1,
                        $offset
                    );
                    yield $this->client->getObjectAsync(
                        $request,
                        [
                            'request_options' => [
                                'sink' => $sink
                            ]
                        ]
                    )->otherwise(
                        function ($reason) use (&$context) {
                            $context['errors'][] = $reason;
                            return GuzzleHttp\Promise\Create::rejectionFor($reason);
                        },
                    );
                    if (!empty($context['errors'])) {
                        break;
                    }
                }
            };

            $each = new GuzzleHttp\Promise\EachPromise(
                $downloadFns(),
                [
                    'concurrency' => $context['parallel_num'],
                ]
            );
            yield $each->promise();

            if (!empty($context['errors'])) {
                throw end($context['errors']);
            }
        })->then(
            function ($result) use (&$context) {
                $res = new Models\DownloadResult();
                $res->written = $context['epos'] - $context['pos'];
                return $res;
            },
            function ($reason) use (&$context) {
                return GuzzleHttp\Promise\Create::rejectionFor(new Exception\DownloadException(
                    $context['filepath'] ?? '',
                    $reason
                ));
            }
        );
    }