in indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java [1511:1707]
/**
 * Reconciles a task announcement (status and location) sent by a worker with this runner's view of that task.
 *
 * <p>If the task is not present in {@code tasks}, its status is looked up in {@code taskStorage}. A task that storage
 * reports as RUNNING is re-registered as a RUNNING work item assigned to the announcing worker. Depending on the
 * reported status and the work item's current state, this method may:
 * <ul>
 *   <li>move the work item to RUNNING;</li>
 *   <li>update its location and notify listeners;</li>
 *   <li>mark it completed via {@code taskComplete};</li>
 *   <li>ask the announcing worker to shut the task down.</li>
 * </ul>
 * At the end, threads waiting on {@code statusLock} are notified.
 *
 * <p>The worker is asked to shut the task down when:
 * <ul>
 *   <li>the task is unknown (or finished according to storage) and the announcement is not a completed status;</li>
 *   <li>the announcement says RUNNING but the work item is assigned to a different worker host;</li>
 *   <li>the announcement says RUNNING but the work item is already COMPLETE.</li>
 * </ul>
 *
 * @param announcement status and location of the task as reported by the worker
 * @param workerHolder holder of the announcing worker; also used to shut the task down when required
 */
public void taskAddedOrUpdated(final TaskAnnouncement announcement, final WorkerHolder workerHolder)
{
  final String taskId = announcement.getTaskId();
  final Worker worker = workerHolder.getWorker();
  log.debug(
      "Worker[%s] wrote [%s] status for task [%s] on [%s]",
      worker.getHost(),
      announcement.getTaskStatus().getStatusCode(),
      taskId,
      announcement.getTaskLocation()
  );
  HttpRemoteTaskRunnerWorkItem taskItem;
  boolean shouldShutdownTask = false;
  boolean isTaskCompleted = false;
  synchronized (statusLock) {
    taskItem = tasks.get(taskId);
    if (taskItem == null) {
      // Not tracked by this runner; try to find information about it in the TaskStorage.
      taskItem = recoverRunningTaskFromStorage(taskId, announcement, worker);
    }
    if (taskItem == null) {
      // Unknown task, or one that storage already considers finished: a worker still running it should stop.
      if (!announcement.getTaskStatus().isComplete()) {
        shouldShutdownTask = true;
      }
    } else {
      switch (announcement.getTaskStatus().getStatusCode()) {
        case RUNNING:
          switch (taskItem.getState()) {
            case PENDING:
            case PENDING_WORKER_ASSIGN:
              taskItem.setWorker(worker);
              taskItem.setState(HttpRemoteTaskRunnerWorkItem.State.RUNNING);
              log.info("Task[%s] started RUNNING on worker[%s].", taskId, worker.getHost());
              emitTaskPendingTimeMetric(taskItem);
              // fall through: the item is now RUNNING on this worker, so handle it like an already-running task.
            case RUNNING:
              if (worker.getHost().equals(taskItem.getWorker().getHost())) {
                applyAnnouncedTaskLocation(taskId, taskItem, announcement.getTaskLocation(), worker);
              } else {
                log.warn(
                    "Found worker[%s] running task[%s] which is being run by another worker[%s]. Notification ignored.",
                    worker.getHost(),
                    taskId,
                    taskItem.getWorker().getHost()
                );
                // The task is assigned elsewhere; stop the copy on the announcing worker.
                shouldShutdownTask = true;
              }
              break;
            case COMPLETE:
              log.warn(
                  "Worker[%s] reported status for completed task[%s]. Ignored.",
                  worker.getHost(),
                  taskId
              );
              // The runner already considers the task complete; stop the lingering copy on the worker.
              shouldShutdownTask = true;
              break;
            default:
              log.makeAlert(
                  "Found unrecognized state[%s] of task[%s]. Notification[%s] from worker[%s] is ignored.",
                  taskItem.getState(),
                  taskId,
                  announcement,
                  worker.getHost()
              ).emit();
          }
          break;
        case FAILED:
        case SUCCESS:
          switch (taskItem.getState()) {
            case PENDING:
            case PENDING_WORKER_ASSIGN:
              taskItem.setWorker(worker);
              taskItem.setState(HttpRemoteTaskRunnerWorkItem.State.RUNNING);
              log.info("Task[%s] finished on worker[%s].", taskId, worker.getHost());
              // fall through: treat it as a RUNNING task on this worker that has just completed.
            case RUNNING:
              if (worker.getHost().equals(taskItem.getWorker().getHost())) {
                applyAnnouncedTaskLocation(taskId, taskItem, announcement.getTaskLocation(), worker);
                isTaskCompleted = true;
              } else {
                log.warn(
                    "Worker[%s] reported completed task[%s] which is being run by another worker[%s]. Notification ignored.",
                    worker.getHost(),
                    taskId,
                    taskItem.getWorker().getHost()
                );
              }
              break;
            case COMPLETE:
              // this can happen when a worker is restarted and reports its list of completed tasks again.
              break;
            default:
              log.makeAlert(
                  "Found unrecognized state[%s] of task[%s]. Notification[%s] from worker[%s] is ignored.",
                  taskItem.getState(),
                  taskId,
                  announcement,
                  worker.getHost()
              ).emit();
          }
          break;
        default:
          log.makeAlert(
              "Worker[%s] reported unrecognized state[%s] for task[%s].",
              worker.getHost(),
              announcement.getTaskStatus().getStatusCode(),
              taskId
          ).emit();
      }
    }
  }
  if (isTaskCompleted) {
    // taskComplete(..) must be called outside of statusLock, see comments on method.
    taskComplete(taskItem, workerHolder, announcement.getTaskStatus());
  }
  if (shouldShutdownTask) {
    log.warn("Killing task[%s] on worker[%s].", taskId, worker.getHost());
    workerHolder.shutdownTask(taskId);
  }
  // Wake up any threads blocked in statusLock.wait().
  synchronized (statusLock) {
    statusLock.notifyAll();
  }
}

