in s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala [1217:1335]
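// Builds a flow that splits the incoming (ByteString, C) stream into multipart-upload chunks,
// uploads each part (retrying transient failures), emits an UploadPartResponse per part, and
// feeds each response, together with the contexts of the elements that made up that part, to
// the caller-supplied chunkUploadSink.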
private def chunkAndRequestWithContext[C](
s3Location: S3Location,
contentType: ContentType,
s3Headers: S3Headers,
chunkSize: Int,
chunkUploadSink: Sink[(UploadPartResponse, immutable.Iterable[C]), _],
initialUploadState: Option[(String, Int)] = None)(
parallelism: Int): Flow[(ByteString, C), UploadPartResponse, NotUsed] = {
// This part of the API doesn't support the disk buffer because we have no way of serializing a C
// to a ByteString, so we only buffer the chunks in memory.
def getChunk(bufferSize: Int) =
new MemoryWithContext[C](bufferSize)
// Multipart upload requests (except for the completion API) are created here.
// The initial upload request is also executed within this function, and the individual
// upload part requests are built for each chunk.
assert(
chunkSize >= MinChunkSize,
s"Chunk size must be at least 5 MB = $MinChunkSize bytes (was $chunkSize bytes). See http://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPart.html")
val chunkBufferSize = chunkSize * 2
val headers = s3Headers.serverSideEncryption.map(_.headersFor(UploadPart)).getOrElse(Nil)
Flow
.fromMaterializer { (mat, attr) =>
implicit val conf: S3Settings = resolveSettings(attr, mat.system)
implicit val sys: ActorSystem = mat.system
implicit val materializer: Materializer = mat
// Emits a single empty chunk if no chunks have been emitted. Ensures that we can upload empty files.
val atLeastOne =
Flow[(Chunk, immutable.Iterable[C])]
.prefixAndTail(1)
.flatMapConcat {
case (prefix, tail) =>
if (prefix.nonEmpty) {
Source(prefix).concat(tail)
} else {
Source.single((MemoryChunk(ByteString.empty), immutable.Iterable.empty))
}
}
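// retriableFlow builds and signs the UploadPart request for each chunk, sends it through the
// shared connection pool, and keeps the (MultipartUpload, chunkIndex) pair plus the per-chunk
// contexts alongside so that retries and downstream stages can correlate the responses.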
val retriableFlow: Flow[
  ((Chunk, (MultipartUpload, Int)), immutable.Iterable[C]),
  ((Try[HttpResponse], (MultipartUpload, Int)), immutable.Iterable[C]),
  NotUsed] =
Flow[((Chunk, (MultipartUpload, Int)), immutable.Iterable[C])]
.map {
case ((chunkedPayload, (uploadInfo, chunkIndex)), allContext) =>
// each upload part request is created here
val partRequest =
uploadPartRequest(uploadInfo, chunkIndex, chunkedPayload, headers)
((partRequest, (uploadInfo, chunkIndex)), allContext)
}
.flatMapConcat {
case ((req, info), allContext) =>
Signer.signedRequest(req, signingKey(), conf.signAnonymousRequests).zip(Source.single(info)).map {
case (httpRequest, data) => (httpRequest, (data, allContext))
}
}
.via(superPool[((MultipartUpload, Int), immutable.Iterable[C])])
.map {
case (response, (info, allContext)) => ((response, info), allContext)
}
import conf.multipartUploadSettings.retrySettings._
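// If the upstream completes without ever emitting, inject a single empty element so that an empty
// file can still be uploaded; the injected context is null because there is no user element to take it from.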
val atLeastOneByteStringAndEmptyContext: Flow[(ByteString, C), (ByteString, C), NotUsed] =
Flow[(ByteString, C)].orElse(
Source.single((ByteString.empty, null.asInstanceOf[C])))
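// Split the incoming stream into substreams of roughly chunkSize bytes and accumulate each
// substream into a single in-memory Chunk, collecting the contexts of all elements that ended up in it.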
val source1: SubFlow[
  (Chunk, immutable.Iterable[C]),
  NotUsed,
  Flow[(ByteString, C), (ByteString, C), NotUsed]#Repr,
  Sink[(ByteString, C), NotUsed]] =
SplitAfterSizeWithContext(chunkSize)(atLeastOneByteStringAndEmptyContext)
.via(getChunk(chunkBufferSize))
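// Merge the per-chunk substreams back together, drop empty chunks, and re-insert a single empty
// chunk if everything was filtered out so that empty uploads still complete.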
val source2: Flow[(ByteString, C), (Chunk, immutable.Iterable[C]), NotUsed] =
source1
.mergeSubstreamsWithParallelism(parallelism)
.filter { case (chunk, _) => chunk.size > 0 }
.via(atLeastOne)
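// Pair every chunk with the multipart upload metadata and its part number, then distribute the
// parts across `parallelism` substreams keyed by part number.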
source2
.zip(requestInfoOrUploadState(s3Location, contentType, s3Headers, initialUploadState))
.groupBy(parallelism, { case (_, (_, chunkIndex)) => chunkIndex % parallelism })
.map {
case ((chunk, allContext), info) =>
((chunk, info), allContext)
}
// Allow requests that fail with transient errors to be retried, using the already buffered chunk.
.via(RetryFlow.withBackoff(minBackoff, maxBackoff, randomFactor, maxRetries, retriableFlow) {
case ((chunkAndUploadInfo, allContext), ((Success(r), _), _)) =>
if (isTransientError(r.status)) {
r.entity.discardBytes()
Some((chunkAndUploadInfo, allContext))
} else {
None
}
case ((chunkAndUploadInfo, allContext), ((Failure(_), _), _)) =>
// Treat any exception as transient.
Some((chunkAndUploadInfo, allContext))
})
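// Convert each HTTP response into an UploadPartResponse, one at a time so that ordering within
// the substream is preserved.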
.mapAsync(1) {
case ((response, (upload, index)), allContext) =>
handleChunkResponse(response, upload, index, conf.multipartUploadSettings.retrySettings).map { result =>
(result, allContext)
}(ExecutionContexts.parasitic)
}
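// Emit every UploadPartResponse together with its contexts to the user-provided chunkUploadSink,
// then drop the contexts from the main stream before the substreams are merged.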
.alsoTo(chunkUploadSink)
.map { case (result, _) => result }
.mergeSubstreamsWithParallelism(parallelism)
}
.mapMaterializedValue(_ => NotUsed)
}
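// Illustrative usage sketch with hypothetical names (`elementsWithContext`, `MyCtx`, `ctxSink`),
// showing roughly how this flow is wired up internally:
//
//   Source(elementsWithContext)                                    // Source[(ByteString, MyCtx), _]
//     .via(chunkAndRequestWithContext[MyCtx](s3Location, contentType, s3Headers, MinChunkSize, ctxSink)(4))
//     .runWith(Sink.seq)
//
// Here `ctxSink` receives each UploadPartResponse paired with the contexts of the elements that
// went into that part.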