private def chunkAndRequestWithContext[C]()

in s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala [1217:1335]


  private def chunkAndRequestWithContext[C](
      s3Location: S3Location,
      contentType: ContentType,
      s3Headers: S3Headers,
      chunkSize: Int,
      chunkUploadSink: Sink[(UploadPartResponse, immutable.Iterable[C]), _],
      initialUploadState: Option[(String, Int)] = None)(
      parallelism: Int): Flow[(ByteString, C), UploadPartResponse, NotUsed] = {

    // This part of the API doesn't support disk buffering because we have no way of serializing a C to a
    // ByteString, so the chunks are only stored in memory.
    def getChunk(bufferSize: Int) =
      new MemoryWithContext[C](bufferSize)

    // Multipart upload requests (except for the completion API) are created here:
    // the initial upload request is executed within this function, and the individual
    // upload part requests are built for each chunk.

    assert(
      chunkSize >= MinChunkSize,
      s"Chunk size must be at least 5 MB = $MinChunkSize bytes (was $chunkSize bytes). See http://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPart.html")

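    // The in-memory buffer must be able to hold a chunk that overshoots the nominal chunkSize,
    // since SplitAfterSizeWithContext only starts a new substream after the size limit has been exceeded.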
    val chunkBufferSize = chunkSize * 2

    val headers = s3Headers.serverSideEncryption.map(_.headersFor(UploadPart)).getOrElse(Nil)

    Flow
      .fromMaterializer { (mat, attr) =>
        implicit val conf: S3Settings = resolveSettings(attr, mat.system)
        implicit val sys: ActorSystem = mat.system
        implicit val materializer: Materializer = mat

        // Emits an empty chunk if no chunks have been emitted at all. Ensures that we can upload empty files.
        val atLeastOne =
          Flow[(Chunk, immutable.Iterable[C])]
            .prefixAndTail(1)
            .flatMapConcat {
              case (prefix, tail) =>
                if (prefix.nonEmpty) {
                  Source(prefix).concat(tail)
                } else {
                  Source.single((MemoryChunk(ByteString.empty), immutable.Iterable.empty))
                }
            }

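        // Builds and signs one UploadPart request per chunk and executes it via the shared connection pool.
        // The (MultipartUpload, chunkIndex) pair and the accumulated contexts travel alongside each request
        // so that the retry decision and the chunkUploadSink can still see them downstream.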
        val retriableFlow: Flow[
          ((Chunk, (MultipartUpload, Int)), immutable.Iterable[C]),
          ((Try[HttpResponse], (MultipartUpload, Int)), immutable.Iterable[C]),
          NotUsed] =
          Flow[((Chunk, (MultipartUpload, Int)), immutable.Iterable[C])]
            .map {
              case ((chunkedPayload, (uploadInfo, chunkIndex)), allContext) =>
                // create the upload part request for each chunked payload
                val partRequest =
                  uploadPartRequest(uploadInfo, chunkIndex, chunkedPayload, headers)
                ((partRequest, (uploadInfo, chunkIndex)), allContext)
            }
            .flatMapConcat {
              case ((req, info), allContext) =>
                Signer.signedRequest(req, signingKey(), conf.signAnonymousRequests).zip(Source.single(info)).map {
                  case (httpRequest, data) => (httpRequest, (data, allContext))
                }
            }
            .via(superPool[((MultipartUpload, Int), immutable.Iterable[C])])
            .map {
              case (response, (info, allContext)) => ((response, info), allContext)
            }

        import conf.multipartUploadSettings.retrySettings._

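        // If the upstream completes without emitting any elements, fall back to a single empty ByteString
        // with a placeholder context so that an empty object can still be uploaded; the resulting empty chunk
        // is filtered out further down and atLeastOne re-emits an empty chunk with an empty context instead.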
        val atLeastOneByteStringAndEmptyContext: Flow[(ByteString, C), (ByteString, C), NotUsed] =
          Flow[(ByteString, C)].orElse(
            Source.single((ByteString.empty, null.asInstanceOf[C])))

        val source1: SubFlow[
          (Chunk, immutable.Iterable[C]),
          NotUsed,
          Flow[(ByteString, C), (ByteString, C), NotUsed]#Repr,
          Sink[(ByteString, C), NotUsed]] =
          SplitAfterSizeWithContext(chunkSize)(atLeastOneByteStringAndEmptyContext)
            .via(getChunk(chunkBufferSize))

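        // Merge the per-chunk substreams back together, drop any empty chunks (such as the placeholder above),
        // and guarantee that at least one chunk, possibly empty, is emitted overall.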
        val source2: Flow[(ByteString, C), (Chunk, immutable.Iterable[C]), NotUsed] =
          source1
            .mergeSubstreamsWithParallelism(parallelism)
            .filter { case (chunk, _) => chunk.size > 0 }
            .via(atLeastOne)

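        // Pair each chunk with the multipart upload handle and its part number, then fan the chunks out over
        // parallelism substreams keyed by part number so that parts can be uploaded concurrently.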
        source2
          .zip(requestInfoOrUploadState(s3Location, contentType, s3Headers, initialUploadState))
          .groupBy(parallelism, { case (_, (_, chunkIndex)) => chunkIndex % parallelism })
          .map {
            case ((chunk, allContext), info) =>
              ((chunk, info), allContext)
          }
          // Allow requests that fail with transient errors to be retried, using the already buffered chunk.
          .via(RetryFlow.withBackoff(minBackoff, maxBackoff, randomFactor, maxRetries, retriableFlow) {
            case ((chunkAndUploadInfo, allContext), ((Success(r), _), _)) =>
              if (isTransientError(r.status)) {
                r.entity.discardBytes()
                Some((chunkAndUploadInfo, allContext))
              } else {
                None
              }
            case ((chunkAndUploadInfo, allContext), ((Failure(_), _), _)) =>
              // Treat any exception as transient.
              Some((chunkAndUploadInfo, allContext))
          })
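          // Convert each part-upload response into an UploadPartResponse, keeping the per-chunk contexts
          // alongside the result for the chunkUploadSink.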
          .mapAsync(1) {
            case ((response, (upload, index)), allContext) =>
              handleChunkResponse(response, upload, index, conf.multipartUploadSettings.retrySettings).map { result =>
                (result, allContext)
              }(ExecutionContexts.parasitic)
          }
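          // Emit every (UploadPartResponse, contexts) pair to the user-supplied chunkUploadSink, then drop the
          // contexts and merge the substreams back into a single stream of responses.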
          .alsoTo(chunkUploadSink)
          .map { case (result, _) => result }
          .mergeSubstreamsWithParallelism(parallelism)
      }
      .mapMaterializedValue(_ => NotUsed)
  }
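
For orientation, the sketch below shows roughly how this internal flow is reached from user code. It is illustrative only: it assumes the public scaladsl entry point S3.multipartUploadWithContext(bucket, key, chunkUploadSink, ...) with its remaining parameters left at their defaults, and the bucket name, key, context type and sink body are placeholders rather than anything defined in this file.

// A minimal sketch, not part of S3Stream.scala. Assumes S3.multipartUploadWithContext with the shape
// (bucket, key, chunkUploadSink, ...defaults); bucket, key and the Int context are illustrative placeholders.
import scala.collection.immutable
import scala.concurrent.Future

import org.apache.pekko.NotUsed
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.connectors.s3.{ MultipartUploadResult, UploadPartResponse }
import org.apache.pekko.stream.connectors.s3.scaladsl.S3
import org.apache.pekko.stream.scaladsl.{ Sink, Source }
import org.apache.pekko.util.ByteString

object MultipartUploadWithContextExample extends App {

  implicit val system: ActorSystem = ActorSystem("multipart-upload-with-context-example")

  // Payloads paired with a caller-defined context, here an offset into some external source.
  val data: Source[(ByteString, Int), NotUsed] =
    Source(1 to 100000).map(i => (ByteString(s"line $i\n"), i))

  // Invoked once per uploaded part with the contexts of every element that ended up in that part,
  // e.g. to commit offsets against a message broker.
  val chunkUploadSink: Sink[(UploadPartResponse, immutable.Iterable[Int]), NotUsed] =
    Sink
      .foreach[(UploadPartResponse, immutable.Iterable[Int])] {
        case (partResponse, contexts) =>
          system.log.info(s"Uploaded part $partResponse covering contexts up to ${contexts.lastOption}")
      }
      .mapMaterializedValue(_ => NotUsed)

  val result: Future[MultipartUploadResult] =
    data.runWith(S3.multipartUploadWithContext[Int]("my-bucket", "my-key", chunkUploadSink))
}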