private function multipartCopy()

in src/Copier.php [394:523]


    /**
     * Copies an object through the multipart flow: optionally fetch the source
     * tagging, initiate a multipart upload, copy every part concurrently,
     * then complete the upload (or abort it when something failed).
     *
     * Context keys read here: 'request', 'part_size', 'parallel_num',
     * 'leave_parts_on_error' (plus whatever the self:: helpers consume).
     * Context keys written here: 'tag_prop', 'upload_id', 'mpc_read_timeout',
     * 'errors', 'parts', 'upload_result', 'copy_result'.
     *
     * @param array $context Shared copy state, passed by reference and mutated in place.
     *
     * @return GuzzleHttp\Promise\Promise Fulfilled with a Models\CopyResult; rejected with
     *                                    the last recorded failure when any step fails.
     */
    private function multipartCopy(array &$context): GuzzleHttp\Promise\Promise
    {
        return GuzzleHttp\Promise\Coroutine::of(function () use (&$context) {

            // Fetch the source object's tagging first when the helper says it is needed.
            if (self::needFetchTagProp($context)) {
                /**
                 * @var Models\CopyObjectRequest $sourceRequest
                 */
                $sourceRequest = $context['request'];
                $request = new Models\GetObjectTaggingRequest();
                Utils::copyRequest($request, $sourceRequest);
                // Address the source object: keep the copied bucket unless a
                // source bucket was given explicitly.
                if ($sourceRequest->sourceBucket != null) {
                    $request->bucket = $sourceRequest->sourceBucket;
                }
                $request->key = $sourceRequest->sourceKey;
                $request->versionId = $sourceRequest->sourceVersionId;
                yield $this->client->getObjectTaggingAsync($request)->then(
                    function (Models\GetObjectTaggingResult $result) use (&$context) {
                        $context['tag_prop'] = $result;
                    },
                );
            }

            // Initiate the multipart upload and remember its upload id.
            $request = new Models\InitiateMultipartUploadRequest();
            Utils::copyRequest($request, $context['request']);
            $request->disableAutoDetectMimeType = true;
            self::overwirteMetadataProp($request, $context);
            self::overwirteTagProp($request, $context);
            yield $this->client->initiateMultipartUploadAsync($request)->then(
                function (Models\InitiateMultipartUploadResult $result) use (&$context) {
                    $context['upload_id'] = $result->uploadId;
                },
            );

            // Read timeout for the part-copy calls: add 10s for every full 200M of
            // part size beyond the first 200M.
            // NOTE(review): the loop only stops after the value has exceeded 50, so
            // the stored timeout can be above the "max 50s" the original comment
            // states - confirm the intended cap.
            // NOTE(review): the value is only stored in the context here; it is not
            // passed to any call in this method - confirm where it is consumed.
            $PART_SIZE = 200 * 1024 * 1024;
            $STEP = 10;
            $mpcReadTimeout = Defaults::READWRITE_TIMEOUT;
            $part_size = intval($context['part_size']);
            while ($part_size > $PART_SIZE) {
                $mpcReadTimeout += $STEP;
                $part_size -= $PART_SIZE;
                if ($mpcReadTimeout > 50) {
                    break;
                }
            }
            $context['mpc_read_timeout'] = $mpcReadTimeout;

            // Copy the parts. The generator lazily yields one promise per part,
            // keyed by part number, so EachPromise can bound the concurrency.
            $context['errors'] = [];
            $context['parts'] = [];
            $uploadFns = function () use (&$context) {
                // $args is used as [part number, start offset, length].
                foreach (self::iterPart($context) as $args) {
                    $request = new Models\UploadPartCopyRequest();
                    Utils::copyRequest($request, $context['request']);
                    $request->partNumber = $args[0];
                    $request->uploadId = $context['upload_id'];
                    $request->sourceRange = sprintf('bytes=%d-%d', $args[1], $args[1] + $args[2] - 1);
                    yield $args[0] => $this->client->uploadPartCopyAsync($request)->otherwise(
                        function ($reason) use (&$context) {
                            // Record the failure, then keep the promise rejected.
                            $context['errors'][] = $reason;
                            return GuzzleHttp\Promise\Create::rejectionFor($reason);
                        },
                    );
                    // Stop scheduling further parts once any part has failed.
                    if (!empty($context['errors'])) {
                        break;
                    }
                }
            };

            $each = new GuzzleHttp\Promise\EachPromise(
                $uploadFns(),
                [
                    'concurrency' => $context['parallel_num'],
                    'fulfilled' => function (Models\UploadPartCopyResult $result, $key) use (&$context) {
                        $context['parts'][] = new Models\UploadPart($key, $result->etag);
                        return $result;
                    }
                ]
            );
            yield $each->promise();

            // Complete the upload only when every part succeeded.
            if (empty($context['errors'])) {
                // Parts finish in arbitrary order; sort them by part number.
                $parts = $context['parts'];
                usort($parts, function ($a, $b) {
                    return $a->partNumber <=> $b->partNumber;
                });

                $request = new Models\CompleteMultipartUploadRequest();
                Utils::copyRequest($request, $context['request']);
                $request->uploadId = $context['upload_id'];
                $request->completeMultipartUpload = new Models\CompleteMultipartUpload($parts);
                yield $this->client->completeMultipartUploadAsync($request)->then(
                    function (Models\CompleteMultipartUploadResult $result) use (&$context) {
                        $context['upload_result'] = $result;
                        return $result;
                    },
                    function ($reason) use (&$context) {
                        // Record instead of rethrowing so the cleanup below still runs.
                        $context['errors'][] = $reason;
                    },
                );
            }

            if (!empty($context['errors'])) {
                if ($context['leave_parts_on_error'] === false) {
                    $request = new Models\AbortMultipartUploadRequest();
                    Utils::copyRequest($request, $context['request']);
                    $request->uploadId = $context['upload_id'];
                    try {
                        yield $this->client->abortMultipartUploadAsync($request);
                    } catch (\Throwable $abortFailure) {
                        // Best-effort cleanup: a failed abort must not replace the
                        // error that caused the copy to fail, which is thrown below.
                    }
                }
                // Rejection reasons are not guaranteed to be Throwable; exceptionFor()
                // returns Throwables unchanged and wraps anything else.
                throw GuzzleHttp\Promise\Create::exceptionFor(end($context['errors']));
            }
        })->then(
            function () use (&$context) {
                // Build the public result from the completion response.
                $res = new Models\CopyResult();
                Utils::copyResult($res, $context['upload_result']);
                $res->uploadId = $context['upload_id'];
                $context['copy_result'] = $res;
                return $res;
            },
            function ($reason) {
                return GuzzleHttp\Promise\Create::rejectionFor($reason);
            }
        );
    }