override def createLogic()

in kinesis/src/main/scala/org/apache/pekko/stream/connectors/kinesis/impl/KinesisSourceStage.scala [65:183]


  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new TimerGraphStageLogic(shape) with StageLogging with OutHandler {

      setHandler(out, this)

      import shardSettings._

      private[this] var currentShardIterator: String = _
      private[this] val buffer = mutable.Queue.empty[Record]
      private[this] var self: StageActor = _

      override def preStart(): Unit = {
        self = getStageActor(awaitingShardIterator)
        requestShardIterator()
      }

      override def onPull(): Unit = self.ref ! Pump

      private def awaitingShardIterator(in: (ActorRef, Any)): Unit = in match {
        case (_, GetShardIteratorSuccess(result)) =>
          currentShardIterator = result.shardIterator
          self.become(awaitingRecords)
          requestRecords()

        case (_, GetShardIteratorFailure(ex)) =>
          val error = new Errors.GetShardIteratorError(shardId, ex)
          log.error(ex, error.getMessage)
          failStage(error)

        case (_, Pump) =>
        case (_, msg) =>
          throw new IllegalArgumentException(s"unexpected message $msg in state `ready`")
      }

      private def awaitingRecords(in: (ActorRef, Any)): Unit = in match {
        case (_, GetRecordsSuccess(result)) =>
          val records = result.records.asScala
          if (result.nextShardIterator == null) {
            log.info("Shard {} returned a null iterator and will now complete.", shardId)
            completeStage()
          } else {
            currentShardIterator = result.nextShardIterator
          }
          if (records.nonEmpty) {
            records.foreach(buffer.enqueue(_))
            self.become(ready)
            self.ref ! Pump
          } else {
            scheduleOnce(GetRecords, refreshInterval)
          }

        case (_, GetRecordsFailure(ex)) =>
          val error = new Errors.GetRecordsError(shardId, ex)
          log.error(ex, error.getMessage)
          failStage(error)

        case (_, Pump) =>
        case (_, msg) =>
          throw new IllegalArgumentException(s"unexpected message $msg in state `ready`")
      }

      private def ready(in: (ActorRef, Any)): Unit = in match {
        case (_, Pump) =>
          if (isAvailable(shape.out)) {
            push(shape.out, buffer.dequeue())
            self.ref ! Pump
          }
          if (buffer.isEmpty) {
            self.become(awaitingRecords)
            requestRecords()
          }

        case (_, msg) =>
          throw new IllegalArgumentException(s"unexpected message $msg in state `ready`")
      }

      override protected def onTimer(timerKey: Any): Unit = timerKey match {
        case GetRecords => requestRecords()
        case other =>
          log.warning("unexpected timer [{}]", other)
      }

      private[this] val handleGetRecords: Try[GetRecordsResponse] => Unit = {
        case Failure(exception) => self.ref ! GetRecordsFailure(exception)
        case Success(result)    => self.ref ! GetRecordsSuccess(result)
      }

      private[this] def requestRecords(): Unit =
        amazonKinesisAsync
          .getRecords(
            GetRecordsRequest.builder().limit(limit).shardIterator(currentShardIterator).build())
          .asScala
          .onComplete(handleGetRecords)(parasitic)

      private[this] def requestShardIterator(): Unit = {
        val request = Function
          .chain[GetShardIteratorRequest.Builder](
            Seq(
              r => startingSequenceNumber.fold(r)(r.startingSequenceNumber),
              r => atTimestamp.fold(r)(instant => r.timestamp(instant))))(
            GetShardIteratorRequest
              .builder()
              .streamName(streamName)
              .shardId(shardId)
              .shardIteratorType(shardIteratorType))
          .build()

        val handleGetShardIterator: Try[GetShardIteratorResponse] => Unit = {
          case Success(result)    => self.ref ! GetShardIteratorSuccess(result)
          case Failure(exception) => self.ref ! GetShardIteratorFailure(exception)
        }

        amazonKinesisAsync
          .getShardIterator(request)
          .asScala
          .onComplete(handleGetShardIterator)(parasitic)
      }

    }