public Optional launchAgent()

in genie-web/src/main/java/com/netflix/genie/web/agent/launchers/impl/LocalAgentLauncherImpl.java [202:324]


    /**
     * Launch an agent process on the local host to run the supplied resolved job.
     *
     * <p>The steps are, in order: optionally create the job's system user, check the job's memory against the
     * per-job and per-host limits, build the agent command line and environment, optionally redirect the process
     * output to a file in the temporary directory, and finally hand the command to an executor with an
     * asynchronous result handler. A timer tagged with the outcome is recorded on every invocation.
     *
     * @param resolvedJob          The resolved job whose metadata, environment and specification drive the launch
     * @param requestedLauncherExt Launcher extension requested for the job; not read by this implementation
     * @return An {@link Optional} wrapping this launcher's extension node
     * @throws AgentLaunchException If the user cannot be created, a memory limit would be exceeded, the output
     *                              capture file cannot be opened, the process cannot be started, or any other
     *                              unexpected error occurs
     */
    public Optional<JsonNode> launchAgent(
        @Valid final ResolvedJob resolvedJob,
        @Nullable final JsonNode requestedLauncherExt
    ) throws AgentLaunchException {
        final long start = System.nanoTime();
        log.info("Received request to launch local agent to run job: {}", resolvedJob);
        final Set<Tag> tags = new HashSet<>();
        tags.add(CLASS_TAG);

        try {
            final JobMetadata jobMetadata = resolvedJob.getJobMetadata();
            final String user = jobMetadata.getUser();

            if (this.launcherProperties.isRunAsUserEnabled()) {
                final String group = jobMetadata.getGroup().orElse(null);
                try {
                    UNIXUtils.createUser(user, group, this.sharedExecutor);
                } catch (IOException e) {
                    log.error("Failed to create user {}: {}", user, e.getMessage(), e);
                    throw new AgentLaunchException(e);
                }
            }

            // Check error conditions
            final long jobMemory = resolvedJob
                .getJobEnvironment()
                .getComputeResources()
                .getMemoryMb()
                .orElse(DEFAULT_JOB_MEMORY);
            final String jobId = resolvedJob.getJobSpecification().getJob().getId();

            // Job was resolved with more memory allocated than the system was configured to allow
            if (jobMemory > this.launcherProperties.getMaxJobMemory()) {
                throw new AgentLaunchException(
                    "Unable to launch job as the requested job memory ("
                        + jobMemory
                        + "MB) exceeds the maximum allowed by the configuration of the system ("
                        + this.launcherProperties.getMaxJobMemory()
                        + "MB)"
                );
            }

            final CommandLine commandLine = this.createCommandLine(
                ImmutableMap.of(
                    LocalAgentLauncherProperties.SERVER_HOST_PLACEHOLDER, this.launcherProperties.getServerHostname(),
                    LocalAgentLauncherProperties.SERVER_PORT_PLACEHOLDER, Integer.toString(this.rpcPort),
                    LocalAgentLauncherProperties.JOB_ID_PLACEHOLDER, jobId,
                    RUN_USER_PLACEHOLDER, user,
                    LocalAgentLauncherProperties.AGENT_JAR_PLACEHOLDER, this.launcherProperties.getAgentJarPath()
                )
            );

            // One at a time to ensure we don't overflow configured max
            // NOTE(review): the lock only covers the check itself, it is released before the agent is started.
            // Whether that is enough depends on when the persistence service starts counting this job - confirm.
            synchronized (MEMORY_CHECK_LOCK) {
                final long usedMemoryOnHost = this.persistenceService.getUsedMemoryOnHost(this.hostname);
                final long expectedUsedMemoryOnHost = usedMemoryOnHost + jobMemory;
                if (expectedUsedMemoryOnHost > this.launcherProperties.getMaxTotalJobMemory()) {
                    throw new AgentLaunchException(
                        "Running job "
                            + jobId
                            + " with "
                            + jobMemory
                            + "MB of memory would cause there to be more memory used than the configured amount of "
                            + this.launcherProperties.getMaxTotalJobMemory()
                            + "MB. "
                            + usedMemoryOnHost
                            + "MB worth of jobs are currently running on this node."
                    );
                }
            }

            // Inherit server environment
            final Map<String, String> environment = Maps.newHashMap(System.getenv());
            // Add extra environment from configuration, if any
            environment.putAll(this.launcherProperties.getAdditionalEnvironment());
            // Add tracing context so agent continues trace
            final Span currentSpan = this.tracer.currentSpan();
            if (currentSpan != null) {
                environment.putAll(this.tracePropagator.injectForAgent(currentSpan.context()));
            }
            log.debug("Launching agent: {}, env: {}", commandLine, environment);

            // TODO: What happens if the server crashes? Does the process live on? Make sure this is totally detached
            final Executor executor = this.executorFactory.newInstance(true);

            // Tracks the capture file (if any) so it can be released when the process never gets started
            FileOutputStream processOutput = null;
            boolean launched = false;
            try {
                if (this.launcherProperties.isProcessOutputCaptureEnabled()) {
                    // Look the directory up by the property key. NOTE(review): if SystemUtils is Apache Commons
                    // Lang, its JAVA_IO_TMPDIR constant holds the property *value*, so using it as the lookup key
                    // always fell through to the "/tmp" default - confirm.
                    final String debugOutputPath =
                        System.getProperty("java.io.tmpdir", "/tmp") + "/agent-job-" + jobId + ".txt";
                    try {
                        processOutput = new FileOutputStream(debugOutputPath, false);
                    } catch (final FileNotFoundException e) {
                        log.error("Failed to create agent process output file", e);
                        throw new AgentLaunchException(e);
                    }
                    executor.setStreamHandler(new PumpStreamHandler(processOutput));
                }

                log.info("Launching agent for job {}", jobId);

                final AgentResultHandler resultHandler = new AgentResultHandler(jobId);

                try {
                    executor.execute(commandLine, environment, resultHandler);
                } catch (final IOException ioe) {
                    throw new AgentLaunchException(
                        "Unable to launch agent using command: " + commandLine.toString(),
                        ioe
                    );
                }
                launched = true;
            } finally {
                // Once the launch succeeds the stream has to stay open for the running process, so it is only
                // closed here on failure.
                // NOTE(review): nothing visible in this method closes it after the process exits - confirm.
                if (!launched && processOutput != null) {
                    try {
                        processOutput.close();
                    } catch (final IOException closeException) {
                        log.warn("Failed to close agent process output file for job {}", jobId, closeException);
                    }
                }
            }

            MetricsUtils.addSuccessTags(tags);
            return Optional.of(this.launcherExt);
        } catch (final AgentLaunchException e) {
            MetricsUtils.addFailureTagsWithException(tags, e);
            throw e;
        } catch (final Exception e) {
            log.error("Unable to launch local agent due to {}", e.getMessage(), e);
            MetricsUtils.addFailureTagsWithException(tags, e);
            throw new AgentLaunchException("Unable to launch local agent due to unhandled error", e);
        } finally {
            this.registry.timer(LAUNCH_TIMER, tags).record(System.nanoTime() - start, TimeUnit.NANOSECONDS);
        }
    }