in app/helpers/ParanoidS3Source.scala [91:134]
override def onPull(): Unit = {
val headerSequence = Seq(
HttpHeader.parse("Host", s"$bucketName.s3.${region.getName}.amazonaws.com"),
HttpHeader.parse("Date", OffsetDateTime.now().format(DateTimeFormatter.RFC_1123_DATE_TIME)),
).map({
case Ok(header, errors)=>header
case Error(err)=>throw new RuntimeException(err.toString)
})
val baseParams = "list-type=2&encoding-type=url"
//val baseParams = "delimiter=/&encoding-type=url&prefix"
val qParams = continuationToken match {
case Some(token)=>
logger.debug(s"continuation token is $token")
baseParams + s"&continuation-token=${URLEncoder.encode(token, "UTF-8")}"
case None=>
logger.debug("no continuation token")
baseParams
}
val request = HttpRequest(HttpMethods.GET,
Uri(s"https://$bucketName.s3.${region.getName}.amazonaws.com?$qParams"),
headerSequence)
val signedRequest = Await.result(signHttpRequest(request, region,"s3", credsProvider), 10 seconds)
logger.debug(s"Signed request is ${signedRequest.toString()}")
val response = Await.result(Http().singleRequest(signedRequest), 10 seconds)
//we are in paranoid mode, so can't assume that this is valid xml (yet). So, we buffer the content and manually scan for
//the continuationToken and isTruncated flags we require.
val body = Await.result(response.entity.getDataBytes().runWith(Sink.fold[ByteString,ByteString](ByteString.empty)(_.concat(_)), mat), 10 seconds)
push(out, body)
val flags:Map[String,Option[String]] = findParams(Seq("NextContinuationToken","KeyCount","IsTruncated"), body)
flags("IsTruncated") match {
case Some(flag)=>
if(flag=="true"){
continuationToken = flags("NextContinuationToken")
} else {
completeStage()
}
case None=>completeStage()
}
}