in src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt [93:154]
/**
 * Runs shard replication until it is cancelled externally or the replay logic fails.
 *
 * The replay logic ([replicate]) runs in a child coroutine of a [supervisorScope]. A failure there does not cancel
 * this coroutine; the [CoroutineExceptionHandler] installed on the child captures it instead. When such a
 * downstream failure is captured, this task:
 *  1. best-effort persists a [FailedState] built from the failure (a failure to persist is logged, not propagated),
 *  2. waits 10 minutes for the parent IndexReplicationTask to react to that state (it is expected to pause/stop
 *     this task, which reaches us as cancellation), and
 *  3. rethrows the original failure if no cancellation arrived during the wait.
 *
 * Any [CancellationException] reaching the outer `try` (pause, task cancelled via API, cancellation during the
 * wait, etc.) is logged and swallowed, so this function returns normally in that case.
 */
override suspend fun execute(scope: CoroutineScope, initialState: PersistentTaskState?) {
    try {
        // Set by the handler below when the replication coroutine fails. The handler is mainly used to catch
        // the exception from the replication replay logic so the failure reason can be persisted.
        var downstreamException: Throwable? = null
        val handler = CoroutineExceptionHandler { _, exception ->
            logError("ShardReplicationTask: Caught downstream exception ${exception.stackTraceToString()}")
            downstreamException = exception
        }
        // Wrap the actual replication replay logic in a supervisor scope plus an inner coroutine so that failures
        // from the inner replication logic do not cancel this (parent) coroutine. They are delivered to the
        // installed CoroutineExceptionHandler instead. The only path for cancelling this outer coroutine is
        // external, explicit cancellation (pause logic, task cancelled via API, etc.).
        //
        // NOTE(review): kotlinx.coroutines does not deliver CancellationException (including
        // TimeoutCancellationException) to a CoroutineExceptionHandler. Such an exception escaping `replicate`
        // therefore leaves `downstreamException` null, and execute returns normally.
        //
        // See https://kotlinlang.org/docs/exception-handling.html#supervision-scope
        supervisorScope {
            launch(handler) {
                replicate(this)
            }
        }

        // Snapshot the captured var into a local val. The compiler cannot smart-cast a var that a lambda may
        // modify, and the snapshot avoids an unchecked `as` cast. A null value means replication finished
        // without a downstream failure, so there is nothing more to do.
        val throwable = downstreamException ?: return

        // Mark and capture the FailedState, then wait for the parent IndexReplicationTask to take action. We do
        // not pause/stop from here: the index task can choose the appropriate action based on failures across
        // multiple shards. This also avoids contention, and keeps this task responsible only for itself.
        withContext(NonCancellable) {
            logInfo("Going to mark ShardReplicationTask as Failed with ${throwable.stackTraceToString()}")
            try {
                updateTaskState(FailedState(toESException(throwable)))
            } catch (inner: Exception) {
                // Best effort: the failure is not propagated here; the shard task will still be failed after
                // the wait below. Log at error level so the swallowed persistence failure stays visible.
                logError("Encountered exception while trying to persist failed state ${inner.stackTraceToString()}")
            }
        }

        // The parent is expected to pause/stop all shard tasks after seeing FailedState, which cancels this
        // coroutine during the delay.
        val waitMillis = Duration.ofMinutes(10).toMillis()
        logInfo("Waiting $waitMillis millis for IndexReplicationTask to respond to failure of shard task")
        delay(waitMillis)
        // No cancellation arrived in time: propagate the original failure so the task is marked as failed.
        throw throwable
    } catch (e: CancellationException) {
        // Intentionally not rethrown: cancellation ends this task without propagating the exception further.
        logInfo("Received cancellation of ShardReplicationTask ${e.stackTraceToString()}")
    }
}