in genie-web/src/main/java/com/netflix/genie/web/agent/launchers/impl/LocalAgentLauncherImpl.java [202:324]
public Optional<JsonNode> launchAgent(
    @Valid final ResolvedJob resolvedJob,
    @Nullable final JsonNode requestedLauncherExt
) throws AgentLaunchException {
    // Launches a Genie agent as a local child process that runs the given resolved job.
    // Steps:
    //   1. Optionally create the run-as user.
    //   2. Validate the job's memory request against the per-job and per-host limits.
    //   3. Build the agent command line and environment.
    //   4. Start the process asynchronously.
    // Every attempt, successful or not, is timed under LAUNCH_TIMER with success/failure tags.
    // Any failure reaches the caller as an AgentLaunchException.
    // NOTE(review): requestedLauncherExt is not read here; the configured launcherExt is always returned.
    final long start = System.nanoTime();
    log.info("Received request to launch local agent to run job: {}", resolvedJob);
    final Set<Tag> tags = new HashSet<>();
    tags.add(CLASS_TAG);
    try {
        final JobMetadata jobMetadata = resolvedJob.getJobMetadata();
        final String user = jobMetadata.getUser();
        if (this.launcherProperties.isRunAsUserEnabled()) {
            final String group = jobMetadata.getGroup().orElse(null);
            try {
                UNIXUtils.createUser(user, group, this.sharedExecutor);
            } catch (IOException e) {
                log.error("Failed to create user {}: {}", user, e.getMessage(), e);
                throw new AgentLaunchException(e);
            }
        }

        // Check error conditions
        final long jobMemory = resolvedJob
            .getJobEnvironment()
            .getComputeResources()
            .getMemoryMb()
            .orElse(DEFAULT_JOB_MEMORY);
        final String jobId = resolvedJob.getJobSpecification().getJob().getId();

        // Job was resolved with more memory allocated than the system was configured to allow
        if (jobMemory > this.launcherProperties.getMaxJobMemory()) {
            throw new AgentLaunchException(
                "Unable to launch job as the requested job memory ("
                    + jobMemory
                    + "MB) exceeds the maximum allowed by the configuration of the system ("
                    + this.launcherProperties.getMaxJobMemory()
                    + "MB)"
            );
        }

        final CommandLine commandLine = this.createCommandLine(
            ImmutableMap.of(
                LocalAgentLauncherProperties.SERVER_HOST_PLACEHOLDER, this.launcherProperties.getServerHostname(),
                LocalAgentLauncherProperties.SERVER_PORT_PLACEHOLDER, Integer.toString(this.rpcPort),
                LocalAgentLauncherProperties.JOB_ID_PLACEHOLDER, jobId,
                RUN_USER_PLACEHOLDER, user,
                LocalAgentLauncherProperties.AGENT_JAR_PLACEHOLDER, this.launcherProperties.getAgentJarPath()
            )
        );

        // One at a time to ensure we don't overflow configured max
        // NOTE(review): the lock only serializes this read-and-compare. If getUsedMemoryOnHost does not
        // yet count the job being launched here, concurrent launches can still overshoot the limit.
        // Confirm against the persistence layer.
        synchronized (MEMORY_CHECK_LOCK) {
            final long usedMemoryOnHost = this.persistenceService.getUsedMemoryOnHost(this.hostname);
            final long expectedUsedMemoryOnHost = usedMemoryOnHost + jobMemory;
            if (expectedUsedMemoryOnHost > this.launcherProperties.getMaxTotalJobMemory()) {
                throw new AgentLaunchException(
                    "Running job "
                        + jobId
                        + " with "
                        + jobMemory
                        + "MB of memory would cause there to be more memory used than the configured amount of "
                        + this.launcherProperties.getMaxTotalJobMemory()
                        + "MB. "
                        + usedMemoryOnHost
                        + "MB worth of jobs are currently running on this node."
                );
            }
        }

        // Inherit server environment
        final Map<String, String> environment = Maps.newHashMap(System.getenv());
        // Add extra environment from configuration, if any
        environment.putAll(this.launcherProperties.getAdditionalEnvironment());
        // Add tracing context so agent continues trace
        final Span currentSpan = this.tracer.currentSpan();
        if (currentSpan != null) {
            environment.putAll(this.tracePropagator.injectForAgent(currentSpan.context()));
        }

        // NOTE(review): this dumps the full inherited server environment at debug level, which may
        // include credentials.
        log.debug("Launching agent: {}, env: {}", commandLine, environment);

        // TODO: What happens if the server crashes? Does the process live on? Make sure this is totally detached
        final Executor executor = this.executorFactory.newInstance(true);

        // Optional file that captures the agent's stdout/stderr. It stays null when capture is disabled.
        final FileOutputStream agentOutput;
        if (this.launcherProperties.isProcessOutputCaptureEnabled()) {
            // SystemUtils.JAVA_IO_TMPDIR already holds the *value* of the java.io.tmpdir property, not
            // its key, so use it directly. Fall back to /tmp when the property is unset.
            final String tmpDir = Optional.ofNullable(SystemUtils.JAVA_IO_TMPDIR).orElse("/tmp");
            final String debugOutputPath = tmpDir + "/agent-job-" + jobId + ".txt";
            try {
                agentOutput = new FileOutputStream(debugOutputPath, false);
            } catch (final FileNotFoundException e) {
                log.error("Failed to create agent process output file", e);
                throw new AgentLaunchException(e);
            }
            executor.setStreamHandler(new PumpStreamHandler(agentOutput));
        } else {
            agentOutput = null;
        }

        log.info("Launching agent for job {}", jobId);
        final AgentResultHandler resultHandler = new AgentResultHandler(jobId);
        try {
            executor.execute(commandLine, environment, resultHandler);
        } catch (final IOException ioe) {
            // The process never started, so nothing else will close the capture file: release it here.
            // NOTE(review): on success the stream is handed to the running process and is not closed by
            // this method. Verify it is released when the process exits, or the descriptor leaks per job.
            if (agentOutput != null) {
                try {
                    agentOutput.close();
                } catch (final IOException closeException) {
                    ioe.addSuppressed(closeException);
                }
            }
            throw new AgentLaunchException(
                "Unable to launch agent using command: " + commandLine.toString(),
                ioe
            );
        }

        MetricsUtils.addSuccessTags(tags);
        return Optional.of(this.launcherExt);
    } catch (final AgentLaunchException e) {
        MetricsUtils.addFailureTagsWithException(tags, e);
        throw e;
    } catch (final Exception e) {
        log.error("Unable to launch local agent due to {}", e.getMessage(), e);
        MetricsUtils.addFailureTagsWithException(tags, e);
        throw new AgentLaunchException("Unable to launch local agent due to unhandled error", e);
    } finally {
        this.registry.timer(LAUNCH_TIMER, tags).record(System.nanoTime() - start, TimeUnit.NANOSECONDS);
    }
}