public void taskAddedOrUpdated()

in indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java [1511:1707]


  /**
   * Reconciles a task status announcement reported by a worker with this runner's in-memory work item
   * for that task.
   *
   * <p>While holding {@code statusLock} the method:
   * <ol>
   *   <li>looks the task up in {@code tasks}; if it is absent, consults {@code taskStorage} and, when storage
   *       says the task is RUNNING, registers a new RUNNING work item for it;</li>
   *   <li>applies the announced status (RUNNING, or FAILED/SUCCESS) to the work item according to the item's
   *       current state, updating its worker, state and location as needed;</li>
   *   <li>records whether the task should be reported as completed and/or shut down on the announcing
   *       worker.</li>
   * </ol>
   *
   * <p>After releasing the lock it calls {@code taskComplete(..)} and/or {@code workerHolder.shutdownTask(..)}
   * as decided above, and finally calls {@code notifyAll()} on {@code statusLock}.
   *
   * @param announcement the status announcement for a single task
   * @param workerHolder holder of the worker that sent the announcement; also used to request a task shutdown
   *                     on that worker
   */
  public void taskAddedOrUpdated(final TaskAnnouncement announcement, final WorkerHolder workerHolder)
  {
    final String taskId = announcement.getTaskId();
    final Worker worker = workerHolder.getWorker();

    log.debug(
        "Worker[%s] wrote [%s] status for task [%s] on [%s]",
        worker.getHost(),
        announcement.getTaskStatus().getStatusCode(),
        taskId,
        announcement.getTaskLocation()
    );

    HttpRemoteTaskRunnerWorkItem taskItem;
    // Decisions are only recorded under the lock; the corresponding calls are made after it is released.
    boolean shouldShutdownTask = false;
    boolean isTaskCompleted = false;

    synchronized (statusLock) {
      taskItem = tasks.get(taskId);
      if (taskItem == null) {
        // Try to find information about it in the TaskStorage
        Optional<TaskStatus> knownStatusInStorage = taskStorage.getStatus(taskId);

        if (knownStatusInStorage.isPresent()) {
          switch (knownStatusInStorage.get().getStatusCode()) {
            case RUNNING:
              // Storage knows the task as RUNNING but this runner has no work item for it: register one so the
              // announcement can be processed below. The location starts as unknown and is filled in from the
              // announcement by the location-change handling further down.
              // NOTE(review): the null argument is presumably the Task object, which is not available
              // here -- confirm against the HttpRemoteTaskRunnerWorkItem constructor.
              taskItem = new HttpRemoteTaskRunnerWorkItem(
                  taskId,
                  worker,
                  TaskLocation.unknown(),
                  null,
                  announcement.getTaskType(),
                  HttpRemoteTaskRunnerWorkItem.State.RUNNING
              );
              tasks.put(taskId, taskItem);
              break;
            case SUCCESS:
            case FAILED:
              // Storage already has a terminal status. taskItem stays null, so a non-complete announcement
              // leads to a shutdown request in the "taskItem == null" branch below.
              if (!announcement.getTaskStatus().isComplete()) {
                log.info(
                    "Worker[%s] reported status for completed, known from taskStorage, task[%s]. Ignored.",
                    worker.getHost(),
                    taskId
                );
              }
              break;
            default:
              log.makeAlert(
                  "Found unrecognized state[%s] of task[%s] in taskStorage. Notification[%s] from worker[%s] is ignored.",
                  knownStatusInStorage.get().getStatusCode(),
                  taskId,
                  announcement,
                  worker.getHost()
              ).emit();
          }
        } else {
          log.warn(
              "Worker[%s] reported status[%s] for unknown task[%s]. Ignored.",
              worker.getHost(),
              announcement.getStatus(),
              taskId
          );
        }
      }

      if (taskItem == null) {
        // No work item could be found or created. If the worker says the task is still not complete,
        // ask that worker to shut it down (done after the lock is released).
        if (!announcement.getTaskStatus().isComplete()) {
          shouldShutdownTask = true;
        }
      } else {
        // Outer switch: status announced by the worker. Inner switches: current state of our work item.
        switch (announcement.getTaskStatus().getStatusCode()) {
          case RUNNING:
            switch (taskItem.getState()) {
              case PENDING:
              case PENDING_WORKER_ASSIGN:
                // First RUNNING report for a task that was still pending: bind it to this worker and
                // mark it RUNNING.
                taskItem.setWorker(worker);
                taskItem.setState(HttpRemoteTaskRunnerWorkItem.State.RUNNING);
                log.info("Task[%s] started RUNNING on worker[%s].", taskId, worker.getHost());

                // Emit the time elapsed between the work item's created time and now as "task/pending/time".
                final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
                IndexTaskUtils.setTaskDimensions(metricBuilder, taskItem.getTask());
                emitter.emit(metricBuilder.setMetric(
                    "task/pending/time",
                    new Duration(taskItem.getCreatedTime(), DateTimes.nowUtc()).getMillis())
                );

                // fall through
              case RUNNING:
                if (worker.getHost().equals(taskItem.getWorker().getHost())) {
                  // Same worker as the one recorded on the item: only the location may need refreshing.
                  if (!announcement.getTaskLocation().equals(taskItem.getLocation())) {
                    log.info(
                        "Task[%s] location changed on worker[%s]. new location[%s].",
                        taskId,
                        worker.getHost(),
                        announcement.getTaskLocation()
                    );
                    taskItem.setLocation(announcement.getTaskLocation());
                    TaskRunnerUtils.notifyLocationChanged(listeners, taskId, announcement.getTaskLocation());
                  }
                } else {
                  // A different worker than the one recorded on the item claims to run the task:
                  // keep the recorded assignment and shut the task down on the announcing worker.
                  log.warn(
                      "Found worker[%s] running task[%s] which is being run by another worker[%s]. Notification ignored.",
                      worker.getHost(),
                      taskId,
                      taskItem.getWorker().getHost()
                  );
                  shouldShutdownTask = true;
                }
                break;
              case COMPLETE:
                // The item is already COMPLETE but the worker still reports RUNNING: request a shutdown
                // on that worker.
                log.warn(
                    "Worker[%s] reported status for completed task[%s]. Ignored.",
                    worker.getHost(),
                    taskId
                );
                shouldShutdownTask = true;
                break;
              default:
                log.makeAlert(
                    "Found unrecognized state[%s] of task[%s]. Notification[%s] from worker[%s] is ignored.",
                    taskItem.getState(),
                    taskId,
                    announcement,
                    worker.getHost()
                ).emit();
            }
            break;
          case FAILED:
          case SUCCESS:
            switch (taskItem.getState()) {
              case PENDING:
              case PENDING_WORKER_ASSIGN:
                // The task finished before any RUNNING report was seen. Bind it to this worker and move it
                // to RUNNING so that the RUNNING handling below applies. Unlike the RUNNING-announcement
                // path above, no "task/pending/time" metric is emitted here.
                taskItem.setWorker(worker);
                taskItem.setState(HttpRemoteTaskRunnerWorkItem.State.RUNNING);
                log.info("Task[%s] finished on worker[%s].", taskId, worker.getHost());
                // fall through
              case RUNNING:
                if (worker.getHost().equals(taskItem.getWorker().getHost())) {
                  if (!announcement.getTaskLocation().equals(taskItem.getLocation())) {
                    log.info(
                        "Task[%s] location changed on worker[%s]. new location[%s].",
                        taskId,
                        worker.getHost(),
                        announcement.getTaskLocation()
                    );
                    taskItem.setLocation(announcement.getTaskLocation());
                    TaskRunnerUtils.notifyLocationChanged(listeners, taskId, announcement.getTaskLocation());
                  }

                  // The item's state is not changed to COMPLETE here; completion is handed to
                  // taskComplete(..) after the lock is released.
                  // NOTE(review): presumably taskComplete(..) performs the state transition -- confirm.
                  isTaskCompleted = true;
                } else {
                  // Completion reported by a worker other than the one recorded on the item: only logged;
                  // no completion and no shutdown request result from it.
                  log.warn(
                      "Worker[%s] reported completed task[%s] which is being run by another worker[%s]. Notification ignored.",
                      worker.getHost(),
                      taskId,
                      taskItem.getWorker().getHost()
                  );
                }
                break;
              case COMPLETE:
                // this can happen when a worker is restarted and reports its list of completed tasks again.
                break;
              default:
                log.makeAlert(
                    "Found unrecognized state[%s] of task[%s]. Notification[%s] from worker[%s] is ignored.",
                    taskItem.getState(),
                    taskId,
                    announcement,
                    worker.getHost()
                ).emit();
            }
            break;
          default:
            log.makeAlert(
                "Worker[%s] reported unrecognized state[%s] for task[%s].",
                worker.getHost(),
                announcement.getTaskStatus().getStatusCode(),
                taskId
            ).emit();
        }
      }
    }

    if (isTaskCompleted) {
      // taskComplete(..) must be called outside of statusLock, see comments on method.
      taskComplete(taskItem, workerHolder, announcement.getTaskStatus());
    }

    if (shouldShutdownTask) {
      // Like taskComplete(..) above, the shutdown request is issued without holding statusLock.
      log.warn("Killing task[%s] on worker[%s].", taskId, worker.getHost());
      workerHolder.shutdownTask(taskId);
    }

    // Wake up any threads waiting on statusLock (the waiters are not visible in this method).
    synchronized (statusLock) {
      statusLock.notifyAll();
    }
  }