in indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java [163:495]
public ListenableFuture<TaskStatus> run(final Task task)
{
synchronized (tasks) {
tasks.computeIfAbsent(
task.getId(), k ->
new ForkingTaskRunnerWorkItem(
task,
exec.submit(
new Callable<>() {
@Override
public TaskStatus call()
{
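// Reserve a storage slot (a base directory plus its disk allotment) for this task before launching anything.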
final TaskStorageDirTracker.StorageSlot storageSlot;
try {
storageSlot = getTracker().pickStorageSlot(task.getId());
}
catch (RuntimeException e) {
LOGGER.warn(e, "Failed to get storage slot for task [%s], cannot schedule.", task.getId());
return TaskStatus.failure(
task.getId(),
StringUtils.format("Failed to get storage slot due to error [%s]", e.getMessage())
);
}
final File taskDir = new File(storageSlot.getDirectory(), task.getId());
final String attemptId = String.valueOf(getNextAttemptID(taskDir));
final File attemptDir = Paths.get(taskDir.getAbsolutePath(), "attempt", attemptId).toFile();
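// Allocate host and ports for the peon process; -1 means the corresponding listener is disabled.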
final ProcessHolder processHolder;
final String childHost = node.getHost();
int childPort = -1;
int tlsChildPort = -1;
if (node.isEnablePlaintextPort()) {
childPort = portFinder.findUnusedPort();
}
if (node.isEnableTlsPort()) {
tlsChildPort = portFinder.findUnusedPort();
}
final TaskLocation taskLocation = TaskLocation.create(childHost, childPort, tlsChildPort);
try {
final Closer closer = Closer.create();
try {
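// Files shared with the peon process: the task spec, its final status, its log output, and its task reports.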
final File taskFile = new File(taskDir, "task.json");
final File statusFile = new File(attemptDir, "status.json");
final File logFile = new File(taskDir, "log");
final File reportsFile = new File(attemptDir, "report.json");
// Under the tasks lock, verify the work item still exists and attach the child process holder to it.
synchronized (tasks) {
final ForkingTaskRunnerWorkItem taskWorkItem = tasks.get(task.getId());
if (taskWorkItem == null) {
LOGGER.makeAlert("TaskInfo disappeared!").addData("task", task.getId()).emit();
throw new ISE("TaskInfo disappeared for task[%s]!", task.getId());
}
if (taskWorkItem.shutdown) {
throw new IllegalStateException("Task has been shut down!");
}
if (taskWorkItem.processHolder != null) {
LOGGER.makeAlert("TaskInfo already has a processHolder")
.addData("task", task.getId())
.emit();
throw new ISE("TaskInfo already has processHolder for task[%s]!", task.getId());
}
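// Assemble the peon JVM command line: classpath, JVM options, system properties, then the peon arguments.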
final CommandListBuilder command = new CommandListBuilder();
final String taskClasspath;
if (task.getClasspathPrefix() != null && !task.getClasspathPrefix().isEmpty()) {
taskClasspath = Joiner.on(File.pathSeparator).join(
task.getClasspathPrefix(),
config.getClasspath()
);
} else {
taskClasspath = config.getClasspath();
}
command.add(config.getJavaCommand());
if (JvmUtils.majorVersion() >= 11) {
command.addAll(STRONG_ENCAPSULATION_PROPERTIES);
}
command.add("-cp");
command.add(taskClasspath);
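// Cap the processor count the child JVM reports, so each task sees only its share of the host's CPUs.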
if (numProcessorsPerTask < 1) {
// numProcessorsPerTask is set by start()
throw new ISE("Not started");
}
command.add(StringUtils.format("-XX:ActiveProcessorCount=%d", numProcessorsPerTask));
command.addAll(new QuotableWhiteSpaceSplitter(config.getJavaOpts()));
command.addAll(config.getJavaOptsArray());
// Override task specific javaOpts
Object taskJavaOpts = task.getContextValue(
ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY
);
if (taskJavaOpts != null) {
command.addAll(new QuotableWhiteSpaceSplitter((String) taskJavaOpts));
}
// Override task specific javaOptsArray
try {
List<String> taskJavaOptsArray = jsonMapper.convertValue(
task.getContextValue(ForkingTaskRunnerConfig.JAVA_OPTS_ARRAY_PROPERTY),
new TypeReference<>() {}
);
if (taskJavaOptsArray != null) {
command.addAll(taskJavaOptsArray);
}
}
catch (Exception e) {
throw new IllegalArgumentException(
ForkingTaskRunnerConfig.JAVA_OPTS_ARRAY_PROPERTY
+ " in context of task: " + task.getId() + " must be an array of strings.",
e
);
}
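// Forward middleManager system properties whose names match an allowed prefix, except the javaOpts
// overrides, which were handled above.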
for (String propName : props.stringPropertyNames()) {
for (String allowedPrefix : config.getAllowedPrefixes()) {
// See https://github.com/apache/druid/issues/1841
if (propName.startsWith(allowedPrefix)
&& !ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY.equals(propName)
&& !ForkingTaskRunnerConfig.JAVA_OPTS_ARRAY_PROPERTY.equals(propName)
) {
command.addSystemProperty(propName, props.getProperty(propName));
}
}
}
// Override child JVM specific properties
for (String propName : props.stringPropertyNames()) {
if (propName.startsWith(CHILD_PROPERTY_PREFIX)) {
command.addSystemProperty(
propName.substring(CHILD_PROPERTY_PREFIX.length()),
props.getProperty(propName)
);
}
}
// Override task specific properties
final Map<String, Object> context = task.getContext();
if (context != null) {
for (String propName : context.keySet()) {
if (propName.startsWith(CHILD_PROPERTY_PREFIX)) {
Object contextValue = task.getContextValue(propName);
if (contextValue != null) {
command.addSystemProperty(
propName.substring(CHILD_PROPERTY_PREFIX.length()),
String.valueOf(contextValue)
);
}
}
}
}
// add the attemptId as a system property
command.addSystemProperty("attemptId", "1");
// Add dataSource, taskId and taskType for metrics or logging
command.addSystemProperty(
MonitorsConfig.METRIC_DIMENSION_PREFIX + DruidMetrics.DATASOURCE,
task.getDataSource()
);
command.addSystemProperty(
MonitorsConfig.METRIC_DIMENSION_PREFIX + DruidMetrics.TASK_ID,
task.getId()
);
command.addSystemProperty(
MonitorsConfig.METRIC_DIMENSION_PREFIX + DruidMetrics.TASK_TYPE,
task.getType()
);
command.addSystemProperty(
MonitorsConfig.METRIC_DIMENSION_PREFIX + DruidMetrics.GROUP_ID,
task.getGroupId()
);
command.addSystemProperty("druid.host", childHost);
command.addSystemProperty("druid.plaintextPort", childPort);
command.addSystemProperty("druid.tlsPort", tlsChildPort);
// Let tasks know where they are running on.
// This information is used in native parallel indexing with shuffle.
command.addSystemProperty("druid.task.executor.service", node.getServiceName());
command.addSystemProperty("druid.task.executor.host", node.getHost());
command.addSystemProperty("druid.task.executor.plaintextPort", node.getPlaintextPort());
command.addSystemProperty("druid.task.executor.enablePlaintextPort", node.isEnablePlaintextPort());
command.addSystemProperty("druid.task.executor.tlsPort", node.getTlsPort());
command.addSystemProperty("druid.task.executor.enableTlsPort", node.isEnableTlsPort());
command.addSystemProperty("log4j2.configurationFactory", ConsoleLoggingEnforcementConfigurationFactory.class.getName());
command.addSystemProperty("druid.indexer.task.baseTaskDir", storageSlot.getDirectory().getAbsolutePath());
command.addSystemProperty("druid.indexer.task.tmpStorageBytesPerTask", storageSlot.getNumBytes());
command.add("org.apache.druid.cli.Main");
command.add("internal");
command.add("peon");
command.add(taskDir.toString());
command.add(attemptId);
String nodeType = task.getNodeType();
if (nodeType != null) {
command.add("--nodeType");
command.add(nodeType);
}
// If the task type is queryable, we need to load broadcast segments on the peon, used for
// join queries. This is replaced by --loadBroadcastDatasourceMode option, but is preserved here
// for backwards compatibility and can be removed in a future release.
if (task.supportsQueries()) {
command.add("--loadBroadcastSegments");
command.add("true");
}
command.add("--loadBroadcastDatasourceMode");
command.add(task.getBroadcastDatasourceLoadingSpec().getMode().toString());
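// Serialize the task spec only once; taskDir is shared across attempts, so later attempts reuse the existing task.json.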
if (!taskFile.exists()) {
jsonMapper.writeValue(taskFile, task);
}
LOGGER.info(
"Running command[%s]",
getMaskedCommand(startupLoggingConfig.getMaskProperties(), command.getCommandList())
);
taskWorkItem.processHolder = runTaskProcess(command.getCommandList(), logFile, taskLocation);
processHolder = taskWorkItem.processHolder;
processHolder.registerWithCloser(closer);
}
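// The child process is now running; tell listeners where the task is located and that it is in RUNNING state.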
TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), taskLocation);
TaskRunnerUtils.notifyStatusChanged(
listeners,
task.getId(),
TaskStatus.running(task.getId())
);
LOGGER.info("Logging output of task[%s] to file[%s].", task.getId(), logFile);
final int exitCode = waitForTaskProcessToComplete(task, processHolder, logFile, reportsFile);
final TaskStatus status;
if (exitCode == 0) {
LOGGER.info("Process exited successfully for task[%s]", task.getId());
// Process exited successfully
status = jsonMapper.readValue(statusFile, TaskStatus.class);
} else {
LOGGER.error("Process exited with code[%d] for task[%s]", exitCode, task.getId());
// Process exited unsuccessfully
status = TaskStatus.failure(
task.getId(),
StringUtils.format(
"Task execution process exited unsuccessfully with code[%s]. "
+ "See middleManager logs for more details.",
exitCode
)
);
}
if (status.isSuccess()) {
successfulTaskCount.incrementAndGet();
} else {
failedTaskCount.incrementAndGet();
}
TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status);
return status;
}
catch (Throwable t) {
throw closer.rethrow(t);
}
finally {
closer.close();
}
}
catch (Throwable t) {
LOGGER.info(t, "Exception caught during execution");
throw new RuntimeException(t);
}
finally {
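// Cleanup: drop the work item, release any allocated ports, return the storage slot, and delete the task directory.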
try {
synchronized (tasks) {
final ForkingTaskRunnerWorkItem taskWorkItem = tasks.remove(task.getId());
if (taskWorkItem != null && taskWorkItem.processHolder != null) {
taskWorkItem.processHolder.shutdown();
}
if (!stopping) {
saveRunningTasks();
}
}
if (node.isEnablePlaintextPort()) {
portFinder.markPortUnused(childPort);
}
if (node.isEnableTlsPort()) {
portFinder.markPortUnused(tlsChildPort);
}
getTracker().returnStorageSlot(storageSlot);
try {
if (!stopping && taskDir.exists()) {
LOGGER.info("Removing task directory: %s", taskDir);
FileUtils.deleteDirectory(taskDir);
}
}
catch (Exception e) {
LOGGER.makeAlert(e, "Failed to delete task directory")
.addData("taskDir", taskDir.toString())
.addData("task", task.getId())
.emit();
}
}
catch (Exception e) {
LOGGER.error(e, "Suppressing exception caught while cleaning up task");
}
}
}
}
)
)
);
saveRunningTasks();
return tasks.get(task.getId()).getResult();
}
}