override def shape: SourceShape[LightboxEntry] = SourceShape.of()

in app/helpers/LightboxStreamComponents/LightboxDynamoSource.scala [28:129]


  /** This stage is a pure source: its shape consists of the single outlet `out`. */
  override def shape: SourceShape[LightboxEntry] = SourceShape(out)
  /**
    * creates a fresh query request builder that is already pointed at the table named by the
    * `lightbox.tableName` configuration key
    */
  private def getQueryRequestBuilder = {
    val requestBuilder = QueryRequest.builder
    requestBuilder.tableName(config.get[String]("lightbox.tableName"))
  }

  /**
    * builds the stage logic. Each pull emits one LightboxEntry; whenever the local cache runs dry the next page of
    * items whose `memberOfBulk` attribute equals `memberOfBulk` is queried from the `memberOfBulkIndex` index.
    * The stage completes once DynamoDB reports no further pages and the cache is drained, and fails if a query
    * or the marshalling of an item fails.
    * @param inheritedAttributes stream attributes passed in by the materializer (not used here)
    * @return the GraphStageLogic that drives this source
    */
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    protected val client = dynamoClientManager.getClient(config.getOptional[String]("externalData.awsProfile"))
    val logger = Logger(getClass)
    //key to resume from on the next query; None before the first page and after the final page
    private var lastEvaluatedKey:Option[mutable.Map[String, AttributeValue]] = None

    //items that have been fetched from DynamoDB but not yet pushed downstream
    private var resultCache:Seq[mutable.Map[String,AttributeValue]] = Seq()
    //set once DynamoDB returns a page without a LastEvaluatedKey, i.e. there is nothing more to fetch
    private var lastPage = false
    //running total of items received, for logging only
    private var ctr = 0

    /**
      * retrieve next page of results from DynamoDB
      * @param limit maximum number of items to return
      * @param exclusiveStartKey key to start at. If more items remain after a page, the value for this is the
      *                          (non-empty) `lastEvaluatedKey()` of the previous page's response
      * @return a Try with either the AWS query response object or an error
      */
    def getNextPage(limit:Int,exclusiveStartKey:Option[mutable.Map[String, AttributeValue]]) = {
      logger.info(s"getNextPage: memberOfBulk is $memberOfBulk")
      val baseRq = getQueryRequestBuilder
        .indexName("memberOfBulkIndex")
        .limit(limit)
        .keyConditionExpression("memberOfBulk = :bulkId")
        .expressionAttributeValues(Map(":bulkId"-> AttributeValue.builder().s(memberOfBulk).build()).asJava)

      val rqWithStart = exclusiveStartKey match {
        case Some(key)=>baseRq.exclusiveStartKey(key.asJava)
        case None=>baseRq
      }

      Try { client.query(rqWithStart.build()) }
    }

    /**
      * helper method to safely return an Option[String] for the given key if it may not be present
      * @param sourceData source data as returned by DynamoDB
      * @param key key to check
      * @return an Option that contains the value of the key, if one is present
      */
    def optionalString(sourceData:mutable.Map[String, AttributeValue], key:String):Option[String] = {
      sourceData.get(key).flatMap(attributeValue=>Option(attributeValue.s()))
    }

    /**
      * marshals the provided data from DynamoDB into a LightboxEntry
      * @param sourceData data as returned from DynamoDB
      * @return a Try that either contains the LightboxEntry or an error that occurred while marshalling
      */
    def buildLightboxEntry(sourceData: mutable.Map[String,AttributeValue]):Try[LightboxEntry] = Try {
      LightboxEntry(
        sourceData("userEmail").s(),
        sourceData("fileId").s(),
        ZonedDateTime.parse(sourceData("addedAt").s()),
        RestoreStatus.withName(sourceData("restoreStatus").s()),
        optionalString(sourceData,"restoreStarted").map(timeStr=>ZonedDateTime.parse(timeStr)),
        optionalString(sourceData,"restoreCompleted").map(timeStr=>ZonedDateTime.parse(timeStr)),
        optionalString(sourceData,"availableUntil").map(timeStr=>ZonedDateTime.parse(timeStr)),
        optionalString(sourceData,"lastError"),
        optionalString(sourceData,"memberOfBulk")
      )
    }

    /**
      * makes sure that `resultCache` holds at least one item, unless the result set is exhausted.
      * Pages are fetched until either an item arrives or DynamoDB stops returning a LastEvaluatedKey. A page that
      * is shorter than `pageSize` is NOT treated as the end of the data, because DynamoDB may cut a page short
      * (for example when it hits its per-page size cap) while more items are still available.
      * @return Success if the cache was refilled or there is nothing left to fetch, Failure if a query failed
      */
    private def refillCache():Try[Unit] = {
      var outcome:Try[Unit] = Success(())
      while(outcome.isSuccess && resultCache.isEmpty && !lastPage) {
        logger.info(s"cache is empty, getting next page of $pageSize results")
        getNextPage(pageSize, lastEvaluatedKey) match {
          case Success(queryResult)=>
            logger.info(s"Got scan result with ${queryResult.count()} items")
            //the SDK hands back an empty map (or null) when there is no key, so normalise both to None
            lastEvaluatedKey = Option(queryResult.lastEvaluatedKey()).map(_.asScala).filter(_.nonEmpty)
            resultCache = queryResult.items().asScala.map(_.asScala).toSeq

            ctr+=queryResult.count()
            logger.info(s"Scan returned ${queryResult.count()} items, running total is now $ctr items total")
            if(lastEvaluatedKey.isEmpty) {
              logger.info("No LastEvaluatedKey was returned, so all items have been retrieved")
              lastPage = true
            }
          case Failure(err)=>
            outcome = Failure(err)
        }
      }
      outcome
    }

    setHandler(out, new AbstractOutHandler {
      override def onPull(): Unit = refillCache() match {
        case Failure(err)=>
          //stop here; the stage must not also be completed once it has been failed
          logger.error(s"Could not scan Dynamodb table: ", err)
          failStage(err)
        case Success(_)=>
          resultCache.headOption match {
            case None=>  //still empty after refilling, so the result set is exhausted
              logger.info(s"No more results were returned")
              complete(out)
            case Some(rawItem)=>
              buildLightboxEntry(rawItem) match {
                case Success(entry) =>
                  push(out, entry)
                  resultCache = resultCache.tail
                case Failure(err) =>
                  logger.error(s"Could not marshal lightbox entry data into domain object", err)
                  failStage(err)
              } //match buildLightboxEntry
          } //match headOption
      } //onPull
    }) //setHandler
  } //new GraphStageLogic