public Task newTask()

in software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java [94:259]


    /**
     * Builds a dynamic task which, when run, executes the configured container as a Kubernetes job.
     * <p>
     * The task body performs the following steps:
     * <ol>
     * <li>resolves command / arguments (or {@code BASH_SCRIPT}, which takes precedence over both),
     *     image, pull policy, working directory, volumes and shell environment from {@code config};
     * <li>writes a job descriptor file using {@code KubeJobFileCreator};
     * <li>attempts to create the namespace unless {@code createNamespace} is explicitly {@code false};
     * <li>submits the job, waits for the container to become available and then to complete,
     *     using a countdown timer started from the configured {@code TIMEOUT};
     * <li>retrieves the job output and exit code and stores them on the {@code ContainerTaskResult}
     *     tagged on the task, notifying any threads waiting on that result object;
     * <li>finally deletes the namespace, or else the job (unless {@code KEEP_CONTAINER_FOR_DEBUGGING} is true),
     *     and removes the job descriptor file if it is temporary.
     * </ol>
     * An exit code which is blank or cannot be parsed as an integer is recorded as {@code -1}.
     *
     * @return the built task; its value is the {@code ContainerTaskResult}, or the outcome of
     *         {@code returnConversion} applied to that result if a conversion is set
     */
    public Task<RET> newTask() {
        final ByteArrayOutputStream stdout = new ByteArrayOutputStream();
        TaskBuilder<RET> taskBuilder = Tasks.<RET>builder().dynamic(true)
                .displayName(this.summary)
                .tag(BrooklynTaskTags.tagForStream(BrooklynTaskTags.STREAM_STDOUT, stdout))
                .tag(new ContainerTaskResult())
                .body(()-> {
                    List<String> commandsCfg =  EntityInitializers.resolve(config, COMMAND);
                    List<String> argumentsCfg =  EntityInitializers.resolve(config, ARGUMENTS);

                    // a bash script, if supplied, replaces any configured command and arguments
                    Object bashScript = EntityInitializers.resolve(config, BASH_SCRIPT);
                    if (bashScript!=null) {
                        if (!commandsCfg.isEmpty()) LOG.warn("Ignoring 'command' "+commandsCfg+" because bashScript is set");
                        if (!argumentsCfg.isEmpty()) LOG.warn("Ignoring 'args' "+argumentsCfg+" because bashScript is set");

                        commandsCfg = MutableList.of("/bin/bash", "-c");
                        // the script may be a single value or an iterable of lines; lines are joined with newlines into one argument
                        List<Object> argumentsCfgO = bashScript instanceof Iterable ? MutableList.copyOf((Iterable) bashScript) : MutableList.of(bashScript);
                        argumentsCfg = MutableList.of(argumentsCfgO.stream().map(x -> ""+x).collect(Collectors.joining("\n")));
                    }

                    PullPolicy containerImagePullPolicy = EntityInitializers.resolve(config, CONTAINER_IMAGE_PULL_POLICY);

                    String workingDir = EntityInitializers.resolve(config, WORKING_DIR);
                    Set<Map<String,String>> volumeMounts = (Set<Map<String,String>>) EntityInitializers.resolve(config, VOLUME_MOUNTS);
                    Set<Map<String, Object>> volumes = (Set<Map<String, Object>>) EntityInitializers.resolve(config, VOLUMES);

                    final String kubeJobName = initNamespaceAndGetNewJobName();
                    String containerImage = EntityInitializers.resolve(config, CONTAINER_IMAGE);
                    Entity entity = BrooklynTaskTags.getContextEntity(Tasks.current());

                    LOG.debug("Submitting container job in namespace "+namespace+", name "+kubeJobName);

                    Map<String, String> env = new ShellEnvironmentSerializer(((EntityInternal)entity).getManagementContext()).serialize(EntityInitializers.resolve(config, SHELL_ENVIRONMENT));
                    final BrooklynBomOsgiArchiveInstaller.FileWithTempInfo<File> jobYaml =  new KubeJobFileCreator()
                            .withImage(containerImage)
                            .withImagePullPolicy(containerImagePullPolicy)
                            .withName(kubeJobName)
                            .withCommand(Lists.newArrayList(commandsCfg))
                            .withArgs(argumentsCfg)
                            .withEnv(env)
                            .withVolumeMounts(volumeMounts)
                            .withVolumes(volumes)
                            .withWorkingDir(workingDir)
                            .createFile();

                    // everything after the descriptor file is created runs inside this try,
                    // so that the file is removed (if temporary) whatever happens
                    try {
                        Tasks.addTagDynamically(BrooklynTaskTags.tagForEnvStream(BrooklynTaskTags.STREAM_ENV, env));

                        Duration timeout = EntityInitializers.resolve(config, TIMEOUT);

                        // reuse the result object tagged on the task at build time, creating one only if it is missing
                        ContainerTaskResult result = (ContainerTaskResult) TaskTags.getTagsFast(Tasks.current()).stream().filter(x -> x instanceof ContainerTaskResult).findAny().orElseGet(() -> {
                            LOG.warn("Result object not set on tag at "+Tasks.current()+"; creating");
                            ContainerTaskResult x = new ContainerTaskResult();
                            TaskTags.addTagDynamically(Tasks.current(), x);
                            return x;
                        });
                        result.namespace = namespace;
                        result.kubeJobName = kubeJobName;

                        // validate these as they are passed to shell script, prevent injection
                        if (!namespace.matches("[A-Za-z0-9_.-]+")) throw new IllegalStateException("Invalid namespace: "+namespace);
                        if (!kubeJobName.matches("[A-Za-z0-9_.-]+")) throw new IllegalStateException("Invalid job name: "+kubeJobName);

                        // createNamespace is tri-state: false = never create; true = must create (fail if it exists);
                        // null = try to create, tolerating a non-zero exit code
                        ProcessTaskWrapper<ProcessTaskWrapper<?>> createNsJob = null;
                        if (!Boolean.FALSE.equals(createNamespace)) {
                            ProcessTaskFactory<ProcessTaskWrapper<?>> createNsJobF = newSimpleTaskFactory(
                                    String.format(NAMESPACE_CREATE_CMD, namespace)
                                    //, String.format(NAMESPACE_SET_CMD, namespace)
                            ).summary("Set up namespace").returning(x -> x);
                            if (createNamespace==null) {
                                createNsJobF.allowingNonZeroExitCode();
                            }
                            createNsJob = runTask(entity, createNsJobF.newTask(), true, true);
                        }

                        // only delete if told to always, unless we successfully create it
                        boolean deleteNamespaceHere = Boolean.TRUE.equals(deleteNamespace);
                        try {
                            if (createNsJob!=null) {
                                ProcessTaskWrapper<?> nsDetails = createNsJob.get();
                                if (nsDetails.getExitCode()==0) {
                                    LOG.debug("Namespace created");
                                    if (deleteNamespace==null) deleteNamespaceHere = true;
                                } else if (nsDetails.getExitCode()==1 && nsDetails.getStderr().contains("AlreadyExists")) {
                                    if (Boolean.TRUE.equals(createNamespace)) {
                                        LOG.warn("Namespace "+namespace+" already exists; failing");
                                        throw new IllegalStateException("Namespace "+namespace+" exists when creating a job that expects to create this namespace");
                                    } else {
                                        LOG.debug("Namespace exists already; reusing it");
                                    }
                                } else {
                                    LOG.warn("Unexpected namespace creation problem: "+nsDetails.getStderr()+ "(code "+nsDetails.getExitCode()+")");
                                    if (deleteNamespace==null) deleteNamespaceHere = true;
                                    throw new IllegalStateException("Unexpected namespace creation problem ("+namespace+"); see log for more details");
                                }
                            }

                            runTask(entity,
                                    newSimpleTaskFactory(String.format(JOBS_CREATE_CMD, jobYaml.getFile().getAbsolutePath(), namespace)).summary("Submit job").newTask(), true, true);

                            final CountdownTimer timer = CountdownTimer.newInstanceStarted(timeout);

                            // wait for it to be running (or failed / succeeded) -
                            PodPhases phaseOnceActive = waitForContainerAvailable(entity, kubeJobName, result, timer);
                            result.containerStarted = true;
//                            waitForContainerPodContainerState(kubeJobName, result, timer);

                            // notify once pod is available
                            synchronized (result) { result.notifyAll(); }

                            // if the pod has not already finished (succeeded or failed), wait for it to complete
                            boolean succeeded = PodPhases.Succeeded == phaseOnceActive ||
                                    (PodPhases.Failed != phaseOnceActive &&
                                            //use `wait --for` api, but in a 5s loop in case there are other issues
//                                            waitForContainerCompletedUsingK8sWaitFor(stdout, kubeJobName, entity, timer)
                                            waitForContainerCompletedUsingPodState(stdout, kubeJobName, entity, timer)
                                    );

                            LOG.debug("Container job "+kubeJobName+" completed, success "+succeeded);

                            ProcessTaskWrapper<String> retrieveOutput = runTask(entity, newSimpleTaskFactory(String.format(JOBS_LOGS_CMD, kubeJobName, namespace)).summary("Retrieve output").newTask(), false, true);
                            ProcessTaskWrapper<String> retrieveExitCode = runTask(entity, newSimpleTaskFactory(String.format(PODS_EXIT_CODE_CMD, namespace, kubeJobName)).summary("Retrieve exit code").newTask(), false, true);

                            String newStdout = retrieveOutput.get();
                            result.mainStdout = newStdout;  //prevent tag contents from being enormous; suppress in JSON serialization
                            updateStdoutWithNewData(stdout, newStdout);

                            retrieveExitCode.get();
                            String exitCodeS = retrieveExitCode.getStdout();
                            // blank or unparseable output is recorded as -1, so that the result is still completed
                            // and waiters are notified below rather than the task aborting with a NumberFormatException
                            int mainExitCode = -1;
                            if (Strings.isNonBlank(exitCodeS)) {
                                try {
                                    mainExitCode = Integer.parseInt(exitCodeS.trim());
                                } catch (NumberFormatException nfe) {
                                    LOG.warn("Unable to parse exit code '"+exitCodeS.trim()+"' for container job "+kubeJobName+"; treating as -1");
                                }
                            }
                            result.mainExitCode = mainExitCode;

                            result.containerEnded = true;
                            synchronized (result) { result.notifyAll(); }

                            if (result.mainExitCode!=0 && config.get(REQUIRE_EXIT_CODE_ZERO)) {
                                LOG.info("Failed container job "+namespace+" (exit code "+result.mainExitCode+") output: "+result.mainStdout);
                                throw new IllegalStateException("Non-zero exit code (" + result.mainExitCode + ") disallowed");
                            }

                            return returnConversion==null ? (RET) result : returnConversion.apply(result);

                        } finally {
                            if (deleteNamespaceHere) {
                                doDeleteNamespace(!namespaceRandom, true);  // if a one-off job, namespace has random id in it so can safely be deleted in background (no one else risks reusing it)
                            } else {
                                Boolean devMode = EntityInitializers.resolve(config, KEEP_CONTAINER_FOR_DEBUGGING);
                                if (!Boolean.TRUE.equals(devMode)) {
                                    // no big deal if not deleted, job ID will always be unique, so the deletion task is submitted
                                    // without calling get() on it here, so as not to block on its result
                                    Entities.submit(entity, BrooklynTaskTags.setTransient(newDeleteJobTask(kubeJobName)
                                            // namespace might have been deleted in parallel so okay if we don't delete the job;
                                            .allowingNonZeroExitCode()
                                            .newTask().asTask()));
                                }
                            }
                            DynamicTasks.waitForLast();
                        }
                    } catch (Exception e) {
                        throw Exceptions.propagate(e);
                    } finally {
                        jobYaml.deleteIfTemp();
                    }
                });

        return taskBuilder.build();
    }