in src/Uploader.php [275:383]
/**
 * Runs a multipart upload for the payload described by $context.
 *
 * The work happens inside a coroutine, in this order:
 *  1. initiate the multipart upload and store the returned upload id;
 *  2. upload every part produced by self::iterPart(), with at most
 *     $context['parallel_num'] requests in flight;
 *  3. if no part failed, complete the upload with the parts sorted by
 *     part number;
 *  4. if any error was recorded, optionally abort the upload and throw the
 *     most recently recorded error.
 *
 * @param array $context Shared upload state, captured by reference.
 *                       Read: 'request', 'parallel_num', 'leave_parts_on_error',
 *                       'filepath' (optional), and, for encryption clients,
 *                       'part_size' and 'total_size'.
 *                       Written: 'upload_id', 'errors', 'parts', 'upload_result',
 *                       and, for encryption clients, 'encryption_multi_part_context'.
 *
 * @return GuzzleHttp\Promise\Promise Fulfilled with a Models\UploadResult carrying the
 *                                    upload id; rejected with an Exception\UploadException
 *                                    wrapping the failure reason.
 */
private function multiPart(array &$context): GuzzleHttp\Promise\Promise
{
    return GuzzleHttp\Promise\Coroutine::of(function () use (&$context) {
        // init the multipart
        $request = new Models\InitiateMultipartUploadRequest();
        Utils::copyRequest($request, $context['request']);
        if ($this->isEncryptionClient) {
            $request->csePartSize = $context['part_size'];
            $request->cseDataSize = $context['total_size'];
        }
        // Only fall back to a detected content type when the copied request
        // did not already carry one.
        if (empty($request->contentType)) {
            $request->contentType = $this->get_content_type($context);
        }
        yield $this->client->initiateMultipartUploadAsync($request)->then(
            function (Models\InitiateMultipartUploadResult $result) use (&$context) {
                $context['upload_id'] = $result->uploadId;
                if ($this->isEncryptionClient) {
                    $context['encryption_multi_part_context'] = $result->encryptionMultipartContext;
                }
            },
        );

        // upload part
        $context['errors'] = [];
        $context['parts'] = [];
        // Generator of part-upload promises, keyed by part number.
        $uploadFns = function () use (&$context) {
            foreach (self::iterPart($context) as $args) {
                $request = new Models\UploadPartRequest();
                Utils::copyRequest($request, $context['request']);
                $request->partNumber = $args[0];
                $request->uploadId = $context['upload_id'];
                $request->body = $args[1];
                if ($this->isEncryptionClient) {
                    $request->encryptionMultipartContext = $context['encryption_multi_part_context'];
                }
                yield $args[0] => $this->client->uploadPartAsync($request)->otherwise(
                    function ($reason) use (&$context) {
                        // Record the failure, then keep the promise rejected.
                        $context['errors'][] = $reason;
                        return GuzzleHttp\Promise\Create::rejectionFor($reason);
                    },
                );
                // Stop producing further parts once any part has failed.
                if (!empty($context['errors'])) {
                    break;
                }
            }
        };
        $each = new GuzzleHttp\Promise\EachPromise(
            $uploadFns(),
            [
                'concurrency' => $context['parallel_num'],
                'fulfilled' => function (Models\UploadPartResult $result, $key) use (&$context) {
                    // $key is the part number yielded by the generator above.
                    $context['parts'][] = new Models\UploadPart($key, $result->etag);
                    return $result;
                }
            ]
        );
        yield $each->promise();

        // complete upload
        if (empty($context['errors'])) {
            // Parts are appended in completion order, so sort them by part
            // number before building the completion request.
            $parts = $context['parts'];
            usort($parts, static function ($a, $b) {
                return $a->partNumber <=> $b->partNumber;
            });
            $request = new Models\CompleteMultipartUploadRequest();
            Utils::copyRequest($request, $context['request']);
            $request->uploadId = $context['upload_id'];
            $request->completeMultipartUpload = new Models\CompleteMultipartUpload($parts);
            yield $this->client->completeMultipartUploadAsync($request)->then(
                function (Models\CompleteMultipartUploadResult $result) use (&$context) {
                    $context['upload_result'] = $result;
                    return $result;
                },
                function ($reason) use (&$context) {
                    // Record the failure without rethrowing so the cleanup
                    // branch below handles it.
                    $context['errors'][] = $reason;
                },
            );
        }

        if (!empty($context['errors'])) {
            if ($context['leave_parts_on_error'] === false) {
                // NOTE(review): if the abort request itself is rejected, that
                // rejection presumably surfaces instead of the recorded upload
                // error - confirm this is intended.
                $request = new Models\AbortMultipartUploadRequest();
                Utils::copyRequest($request, $context['request']);
                $request->uploadId = $context['upload_id'];
                yield $this->client->abortMultipartUploadAsync($request);
            }
            // Surface the most recently recorded error.
            throw end($context['errors']);
        }
    })->then(
        function ($result) use (&$context) {
            // The coroutine's own value is ignored; the completion result
            // stored in the context is what gets copied into the UploadResult.
            $result = $context['upload_result'];
            $res = new Models\UploadResult();
            Utils::copyResult($res, $result);
            $res->uploadId = $context['upload_id'];
            return $res;
        },
        function ($reason) use (&$context) {
            // 'upload_id' is unset when initiation failed, hence the defaults.
            return GuzzleHttp\Promise\Create::rejectionFor(new Exception\UploadException(
                $context['upload_id'] ?? '',
                $context['filepath'] ?? '',
                $reason
            ));
        }
    );
}