override suspend fun execute()

in src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt [93:154]


    /**
     * Runs the shard-level replication for this task.
     *
     * The replay logic ([replicate]) runs in a child coroutine launched inside a [supervisorScope]. A failure in
     * that child therefore does not cancel this coroutine. Instead, the failure is captured by the installed
     * [CoroutineExceptionHandler]. When a failure was captured, this method:
     *  1. tries to persist a [FailedState] for this shard task, inside [NonCancellable] and on a best-effort basis;
     *  2. waits 10 minutes for the parent IndexReplicationTask to act on that state. The expected reaction is a
     *     pause or stop, which reaches this task as cancellation;
     *  3. rethrows the original failure if no cancellation arrived during the wait.
     *
     * Cancellation of this coroutine (pause logic, task cancelled by API, etc.) is logged and deliberately not
     * propagated.
     *
     * @param scope not referenced by this method.
     * @param initialState not referenced by this method.
     * @throws Throwable the failure raised by the replication logic, if this task was not cancelled within the
     *   wait window.
     */
    override suspend fun execute(scope: CoroutineScope, initialState: PersistentTaskState?) {
        try {
            // The CoroutineExceptionHandler installed is mainly used to catch the exception from replication replay
            // logic and persist the failure reason. The handler writes this var; it is only read after
            // supervisorScope has returned, i.e. after the launched child has completed.
            var downstreamException: Throwable? = null
            val handler = CoroutineExceptionHandler { _, exception ->
                logError("ShardReplicationTask: Caught downstream exception ${exception.stackTraceToString()}")
                downstreamException = exception
            }

            // Wrap the actual replication replay logic in SupervisorCoroutine and an inner coroutine so that we have
            // better control over exception propagation. Essentially any failures from inner replication logic will
            // not cancel the parent coroutine and the exception is caught by the installed CoroutineExceptionHandler
            //
            // The only path for cancellation of this outer coroutine is external explicit cancellation (pause logic,
            // task being cancelled by API etc)
            //
            // Check out the following for details
            // https://kotlinlang.org/docs/exception-handling.html#supervision-scope
            supervisorScope {
                launch(handler) {
                    replicate(this)
                }
            }

            // Snapshot the captured var into a local val. Smart casts do not apply to a var that a lambda captures
            // and modifies, but they do apply to this local, so no unchecked cast is needed below.
            val failure = downstreamException

            // Non-null failure implies an exception in inner replication code. In such cases we mark and capture the
            // FailedState and wait for parent IndexReplicationTask to take action.
            //
            // Note that we don't take the action to pause/stop directly from this ShardReplicationTask since
            // IndexReplicationTask can choose the appropriate action based on failures seen from multiple shards. This
            // approach also avoids contention due to concurrency. Finally it keeps the scope of responsibility of
            // ShardReplicationTask to ShardReplicationTask alone.
            if (failure != null) {
                withContext(NonCancellable) {
                    logInfo("Going to mark ShardReplicationTask as Failed with ${failure.stackTraceToString()}")
                    try {
                        updateTaskState(FailedState(toESException(failure)))
                    } catch (inner: Exception) {
                        // Logged at error level: without a persisted FailedState the parent IndexReplicationTask
                        // presumably cannot observe this failure, so the task only fails after the wait below.
                        logError("Encountered exception while trying to persist failed state ${inner.stackTraceToString()}")
                        // We are not propagating failure here and will let the shard task be failed after waiting.
                    }
                }

                // After marking FailedState, IndexReplicationTask will action on it by pausing or stopping all shard
                // replication tasks. This ShardReplicationTask should also thus receive the pause/stop via
                // cancellation. We thus wait for waitMillis duration.
                val waitMillis = Duration.ofMinutes(10).toMillis()
                logInfo("Waiting $waitMillis millis for IndexReplicationTask to respond to failure of shard task")
                delay(waitMillis)

                // If nothing happened, we propagate exception and mark the task as failed.
                throw failure
            }
        } catch (e: CancellationException) {
            // Nothing to do here and we don't propagate cancellation exception further
            logInfo("Received cancellation of ShardReplicationTask ${e.stackTraceToString()}")
        }
    }