private def settingOffset()

in core/src/main/scala/org/apache/pekko/projection/ProjectionBehavior.scala [199:231]


  /**
   * Behavior that is active while an offset change requested via `setOffset` is being applied.
   *
   * When `Stopped` arrives, the requested offset is stored through `mgmt.setOffset` and the
   * outcome is piped back to this actor. As soon as that outcome is received the projection is
   * run again and the behavior switches to `started`, replaying everything that was stashed in
   * the meantime. `Done` is sent to the requester only if storing the offset succeeded.
   *
   * @param setOffset the request carrying the offset to store and the actor to reply to
   * @param mgmt handle whose `setOffset` is used to store the new offset
   * @return the behavior that handles messages until the projection has been started again
   */
  private def settingOffset(
      setOffset: SetOffset[Offset],
      mgmt: RunningProjectionManagement[Offset]): Behavior[Command] = {

    // Runs the projection again; used by both the success and the failure path.
    def restartProjection() = projection.run()(context.system)

    Behaviors.receiveMessage[Command] {
      case Stopped =>
        context.log.debug("Projection [{}] stopped", projectionId)

        // Store the requested offset and hand the outcome back to this actor as a message.
        val storing = mgmt.setOffset(setOffset.offset)
        context.pipeToSelf(storing) {
          case Failure(cause) => ManagementOperationException(setOffset, cause)
          case Success(_)     => SetOffsetResult(setOffset.replyTo)
        }

        Behaviors.same

      case SetOffsetResult(requester) =>
        context.log.info2(
          "Starting projection [{}] after setting offset to [{}]",
          projection.projectionId,
          setOffset.offset)
        // Start first, then acknowledge, then replay the stashed messages.
        val restarted = restartProjection()
        requester ! Done
        stashBuffer.unstashAll(started(restarted))

      case ManagementOperationException(operation, cause) =>
        context.log.warn2("Operation [{}] failed.", operation, cause)
        // start anyway, but no reply
        stashBuffer.unstashAll(started(restartProjection()))

      case unrelated =>
        // Anything else is kept until the projection is running again.
        stashBuffer.stash(unrelated)
        Behaviors.same
    }
  }