def waitingForUpdate[E <: DomainEvent]()

in cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ShardCoordinator.scala [1623:1758]


  /**
   * Behavior that is active while an update for `evt` is in flight.
   *
   * Two confirmations may be outstanding: the coordinator state write (`UpdateSuccess` for
   * `CoordinatorStateKey`) and the remember entities store write (`UpdateDone` for `shardId`).
   * When one of them arrives while the other flag is still set, this behavior is re-installed
   * with the completed flag cleared. When the last one arrives, `unbecomeAfterUpdate` is
   * called with `evt` and `afterUpdateCallback`.
   *
   * `GetShardHome` requests are answered through `handleGetShardHome` or stashed via
   * `stashGetShardHomeRequest`; a `Terminate` message sets `terminating` and is stashed; any
   * message not matched by an earlier case is stashed.
   *
   * @param evt the event whose update is being awaited
   * @param shardId the shard whose remember entities confirmation is expected, if any
   * @param waitingForStateWrite `true` while the coordinator state write is unconfirmed
   * @param waitingForRememberShard `true` while the remember entities write is unconfirmed
   * @param afterUpdateCallback passed on to `unbecomeAfterUpdate` once everything is confirmed
   */
  def waitingForUpdate[E <: DomainEvent](
      evt: E,
      shardId: Option[ShardId],
      waitingForStateWrite: Boolean,
      waitingForRememberShard: Boolean,
      afterUpdateCallback: E => Unit): Receive = {

    case UpdateSuccess(CoordinatorStateKey, Some(`evt`)) =>
      updateStateRetries = 0
      if (waitingForRememberShard) {
        // State write is done, but the remember entities store has not confirmed yet.
        log.debug(
          "{}: The coordinator state was successfully updated with {}, waiting for remember shard update",
          typeName,
          evt)
        context.become(
          waitingForUpdate(
            evt = evt,
            shardId = shardId,
            waitingForStateWrite = false,
            waitingForRememberShard = true,
            afterUpdateCallback = afterUpdateCallback))
      } else {
        log.debug("{}: The coordinator state was successfully updated with {}", typeName, evt)
        if (shardId.nonEmpty) timers.cancel(RememberEntitiesTimeoutKey)
        unbecomeAfterUpdate(evt, afterUpdateCallback)
      }

    case UpdateTimeout(CoordinatorStateKey, Some(`evt`)) =>
      updateStateRetries += 1

      val timeoutMillis = stateWriteConsistency.timeout.toMillis
      val mode = if (terminating) "terminating" else "retrying"
      val template =
        s"$typeName: The ShardCoordinator was unable to update a distributed state within 'updating-state-timeout': $timeoutMillis millis ($mode). Attempt $updateStateRetries. " +
        s"Perhaps the ShardRegion has not started on all active nodes yet? event=$evt"

      // The first attempts are reported as warnings, from the fifth attempt on as errors.
      if (updateStateRetries < 5) log.warning(template)
      else log.error(template)

      if (terminating) context.stop(self)
      else sendCoordinatorStateUpdate(evt) // repeat until UpdateSuccess

    case ModifyFailure(key, error, cause, _) =>
      val consequence =
        if (terminating) "Coordinator will be terminated due to Terminate message received"
        else "Coordinator will be restarted"
      log.error(
        cause,
        s"$typeName: The ShardCoordinator was unable to update a distributed state {} with error {} and event {}. {}",
        key,
        error,
        evt,
        consequence)
      if (terminating) context.stop(self)
      else throw cause

    case request @ GetShardHome(shard) =>
      val handled = handleGetShardHome(shard)
      // must wait for update that is in progress
      if (!handled) stashGetShardHomeRequest(sender(), request)

    case ShardCoordinator.Internal.Terminate =>
      log.debug("{}: The ShardCoordinator received termination message while waiting for update", typeName)
      terminating = true
      stash()

    case RememberEntitiesCoordinatorStore.UpdateDone(shard) =>
      if (shardId.contains(shard)) {
        if (waitingForStateWrite) {
          // Remember entities write is done, but the coordinator state write is still pending.
          log.debug(
            "{}: The ShardCoordinator saw remember shard start successfully written {}, waiting for state update",
            typeName,
            evt)
          context.become(
            waitingForUpdate(
              evt = evt,
              shardId = shardId,
              waitingForStateWrite = true,
              waitingForRememberShard = false,
              afterUpdateCallback = afterUpdateCallback))
        } else {
          log.debug("{}: The ShardCoordinator saw remember shard start successfully written {}", typeName, evt)
          if (shardId.nonEmpty) timers.cancel(RememberEntitiesTimeoutKey)
          unbecomeAfterUpdate(evt, afterUpdateCallback)
        }
      } else {
        log.warning(
          "{}: Saw remember entities update complete for shard id [{}], while waiting for [{}]",
          typeName,
          shard,
          shardId.getOrElse(""))
      }

    case RememberEntitiesCoordinatorStore.UpdateFailed(shard) =>
      if (!shardId.contains(shard)) {
        log.warning(
          "{}: Got an remember entities update failed for [{}] while waiting for [{}], ignoring",
          typeName,
          shard,
          shardId.getOrElse(""))
      } else {
        onRememberEntitiesUpdateFailed(shard)
      }

    case RememberEntitiesTimeout(shard) =>
      if (!shardId.contains(shard)) {
        log.warning(
          "{}: Got an remember entities update timeout for [{}] while waiting for [{}], ignoring",
          typeName,
          shard,
          shardId.getOrElse(""))
      } else {
        onRememberEntitiesUpdateFailed(shard)
      }

    case RememberEntitiesStoreStopped =>
      onRememberEntitiesStoreStopped()

    case _: RememberEntitiesCoordinatorStore.RememberedShards =>
      log.debug("{}: Late arrival of remembered shards while waiting for update, stashing", typeName)
      stash()

    // Anything else is deferred until the update in progress has completed.
    case _ => stash()
  }