in src/Copier.php [394:523]
/**
 * Copies an object through the multipart-copy request sequence.
 *
 * The work runs sequentially inside a Guzzle coroutine:
 *  1. When self::needFetchTagProp($context) is true, fetch the source object's tagging and
 *     store the result in $context['tag_prop'].
 *  2. Initiate the multipart upload and store its id in $context['upload_id'].
 *  3. Derive $context['mpc_read_timeout'] from $context['part_size'].
 *  4. Issue one UploadPartCopy request per entry produced by self::iterPart($context), using
 *     $context['parallel_num'] as the EachPromise concurrency limit. Failures are collected in
 *     $context['errors'], successful parts in $context['parts'].
 *  5. If no part failed, complete the upload with the parts sorted by part number and store
 *     the result in $context['upload_result'].
 *  6. If anything was recorded in $context['errors'], abort the upload (only when
 *     $context['leave_parts_on_error'] === false) and fail with the last recorded error.
 *
 * @param array $context Shared copy state, passed by reference. Reads 'request', 'part_size',
 *                       'parallel_num' and 'leave_parts_on_error'; writes 'tag_prop',
 *                       'upload_id', 'mpc_read_timeout', 'errors', 'parts', 'upload_result'
 *                       and 'copy_result'.
 *
 * @return GuzzleHttp\Promise\Promise Fulfilled with a Models\CopyResult (also stored in
 *                                    $context['copy_result']); rejected with the failure reason.
 */
private function multipartCopy(array &$context): GuzzleHttp\Promise\Promise
{
    return GuzzleHttp\Promise\Coroutine::of(function () use (&$context) {
        // tag prop
        if (self::needFetchTagProp($context)) {
            /**
             * @var Models\CopyObjectRequest $sourceRequest
             */
            $sourceRequest = $context['request'];
            $request = new Models\GetObjectTaggingRequest();
            Utils::copyRequest($request, $sourceRequest);
            // The tagging is read from the source object, so point the request at the source
            // bucket (when one is given), key and version.
            if ($sourceRequest->sourceBucket != null) {
                $request->bucket = $sourceRequest->sourceBucket;
            }
            $request->key = $sourceRequest->sourceKey;
            $request->versionId = $sourceRequest->sourceVersionId;
            yield $this->client->getObjectTaggingAsync($request)->then(
                function (Models\GetObjectTaggingResult $result) use (&$context) {
                    $context['tag_prop'] = $result;
                },
            );
        }

        // init the multipart
        $request = new Models\InitiateMultipartUploadRequest();
        Utils::copyRequest($request, $context['request']);
        $request->disableAutoDetectMimeType = true;
        self::overwirteMetadataProp($request, $context);
        self::overwirteTagProp($request, $context);
        yield $this->client->initiateMultipartUploadAsync($request)->then(
            function (Models\InitiateMultipartUploadResult $result) use (&$context) {
                $context['upload_id'] = $result->uploadId;
            },
        );

        // timeout for MultiPartCopy API, 10s per 200M, max timeout is 50s
        // NOTE: the 50s limit is tested after the increment, so the stored value can end up
        // one step above 50 before the loop stops.
        $PART_SIZE = 200 * 1024 * 1024;
        $STEP = 10;
        $mpcReadTimeout = Defaults::READWRITE_TIMEOUT;
        $part_size = intval($context['part_size']);
        while ($part_size > $PART_SIZE) {
            $mpcReadTimeout += $STEP;
            $part_size -= $PART_SIZE;
            if ($mpcReadTimeout > 50) {
                break;
            }
        }
        $context['mpc_read_timeout'] = $mpcReadTimeout;

        // upload part
        $context['errors'] = [];
        $context['parts'] = [];
        $uploadFns = function () use (&$context) {
            // Each $args entry is used as [part number, range offset, range length].
            foreach (self::iterPart($context) as $args) {
                $request = new Models\UploadPartCopyRequest();
                Utils::copyRequest($request, $context['request']);
                $request->partNumber = $args[0];
                $request->uploadId = $context['upload_id'];
                $request->sourceRange = sprintf('bytes=%d-%d', $args[1], $args[1] + $args[2] - 1);
                // Keyed by part number so the 'fulfilled' callback can pair the etag with it.
                yield $args[0] => $this->client->uploadPartCopyAsync($request)->otherwise(
                    function ($reason) use (&$context) {
                        $context['errors'][] = $reason;
                        return GuzzleHttp\Promise\Create::rejectionFor($reason);
                    },
                );
                // Stop scheduling further parts once any part has failed.
                if (!empty($context['errors'])) {
                    break;
                }
            }
        };
        $each = new GuzzleHttp\Promise\EachPromise(
            $uploadFns(),
            [
                'concurrency' => $context['parallel_num'],
                'fulfilled' => function (Models\UploadPartCopyResult $result, $key) use (&$context) {
                    $context['parts'][] = new Models\UploadPart($key, $result->etag);
                    return $result;
                },
            ]
        );
        yield $each->promise();

        // complete upload
        if (empty($context['errors'])) {
            // Parts are appended in completion order, so sort them by part number first.
            $parts = $context['parts'];
            usort($parts, function ($a, $b) {
                return $a->partNumber <=> $b->partNumber;
            });
            $request = new Models\CompleteMultipartUploadRequest();
            Utils::copyRequest($request, $context['request']);
            $request->uploadId = $context['upload_id'];
            $request->completeMultipartUpload = new Models\CompleteMultipartUpload($parts);
            yield $this->client->completeMultipartUploadAsync($request)->then(
                function (Models\CompleteMultipartUploadResult $result) use (&$context) {
                    $context['upload_result'] = $result;
                    return $result;
                },
                function ($reason) use (&$context) {
                    // Record instead of rethrowing so the cleanup below still runs.
                    $context['errors'][] = $reason;
                },
            );
        }

        if (!empty($context['errors'])) {
            if ($context['leave_parts_on_error'] === false) {
                $request = new Models\AbortMultipartUploadRequest();
                Utils::copyRequest($request, $context['request']);
                $request->uploadId = $context['upload_id'];
                // Best-effort cleanup: a failed abort must not replace the error that caused
                // the copy to fail, so its rejection is deliberately swallowed here.
                yield $this->client->abortMultipartUploadAsync($request)->otherwise(
                    function ($reason) {
                        return null;
                    },
                );
            }
            // Rejection reasons are not guaranteed to be Throwable; exceptionFor() returns a
            // Throwable unchanged and wraps anything else in a RejectionException.
            throw GuzzleHttp\Promise\Create::exceptionFor(end($context['errors']));
        }
    })->then(
        function ($result) use (&$context) {
            // The coroutine itself returns nothing; the completed upload is read from the context.
            $result = $context['upload_result'];
            $res = new Models\CopyResult();
            Utils::copyResult($res, $result);
            $res->uploadId = $context['upload_id'];
            $context['copy_result'] = $res;
            return $res;
        },
        function ($reason) use (&$context) {
            return GuzzleHttp\Promise\Create::rejectionFor($reason);
        }
    );
}