override def onPull()

in app/helpers/ParanoidS3Source.scala [91:134]


      /**
        * Called when downstream signals demand. Issues one signed `list-type=2` GET against the bucket endpoint,
        * buffers the complete response body and pushes it, unparsed, to `out`.
        *
        * The body is then scanned (via `findParams`) for the `IsTruncated` and `NextContinuationToken` values:
        *  - truncated with a token: the token is stored in `continuationToken` for the next pull;
        *  - not truncated, or no `IsTruncated` value found: the stage is completed;
        *  - truncated but without a token: the stage is failed, because re-issuing the request without a token
        *    would start the listing again from the beginning and never terminate.
        *
        * NOTE: every step blocks the calling thread with `Await.result` for up to 10 seconds.
        *
        * @throws RuntimeException if a header cannot be parsed or the response status is not 2xx
        * @throws java.util.concurrent.TimeoutException if signing, the request or reading the body exceeds 10 seconds
        */
      override def onPull(): Unit = {
        val stepTimeout = 10.seconds
        val bucketHost = s"$bucketName.s3.${region.getName}.amazonaws.com"

        val headerSequence = Seq(
          HttpHeader.parse("Host", bucketHost),
          //HTTP dates must be expressed in GMT. RFC_1123_DATE_TIME only renders "GMT" for a zero offset, so take the
          //current time in UTC rather than in the system default zone (which would render e.g. "+0100").
          HttpHeader.parse("Date", OffsetDateTime.now(java.time.ZoneOffset.UTC).format(DateTimeFormatter.RFC_1123_DATE_TIME))
        ).map({
          case Ok(header, _)=>header
          case Error(err)=>throw new RuntimeException(err.toString)
        })

        val baseParams = "list-type=2&encoding-type=url"
        val qParams = continuationToken match {
          case Some(token)=>
            logger.debug(s"continuation token is $token")
            baseParams + s"&continuation-token=${URLEncoder.encode(token, "UTF-8")}"
          case None=>
            logger.debug("no continuation token")
            baseParams
        }

        val request = HttpRequest(HttpMethods.GET,
          Uri(s"https://$bucketHost?$qParams"),
          headerSequence)

        val signedRequest = Await.result(signHttpRequest(request, region,"s3", credsProvider), stepTimeout)
        logger.debug(s"Signed request is ${signedRequest.toString()}")
        val response = Await.result(Http().singleRequest(signedRequest), stepTimeout)

        //we are in paranoid mode, so can't assume that this is valid xml (yet). So, we buffer the content and manually scan for
        //the continuationToken and isTruncated flags we require.
        val body = Await.result(response.entity.getDataBytes().runWith(Sink.fold[ByteString,ByteString](ByteString.empty)(_.concat(_)), mat), stepTimeout)

        //An error document must not be passed downstream as if it were a page of the listing; it carries no
        //IsTruncated flag, so the stage would otherwise complete normally with an incomplete listing.
        val statusCode = response.status.intValue
        if(statusCode < 200 || statusCode >= 300){
          throw new RuntimeException(s"S3 list request for bucket $bucketName failed with status ${response.status}: ${body.utf8String}")
        }

        push(out, body)
        val flags:Map[String,Option[String]] = findParams(Seq("NextContinuationToken","KeyCount","IsTruncated"), body)

        //use get+flatten so that a key which is absent from the map is treated the same as a value that was not found
        val isTruncated = flags.get("IsTruncated").flatten.contains("true")
        val nextToken = flags.get("NextContinuationToken").flatten

        if(!isTruncated){
          completeStage()
        } else {
          nextToken match {
            case Some(_)=>
              continuationToken = nextToken
            case None=>
              //without a token the next pull would list from the start again, looping forever
              logger.error("Response was marked as truncated but contained no NextContinuationToken")
              failStage(new RuntimeException("S3 listing was truncated but no NextContinuationToken was found in the response"))
          }
        }
      }