in alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt [51:121]
/**
 * Runs a monitor on demand and reports the outcome through [actionListener].
 *
 * The monitor is either loaded from the scheduled-jobs index (when the request carries a
 * monitor id) or taken directly from the request body. Every path through this method ends in
 * exactly one of `actionListener.onResponse` / `actionListener.onFailure`, so the caller is never
 * left waiting.
 *
 * @param task the transport task this execution belongs to (not used by this method).
 * @param execMonitorRequest the request; supplies the monitor id or inline monitor, the end of the
 * evaluation period, and the dry-run flag.
 * @param actionListener receives the [ExecuteMonitorResponse] on success, or an exception passed
 * through [AlertingException.wrap] on failure.
 */
override fun doExecute(task: Task, execMonitorRequest: ExecuteMonitorRequest, actionListener: ActionListener<ExecuteMonitorResponse>) {
    // Read the user info before the thread context is stashed below.
    val userStr = client.threadPool().threadContext.getTransient<String>(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT)
    log.debug("User and roles string from thread context: $userStr")
    val user: User? = User.parse(userStr)

    client.threadPool().threadContext.stashContext().use {
        val executeMonitor = fun(monitor: Monitor) {
            // NOTE(review): an earlier variant launched the coroutine with the client's threadContext
            // (ElasticThreadContextElement) to preserve authentication information set by the security
            // plugin; that variant is currently disabled and a plain launch is used instead.
            // runner.launch(ElasticThreadContextElement(client.threadPool().threadContext)) {
            runner.launch {
                try {
                    // Computed inside the try so that a failure here is also reported to the listener.
                    val (periodStart, periodEnd) =
                        monitor.schedule.getPeriodEndingAt(Instant.ofEpochMilli(execMonitorRequest.requestEnd.millis))
                    val monitorRunResult = if (monitor.isBucketLevelMonitor()) {
                        runner.runBucketLevelMonitor(monitor, periodStart, periodEnd, execMonitorRequest.dryrun)
                    } else {
                        runner.runQueryLevelMonitor(monitor, periodStart, periodEnd, execMonitorRequest.dryrun)
                    }
                    withContext(Dispatchers.IO) {
                        actionListener.onResponse(ExecuteMonitorResponse(monitorRunResult))
                    }
                } catch (e: Exception) {
                    log.error("Unexpected error running monitor", e)
                    withContext(Dispatchers.IO) {
                        actionListener.onFailure(AlertingException.wrap(e))
                    }
                }
            }
        }

        if (execMonitorRequest.monitorId != null) {
            // Execute a stored monitor: fetch its definition from the scheduled-jobs index first.
            val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX).id(execMonitorRequest.monitorId)
            client.get(
                getRequest,
                object : ActionListener<GetResponse> {
                    override fun onResponse(response: GetResponse) {
                        if (!response.isExists) {
                            actionListener.onFailure(
                                AlertingException.wrap(
                                    OpenSearchStatusException("Can't find monitor with id: ${response.id}", RestStatus.NOT_FOUND)
                                )
                            )
                            return
                        }
                        if (response.isSourceEmpty) {
                            // The document exists but carries no source to parse; fail explicitly
                            // instead of returning without ever notifying the listener.
                            actionListener.onFailure(
                                AlertingException.wrap(
                                    OpenSearchStatusException(
                                        "Monitor with id: ${response.id} has an empty source",
                                        RestStatus.INTERNAL_SERVER_ERROR
                                    )
                                )
                            )
                            return
                        }
                        val monitor = try {
                            XContentHelper.createParser(
                                xContentRegistry, LoggingDeprecationHandler.INSTANCE,
                                response.sourceAsBytesRef, XContentType.JSON
                            ).use { xcp ->
                                ScheduledJob.parse(xcp, response.id, response.version) as Monitor
                            }
                        } catch (e: Exception) {
                            // Malformed document or a scheduled job that is not a Monitor.
                            actionListener.onFailure(AlertingException.wrap(e))
                            return
                        }
                        executeMonitor(monitor)
                    }

                    override fun onFailure(t: Exception) {
                        actionListener.onFailure(AlertingException.wrap(t))
                    }
                }
            )
        } else {
            // Execute the monitor supplied inline, attaching the requesting user when one is known.
            val requestMonitor = execMonitorRequest.monitor as Monitor
            val monitor = if (user?.name.isNullOrEmpty()) requestMonitor else requestMonitor.copy(user = user)
            executeMonitor(monitor)
        }
    }
}