private def started()

in core/src/main/scala/org/apache/pekko/projection/ProjectionBehavior.scala [109:189]


  private def started(running: RunningProjection): Behavior[Command] =
    Behaviors
      .receiveMessagePartial[Command] {
        case Stop =>
          context.log.debug("Projection [{}] is being stopped", projectionId)
          val stoppedFut = running.stop()
          // we send a Stopped for whatever completes the Future
          // Success or Failure, doesn't matter, since the internal stream is by then stopped
          context.pipeToSelf(stoppedFut)(_ => Stopped)
          stopping()

        case getOffset: GetOffset[Offset @unchecked] =>
          running match {
            case mgmt: RunningProjectionManagement[Offset @unchecked] =>
              if (getOffset.projectionId == projectionId) {
                context.pipeToSelf(mgmt.getOffset()) {
                  case Success(offset) => GetOffsetResult(offset, getOffset.replyTo)
                  case Failure(exc)    => ManagementOperationException(getOffset, exc)
                }
              }
              Behaviors.same
            case _ => Behaviors.unhandled
          }

        case result: GetOffsetResult[Offset @unchecked] =>
          receiveGetOffsetResult(result)

        case setOffset: SetOffset[Offset @unchecked] =>
          running match {
            case mgmt: RunningProjectionManagement[Offset @unchecked] =>
              if (setOffset.projectionId == projectionId) {
                context.log.info2(
                  "Offset will be changed to [{}] for projection [{}]. The Projection will be restarted.",
                  setOffset.offset,
                  projectionId)
                context.pipeToSelf(running.stop())(_ => Stopped)
                settingOffset(setOffset, mgmt)
              } else {
                Behaviors.same // not for this projectionId
              }
            case _ => Behaviors.unhandled
          }

        case ManagementOperationException(op, exc) =>
          context.log.warn2("Operation [{}] failed with: {}", op, exc)
          Behaviors.same

        case isPaused: IsPaused =>
          running match {
            case mgmt: RunningProjectionManagement[_] =>
              if (isPaused.projectionId == projectionId) {
                context.pipeToSelf(mgmt.getManagementState()) {
                  case Success(state) => GetManagementStateResult(state, isPaused.replyTo)
                  case Failure(exc)   => ManagementOperationException(isPaused, exc)
                }
              }
              Behaviors.same
            case _ => Behaviors.unhandled
          }

        case GetManagementStateResult(state, replyTo) =>
          replyTo ! state.exists(_.paused)
          Behaviors.same

        case setPaused: SetPaused =>
          running match {
            case mgmt: RunningProjectionManagement[_] =>
              if (setPaused.projectionId == projectionId) {
                context.log.info2(
                  "Running state will be changed to [{}] for projection [{}].",
                  if (setPaused.paused) "paused" else "resumed",
                  projectionId)
                context.pipeToSelf(running.stop())(_ => Stopped)
                settingPaused(setPaused, mgmt)
              } else {
                Behaviors.same // not for this projectionId
              }
            case _ => Behaviors.unhandled
          }

      }