in cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ShardCoordinator.scala [1623:1758]
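// Receive behavior used while an update of the coordinator state is in flight.
// It waits for the distributed-data write of `evt` and, when `shardId` is defined,
// also for the remember-entities coordinator store to confirm the shard, before
// completing via `unbecomeAfterUpdate`. Requests that cannot be answered yet are stashed.
// (Presumably entered with `context.become(waitingForUpdate(...))` right after the
// corresponding write, e.g. `sendCoordinatorStateUpdate(evt)`, has been initiated.)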
def waitingForUpdate[E <: DomainEvent](
evt: E,
shardId: Option[ShardId],
waitingForStateWrite: Boolean,
waitingForRememberShard: Boolean,
afterUpdateCallback: E => Unit): Receive = {
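// The ddata write of the event succeeded: either complete the update or keep
// waiting for the remember-entities store confirmation.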
case UpdateSuccess(CoordinatorStateKey, Some(`evt`)) =>
updateStateRetries = 0
if (!waitingForRememberShard) {
log.debug("{}: The coordinator state was successfully updated with {}", typeName, evt)
if (shardId.isDefined) timers.cancel(RememberEntitiesTimeoutKey)
unbecomeAfterUpdate(evt, afterUpdateCallback)
} else {
log.debug(
"{}: The coordinator state was successfully updated with {}, waiting for remember shard update",
typeName,
evt)
context.become(
waitingForUpdate(
evt,
shardId,
waitingForStateWrite = false,
waitingForRememberShard = true,
afterUpdateCallback = afterUpdateCallback))
}
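// The ddata write did not reach the required consistency in time: log (escalating
// to error after 5 attempts) and retry, unless a Terminate request means we should stop.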
case UpdateTimeout(CoordinatorStateKey, Some(`evt`)) =>
updateStateRetries += 1
val template =
s"$typeName: The ShardCoordinator was unable to update a distributed state within " +
s"'updating-state-timeout': ${stateWriteConsistency.timeout.toMillis} millis " +
s"(${if (terminating) "terminating" else "retrying"}). Attempt $updateStateRetries. " +
s"Perhaps the ShardRegion has not started on all active nodes yet? event=$evt"
if (updateStateRetries < 5) log.warning(template) else log.error(template)
if (terminating) {
context.stop(self)
} else {
// repeat until UpdateSuccess
sendCoordinatorStateUpdate(evt)
}
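// The replicator could not apply the update at all: stop if terminating,
// otherwise rethrow so the coordinator is restarted by its supervisor.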
case ModifyFailure(key, error, cause, _) =>
log.error(
cause,
s"$typeName: The ShardCoordinator was unable to update a distributed state {} with error {} and event {}. {}",
key,
error,
evt,
if (terminating) "Coordinator will be terminated due to Terminate message received"
else "Coordinator will be restarted")
if (terminating) {
context.stop(self)
} else {
throw cause
}
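// Answer shard-home requests if possible; otherwise stash them until the update completes.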
case g @ GetShardHome(shard) =>
if (!handleGetShardHome(shard))
stashGetShardHomeRequest(sender(), g) // must wait for update that is in progress
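// Remember that termination was requested and stash the message; the handlers for the
// in-flight update decide when to actually stop.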
case ShardCoordinator.Internal.Terminate =>
log.debug("{}: The ShardCoordinator received termination message while waiting for update", typeName)
terminating = true
stash()
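// The remember-entities store confirmed the shard: either complete the update or
// keep waiting for the ddata state write.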
case RememberEntitiesCoordinatorStore.UpdateDone(shard) =>
if (!shardId.contains(shard)) {
log.warning(
"{}: Saw remember entities update complete for shard id [{}], while waiting for [{}]",
typeName,
shard,
shardId.getOrElse(""))
} else {
if (!waitingForStateWrite) {
log.debug("{}: The ShardCoordinator saw remember shard start successfully written {}", typeName, evt)
if (shardId.isDefined) timers.cancel(RememberEntitiesTimeoutKey)
unbecomeAfterUpdate(evt, afterUpdateCallback)
} else {
log.debug(
"{}: The ShardCoordinator saw remember shard start successfully written {}, waiting for state update",
typeName,
evt)
context.become(
waitingForUpdate(
evt,
shardId,
waitingForStateWrite = true,
waitingForRememberShard = false,
afterUpdateCallback = afterUpdateCallback))
}
}
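// The remember-entities store could not persist the shard; only react if it is the
// shard this update is waiting for.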
case RememberEntitiesCoordinatorStore.UpdateFailed(shard) =>
if (shardId.contains(shard)) {
onRememberEntitiesUpdateFailed(shard)
} else {
log.warning(
"{}: Got an remember entities update failed for [{}] while waiting for [{}], ignoring",
typeName,
shard,
shardId.getOrElse(""))
}
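// The remember-entities store did not answer within the timeout; treated the same
// way as an update failure.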
case RememberEntitiesTimeout(shard) =>
if (shardId.contains(shard)) {
onRememberEntitiesUpdateFailed(shard)
} else {
log.warning(
"{}: Got an remember entities update timeout for [{}] while waiting for [{}], ignoring",
typeName,
shard,
shardId.getOrElse(""))
}
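// The remember-entities store terminated while an update was in flight.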
case RememberEntitiesStoreStopped =>
onRememberEntitiesStoreStopped()
case _: RememberEntitiesCoordinatorStore.RememberedShards =>
log.debug("{}: Late arrival of remembered shards while waiting for update, stashing", typeName)
stash()
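// Everything else is postponed until the update has completed.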
case _ => stash()
}