public ListenableFuture run()

in indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java [163:495]


  public ListenableFuture<TaskStatus> run(final Task task)
  {
    // Registers a work item for the task whose future comes from submitting runTaskInForkedProcess(task) to `exec`,
    // then calls saveRunningTasks(). Because computeIfAbsent is used, if a work item with this task id is already
    // registered nothing new is submitted and the future of the existing work item is returned.
    final Callable<TaskStatus> taskCallable = () -> runTaskInForkedProcess(task);
    synchronized (tasks) {
      tasks.computeIfAbsent(
          task.getId(),
          k -> new ForkingTaskRunnerWorkItem(task, exec.submit(taskCallable))
      );
      saveRunningTasks();
      return tasks.get(task.getId()).getResult();
    }
  }

  /**
   * Body of the callable submitted by {@link #run(Task)}. Acquires a storage slot and ports, starts the peon process
   * for {@code task} via runTaskProcess while holding the {@code tasks} lock, waits for it to exit and converts the
   * outcome into a {@link TaskStatus}.
   *
   * <p>Everything acquired here (storage slot, ports, task directory) and the task's entry in {@code tasks} are
   * released in the {@code finally} block on every path, including failures that happen before the process starts.
   *
   * @return the status read from {@code status.json} in the attempt directory if the process exited with code 0;
   * a failure status if it exited with any other code or if no storage slot could be obtained
   * @throws RuntimeException wrapping any exception raised while preparing, running or waiting for the process
   */
  private TaskStatus runTaskInForkedProcess(final Task task)
  {
    // "Not acquired" sentinels: the finally block only releases what was actually obtained, so a failure part-way
    // through (e.g. in getNextAttemptID or port allocation) cannot leak the slot, the ports or the work item.
    TaskStorageDirTracker.StorageSlot storageSlot = null;
    File taskDir = null;
    int childPort = -1;
    int tlsChildPort = -1;

    try {
      try {
        storageSlot = getTracker().pickStorageSlot(task.getId());
      }
      catch (RuntimeException e) {
        LOGGER.warn(e, "Failed to get storage slot for task [%s], cannot schedule.", task.getId());
        return TaskStatus.failure(
            task.getId(),
            StringUtils.format("Failed to get storage slot due to error [%s]", e.getMessage())
        );
      }

      taskDir = new File(storageSlot.getDirectory(), task.getId());
      final String attemptId = String.valueOf(getNextAttemptID(taskDir));
      final File attemptDir = Paths.get(taskDir.getAbsolutePath(), "attempt", attemptId).toFile();

      final String childHost = node.getHost();
      if (node.isEnablePlaintextPort()) {
        childPort = portFinder.findUnusedPort();
      }
      if (node.isEnableTlsPort()) {
        tlsChildPort = portFinder.findUnusedPort();
      }
      final TaskLocation taskLocation = TaskLocation.create(childHost, childPort, tlsChildPort);

      final Closer closer = Closer.create();
      try {
        final File taskFile = new File(taskDir, "task.json");
        final File statusFile = new File(attemptDir, "status.json");
        final File logFile = new File(taskDir, "log");
        final File reportsFile = new File(attemptDir, "report.json");

        final ProcessHolder processHolder;
        // time to adjust process holders
        synchronized (tasks) {
          final ForkingTaskRunnerWorkItem taskWorkItem = tasks.get(task.getId());

          if (taskWorkItem == null) {
            LOGGER.makeAlert("TaskInfo disappeared!").addData("task", task.getId()).emit();
            throw new ISE("TaskInfo disappeared for task[%s]!", task.getId());
          }

          if (taskWorkItem.shutdown) {
            throw new IllegalStateException("Task has been shut down!");
          }

          if (taskWorkItem.processHolder != null) {
            LOGGER.makeAlert("TaskInfo already has a processHolder")
                  .addData("task", task.getId())
                  .emit();
            throw new ISE("TaskInfo already has processHolder for task[%s]!", task.getId());
          }

          final CommandListBuilder command = buildChildJvmCommand(
              task,
              storageSlot,
              taskDir,
              attemptId,
              childHost,
              childPort,
              tlsChildPort
          );

          if (!taskFile.exists()) {
            jsonMapper.writeValue(taskFile, task);
          }

          LOGGER.info(
              "Running command[%s]",
              getMaskedCommand(startupLoggingConfig.getMaskProperties(), command.getCommandList())
          );
          taskWorkItem.processHolder = runTaskProcess(command.getCommandList(), logFile, taskLocation);

          processHolder = taskWorkItem.processHolder;
          processHolder.registerWithCloser(closer);
        }

        TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), taskLocation);
        TaskRunnerUtils.notifyStatusChanged(
            listeners,
            task.getId(),
            TaskStatus.running(task.getId())
        );

        LOGGER.info("Logging output of task[%s] to file[%s].", task.getId(), logFile);
        final int exitCode = waitForTaskProcessToComplete(task, processHolder, logFile, reportsFile);
        final TaskStatus status;
        if (exitCode == 0) {
          LOGGER.info("Process exited successfully for task[%s]", task.getId());
          // Process exited successfully: the peon is expected to have written its final status file.
          status = jsonMapper.readValue(statusFile, TaskStatus.class);
        } else {
          LOGGER.error("Process exited with code[%d] for task[%s]", exitCode, task.getId());
          // Process exited unsuccessfully
          status = TaskStatus.failure(
              task.getId(),
              StringUtils.format(
                  "Task execution process exited unsuccessfully with code[%s]. "
                  + "See middleManager logs for more details.",
                  exitCode
              )
          );
        }
        if (status.isSuccess()) {
          successfulTaskCount.incrementAndGet();
        } else {
          failedTaskCount.incrementAndGet();
        }
        TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status);
        return status;
      }
      catch (Throwable t) {
        throw closer.rethrow(t);
      }
      finally {
        closer.close();
      }
    }
    catch (Throwable t) {
      LOGGER.info(t, "Exception caught during execution");
      throw new RuntimeException(t);
    }
    finally {
      releaseTaskResources(task, storageSlot, taskDir, childPort, tlsChildPort);
    }
  }

  /**
   * Builds the command line of the peon JVM that runs {@code task}. The command consists of:
   * <ul>
   *   <li>the java binary and the classpath (task prefix first, if any);</li>
   *   <li>JVM options: the runner config first, then task-context overrides;</li>
   *   <li>forwarded, child-specific and task-context system properties;</li>
   *   <li>task metadata and host/port/storage properties;</li>
   *   <li>the {@code org.apache.druid.cli.Main internal peon} arguments.</li>
   * </ul>
   *
   * @throws ISE                      if the runner has not been started ({@code numProcessorsPerTask < 1})
   * @throws IllegalArgumentException if the task context's java opts are not a string, or its java opts array
   *                                  cannot be converted to a list of strings
   */
  private CommandListBuilder buildChildJvmCommand(
      final Task task,
      final TaskStorageDirTracker.StorageSlot storageSlot,
      final File taskDir,
      final String attemptId,
      final String childHost,
      final int childPort,
      final int tlsChildPort
  )
  {
    final CommandListBuilder command = new CommandListBuilder();
    final String taskClasspath;
    if (task.getClasspathPrefix() != null && !task.getClasspathPrefix().isEmpty()) {
      taskClasspath = Joiner.on(File.pathSeparator).join(
          task.getClasspathPrefix(),
          config.getClasspath()
      );
    } else {
      taskClasspath = config.getClasspath();
    }

    command.add(config.getJavaCommand());

    if (JvmUtils.majorVersion() >= 11) {
      command.addAll(STRONG_ENCAPSULATION_PROPERTIES);
    }

    command.add("-cp");
    command.add(taskClasspath);

    if (numProcessorsPerTask < 1) {
      // numProcessorsPerTask is set by start()
      throw new ISE("Not started");
    }

    command.add(StringUtils.format("-XX:ActiveProcessorCount=%d", numProcessorsPerTask));

    command.addAll(new QuotableWhiteSpaceSplitter(config.getJavaOpts()));
    command.addAll(config.getJavaOptsArray());

    // Override task specific javaOpts. Reject non-string values explicitly instead of failing with an opaque
    // ClassCastException.
    final Object taskJavaOpts = task.getContextValue(ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY);
    if (taskJavaOpts != null) {
      if (!(taskJavaOpts instanceof String)) {
        throw new IllegalArgumentException(
            ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY
            + " in context of task: " + task.getId() + " must be a string."
        );
      }
      command.addAll(new QuotableWhiteSpaceSplitter((String) taskJavaOpts));
    }

    // Override task specific javaOptsArray
    final List<String> taskJavaOptsArray;
    try {
      taskJavaOptsArray = jsonMapper.convertValue(
          task.getContextValue(ForkingTaskRunnerConfig.JAVA_OPTS_ARRAY_PROPERTY),
          new TypeReference<>() {}
      );
    }
    catch (Exception e) {
      throw new IllegalArgumentException(
          ForkingTaskRunnerConfig.JAVA_OPTS_ARRAY_PROPERTY
          + " in context of task: " + task.getId() + " must be an array of strings.",
          e
      );
    }
    if (taskJavaOptsArray != null) {
      command.addAll(taskJavaOptsArray);
    }

    for (String propName : props.stringPropertyNames()) {
      // See https://github.com/apache/druid/issues/1841
      if (ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY.equals(propName)
          || ForkingTaskRunnerConfig.JAVA_OPTS_ARRAY_PROPERTY.equals(propName)) {
        continue;
      }
      for (String allowedPrefix : config.getAllowedPrefixes()) {
        if (propName.startsWith(allowedPrefix)) {
          command.addSystemProperty(propName, props.getProperty(propName));
          // Forward each property at most once, even if it matches several allowed prefixes.
          break;
        }
      }
    }

    // Override child JVM specific properties
    for (String propName : props.stringPropertyNames()) {
      if (propName.startsWith(CHILD_PROPERTY_PREFIX)) {
        command.addSystemProperty(
            propName.substring(CHILD_PROPERTY_PREFIX.length()),
            props.getProperty(propName)
        );
      }
    }

    // Override task specific properties
    final Map<String, Object> context = task.getContext();
    if (context != null) {
      for (String propName : context.keySet()) {
        if (propName.startsWith(CHILD_PROPERTY_PREFIX)) {
          final Object contextValue = task.getContextValue(propName);
          if (contextValue != null) {
            command.addSystemProperty(
                propName.substring(CHILD_PROPERTY_PREFIX.length()),
                String.valueOf(contextValue)
            );
          }
        }
      }
    }

    // Add the attemptId as a system property; it must match the attemptId passed as a positional argument below.
    command.addSystemProperty("attemptId", attemptId);

    // Add dataSource, taskId and taskType for metrics or logging
    command.addSystemProperty(
        MonitorsConfig.METRIC_DIMENSION_PREFIX + DruidMetrics.DATASOURCE,
        task.getDataSource()
    );
    command.addSystemProperty(
        MonitorsConfig.METRIC_DIMENSION_PREFIX + DruidMetrics.TASK_ID,
        task.getId()
    );
    command.addSystemProperty(
        MonitorsConfig.METRIC_DIMENSION_PREFIX + DruidMetrics.TASK_TYPE,
        task.getType()
    );
    command.addSystemProperty(
        MonitorsConfig.METRIC_DIMENSION_PREFIX + DruidMetrics.GROUP_ID,
        task.getGroupId()
    );

    command.addSystemProperty("druid.host", childHost);
    command.addSystemProperty("druid.plaintextPort", childPort);
    command.addSystemProperty("druid.tlsPort", tlsChildPort);

    // Let tasks know where they are running on.
    // This information is used in native parallel indexing with shuffle.
    command.addSystemProperty("druid.task.executor.service", node.getServiceName());
    command.addSystemProperty("druid.task.executor.host", node.getHost());
    command.addSystemProperty("druid.task.executor.plaintextPort", node.getPlaintextPort());
    command.addSystemProperty("druid.task.executor.enablePlaintextPort", node.isEnablePlaintextPort());
    command.addSystemProperty("druid.task.executor.tlsPort", node.getTlsPort());
    command.addSystemProperty("druid.task.executor.enableTlsPort", node.isEnableTlsPort());
    command.addSystemProperty("log4j2.configurationFactory", ConsoleLoggingEnforcementConfigurationFactory.class.getName());

    command.addSystemProperty("druid.indexer.task.baseTaskDir", storageSlot.getDirectory().getAbsolutePath());
    command.addSystemProperty("druid.indexer.task.tmpStorageBytesPerTask", storageSlot.getNumBytes());

    command.add("org.apache.druid.cli.Main");
    command.add("internal");
    command.add("peon");
    command.add(taskDir.toString());
    command.add(attemptId);
    final String nodeType = task.getNodeType();
    if (nodeType != null) {
      command.add("--nodeType");
      command.add(nodeType);
    }

    // If the task type is queryable, we need to load broadcast segments on the peon, used for
    // join queries. This is replaced by --loadBroadcastDatasourceMode option, but is preserved here
    // for backwards compatibility and can be removed in a future release.
    if (task.supportsQueries()) {
      command.add("--loadBroadcastSegments");
      command.add("true");
    }

    command.add("--loadBroadcastDatasourceMode");
    command.add(task.getBroadcastDatasourceLoadingSpec().getMode().toString());
    return command;
  }

  /**
   * Cleanup for {@link #runTaskInForkedProcess(Task)}, which always runs it in its finally block. It does the
   * following, logging and suppressing any exception it encounters:
   * <ul>
   *   <li>removes the task's work item from {@code tasks}, shutting down its process holder if one was set;</li>
   *   <li>calls saveRunningTasks() unless the runner is stopping;</li>
   *   <li>releases the allocated ports and the storage slot;</li>
   *   <li>deletes the task directory unless the runner is stopping.</li>
   * </ul>
   *
   * @param task         the task whose resources are released
   * @param storageSlot  the storage slot to return, or null if none was acquired
   * @param taskDir      the task directory to delete, or null if it was never determined
   * @param childPort    the plaintext port to release, or -1 if none was allocated
   * @param tlsChildPort the TLS port to release, or -1 if none was allocated
   */
  private void releaseTaskResources(
      final Task task,
      final TaskStorageDirTracker.StorageSlot storageSlot,
      final File taskDir,
      final int childPort,
      final int tlsChildPort
  )
  {
    try {
      synchronized (tasks) {
        final ForkingTaskRunnerWorkItem taskWorkItem = tasks.remove(task.getId());
        if (taskWorkItem != null && taskWorkItem.processHolder != null) {
          taskWorkItem.processHolder.shutdown();
        }
        if (!stopping) {
          saveRunningTasks();
        }
      }

      // -1 means the port was never allocated (port type disabled, or allocation did not happen / failed).
      if (childPort != -1) {
        portFinder.markPortUnused(childPort);
      }
      if (tlsChildPort != -1) {
        portFinder.markPortUnused(tlsChildPort);
      }

      if (storageSlot != null) {
        getTracker().returnStorageSlot(storageSlot);
      }

      try {
        if (!stopping && taskDir != null && taskDir.exists()) {
          FileUtils.deleteDirectory(taskDir);
          LOGGER.info("Removing task directory: %s", taskDir);
        }
      }
      catch (Exception e) {
        LOGGER.makeAlert(e, "Failed to delete task directory")
              .addData("taskDir", String.valueOf(taskDir))
              .addData("task", task.getId())
              .emit();
      }
    }
    catch (Exception e) {
      LOGGER.error(e, "Suppressing exception caught while cleaning up task");
    }
  }