/**
 * Looks up a task that is not present in {@code tasks} in {@code taskStorage}.
 *
 * <p>If storage reports the task as RUNNING, a new RUNNING work item assigned to {@code worker} (with unknown
 * location and no task payload) is created, added to {@code tasks}, and returned. Otherwise the situation is logged
 * (or alerted, for an unrecognized storage state) and {@code null} is returned.
 *
 * <p>Called with {@code statusLock} held, since it may add an entry to {@code tasks}.
 *
 * @param taskId       id of the announced task
 * @param announcement the worker's announcement, used for the task type and for logging
 * @param worker       the announcing worker
 * @return the newly registered RUNNING work item, or {@code null} if the task was not recovered
 */
private HttpRemoteTaskRunnerWorkItem recoverRunningTaskFromStorage(
    final String taskId,
    final TaskAnnouncement announcement,
    final Worker worker
)
{
  final Optional<TaskStatus> knownStatusInStorage = taskStorage.getStatus(taskId);
  if (!knownStatusInStorage.isPresent()) {
    log.warn(
        "Worker[%s] reported status[%s] for unknown task[%s]. Ignored.",
        worker.getHost(),
        announcement.getStatus(),
        taskId
    );
    return null;
  }
  switch (knownStatusInStorage.get().getStatusCode()) {
    case RUNNING:
      final HttpRemoteTaskRunnerWorkItem recoveredItem = new HttpRemoteTaskRunnerWorkItem(
          taskId,
          worker,
          TaskLocation.unknown(),
          null,
          announcement.getTaskType(),
          HttpRemoteTaskRunnerWorkItem.State.RUNNING
      );
      tasks.put(taskId, recoveredItem);
      return recoveredItem;
    case SUCCESS:
    case FAILED:
      if (!announcement.getTaskStatus().isComplete()) {
        log.info(
            "Worker[%s] reported status for completed, known from taskStorage, task[%s]. Ignored.",
            worker.getHost(),
            taskId
        );
      }
      return null;
    default:
      log.makeAlert(
          "Found unrecognized state[%s] of task[%s] in taskStorage. Notification[%s] from worker[%s] is ignored.",
          knownStatusInStorage.get().getStatusCode(),
          taskId,
          announcement,
          worker.getHost()
      ).emit();
      return null;
  }
}

/**
 * Sets the work item's location to the announced one and notifies listeners, if the two differ.
 *
 * @param taskId            id of the task, used for logging and listener notification
 * @param taskItem          work item whose location may be updated
 * @param announcedLocation location reported by the worker
 * @param worker            the announcing worker, used for logging
 */
private void applyAnnouncedTaskLocation(
    final String taskId,
    final HttpRemoteTaskRunnerWorkItem taskItem,
    final TaskLocation announcedLocation,
    final Worker worker
)
{
  if (announcedLocation.equals(taskItem.getLocation())) {
    return;
  }
  log.info(
      "Task[%s] location changed on worker[%s]. new location[%s].",
      taskId,
      worker.getHost(),
      announcedLocation
  );
  taskItem.setLocation(announcedLocation);
  TaskRunnerUtils.notifyLocationChanged(listeners, taskId, announcedLocation);
}

/**
 * Emits the {@code task/pending/time} metric for a work item.
 *
 * <p>The value is the number of milliseconds between the item's creation time and now.
 *
 * @param taskItem work item that has just left a pending state
 */
private void emitTaskPendingTimeMetric(final HttpRemoteTaskRunnerWorkItem taskItem)
{
  final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
  IndexTaskUtils.setTaskDimensions(metricBuilder, taskItem.getTask());
  emitter.emit(metricBuilder.setMetric(
      "task/pending/time",
      new Duration(taskItem.getCreatedTime(), DateTimes.nowUtc()).getMillis())
  );
}