in software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java [95:261]
public Task<RET> newTask() {
    // Builds (but does not submit) a dynamic task which, when run:
    //   1. resolves the container configuration (command/args or bash script, image, env, volumes, ...);
    //   2. writes a job definition file via KubeJobFileCreator;
    //   3. optionally creates the namespace, depending on the tri-state `createNamespace` flag;
    //   4. submits the job, waits for the container, then retrieves its output and exit code;
    //   5. cleans up the namespace or the job, and always removes the temporary job file.
    // The task result is the ContainerTaskResult tag (cast to RET), or `returnConversion` applied to it.
    // The task body throws IllegalStateException for an invalid namespace/job name, for a namespace
    // conflict or creation failure, and for a disallowed non-zero exit code.
    final ByteArrayOutputStream stdout = new ByteArrayOutputStream();
    TaskBuilder<RET> taskBuilder = Tasks.<RET>builder().dynamic(true)
            .displayName(this.summary)
            .tag(BrooklynTaskTags.tagForStream(BrooklynTaskTags.STREAM_STDOUT, stdout))
            // result holder is attached as a tag up front; the body looks it up again below
            .tag(new ContainerTaskResult())
            .body(()-> {
                List<String> commandsCfg = EntityInitializers.resolve(config, COMMAND);
                List<String> argumentsCfg = EntityInitializers.resolve(config, ARGUMENTS);
                Object bashScript = EntityInitializers.resolve(config, BASH_SCRIPT);
                if (bashScript!=null) {
                    // a bash script takes precedence over any explicit command / args
                    if (!commandsCfg.isEmpty()) LOG.warn("Ignoring 'command' "+commandsCfg+" because bashScript is set");
                    if (!argumentsCfg.isEmpty()) LOG.warn("Ignoring 'args' "+argumentsCfg+" because bashScript is set");
                    commandsCfg = MutableList.of("/bin/bash", "-c");
                    // the script may be a single value or an iterable of lines; lines are joined with newlines
                    List<Object> argumentsCfgO = bashScript instanceof Iterable ? MutableList.copyOf((Iterable<?>) bashScript) : MutableList.of(bashScript);
                    argumentsCfg = MutableList.of(argumentsCfgO.stream().map(x -> ""+x).collect(Collectors.joining("\n")));
                }

                PullPolicy containerImagePullPolicy = EntityInitializers.resolve(config, CONTAINER_IMAGE_PULL_POLICY);
                String workingDir = EntityInitializers.resolve(config, WORKING_DIR);
                Set<Map<String,String>> volumeMounts = (Set<Map<String,String>>) EntityInitializers.resolve(config, VOLUME_MOUNTS);
                Set<Map<String, Object>> volumes = (Set<Map<String, Object>>) EntityInitializers.resolve(config, VOLUMES);

                final String kubeJobName = initNamespaceAndGetNewJobName();
                String containerImage = EntityInitializers.resolve(config, CONTAINER_IMAGE);
                Entity entity = BrooklynTaskTags.getContextEntity(Tasks.current());

                LOG.debug("Submitting container job in namespace "+namespace+", name "+kubeJobName);

                Map<String, String> env = new ShellEnvironmentSerializer(((EntityInternal)entity).getManagementContext()).serialize(EntityInitializers.resolve(config, SHELL_ENVIRONMENT));
                KubeJobFileCreator kubeJobFileCreator = new KubeJobFileCreator()
                        .withImage(containerImage)
                        .withImagePullPolicy(containerImagePullPolicy)
                        .withName(kubeJobName)
                        .withCommand(Lists.newArrayList(commandsCfg))
                        .withArgs(argumentsCfg)
                        .withEnv(env)
                        .withVolumeMounts(volumeMounts)
                        .withVolumes(volumes)
                        .withWorkingDir(workingDir);
                final BrooklynBomOsgiArchiveInstaller.FileWithTempInfo<File> jobYaml = kubeJobFileCreator.createFile();
                try {
                    // tagging happens inside the try so that the job file is removed (see outer finally) even if tagging fails
                    Tasks.addTagDynamically(BrooklynTaskTags.tagForEnvStream(BrooklynTaskTags.STREAM_ENV, env));
                    Tasks.addTagDynamically(BrooklynTaskTags.tagForStream("kube job config", Streams.byteArrayOfString(kubeJobFileCreator.getAsString())));

                    Duration timeout = EntityInitializers.resolve(config, TIMEOUT);

                    // find the result holder tagged on this task when it was built; create and tag one if it is missing
                    ContainerTaskResult result = (ContainerTaskResult) TaskTags.getTagsFast(Tasks.current()).stream().filter(x -> x instanceof ContainerTaskResult).findAny().orElseGet(() -> {
                        LOG.warn("Result object not set on tag at "+Tasks.current()+"; creating");
                        ContainerTaskResult x = new ContainerTaskResult();
                        TaskTags.addTagDynamically(Tasks.current(), x);
                        return x;
                    });
                    result.namespace = namespace;
                    result.kubeJobName = kubeJobName;

                    // validate these as they are passed to shell script, prevent injection
                    if (!namespace.matches("[A-Za-z0-9_.-]+")) throw new IllegalStateException("Invalid namespace: "+namespace);
                    if (!kubeJobName.matches("[A-Za-z0-9_.-]+")) throw new IllegalStateException("Invalid job name: "+kubeJobName);

                    // createNamespace is tri-state: FALSE = never create; TRUE = must create (existing namespace is an error);
                    // null = try to create, tolerating a non-zero exit code (e.g. it already exists)
                    ProcessTaskWrapper<ProcessTaskWrapper<?>> createNsJob = null;
                    if (!Boolean.FALSE.equals(createNamespace)) {
                        ProcessTaskFactory<ProcessTaskWrapper<?>> createNsJobF = newSimpleTaskFactory(
                                String.format(NAMESPACE_CREATE_CMD, namespace)
                                //, String.format(NAMESPACE_SET_CMD, namespace)
                        ).summary("Set up namespace").returning(x -> x);
                        if (createNamespace==null) {
                            createNsJobF.allowingNonZeroExitCode();
                        }
                        createNsJob = runTask(entity, createNsJobF.newTask(), true, true);
                    }

                    // only delete if told to always, unless we successfully create it
                    boolean deleteNamespaceHere = Boolean.TRUE.equals(deleteNamespace);
                    try {
                        if (createNsJob!=null) {
                            ProcessTaskWrapper<?> nsDetails = createNsJob.get();
                            if (nsDetails.getExitCode()==0) {
                                LOG.debug("Namespace created");
                                // we created it, so unless told otherwise we also delete it
                                if (deleteNamespace==null) deleteNamespaceHere = true;
                            } else if (nsDetails.getExitCode()==1 && nsDetails.getStderr().contains("AlreadyExists")) {
                                if (Boolean.TRUE.equals(createNamespace)) {
                                    LOG.warn("Namespace "+namespace+" already exists; failing");
                                    throw new IllegalStateException("Namespace "+namespace+" exists when creating a job that expects to create this namespace");
                                } else {
                                    LOG.debug("Namespace exists already; reusing it");
                                }
                            } else {
                                LOG.warn("Unexpected namespace creation problem: "+nsDetails.getStderr()+ "(code "+nsDetails.getExitCode()+")");
                                if (deleteNamespace==null) deleteNamespaceHere = true;
                                throw new IllegalStateException("Unexpected namespace creation problem ("+namespace+"); see log for more details");
                            }
                        }

                        runTask(entity,
                                newSimpleTaskFactory(String.format(JOBS_CREATE_CMD, jobYaml.getFile().getAbsolutePath(), namespace)).summary("Submit job").newTask(), true, true);

                        // the timeout countdown starts once the job has been submitted
                        final CountdownTimer timer = CountdownTimer.newInstanceStarted(timeout);

                        // wait for it to be running (or failed / succeeded) -
                        PodPhases phaseOnceActive = waitForContainerAvailable(entity, kubeJobName, result, timer);
                        result.containerStarted = true;
                        // waitForContainerPodContainerState(kubeJobName, result, timer);

                        // notify once pod is available
                        synchronized (result) { result.notifyAll(); }

                        // already Succeeded -> success; already Failed -> failure (no further waiting); otherwise wait for completion
                        boolean succeeded = PodPhases.Succeeded == phaseOnceActive ||
                                (PodPhases.Failed != phaseOnceActive &&
                                        //use `wait --for` api, but in a 5s loop in case there are other issues
                                        // waitForContainerCompletedUsingK8sWaitFor(stdout, kubeJobName, entity, timer)
                                        waitForContainerCompletedUsingPodState(stdout, kubeJobName, entity, timer)
                                );

                        LOG.debug("Container job "+kubeJobName+" completed, success "+succeeded);

                        // both retrieval tasks are started before either result is read
                        ProcessTaskWrapper<String> retrieveOutput = runTask(entity, newSimpleTaskFactory(String.format(JOBS_LOGS_CMD, kubeJobName, namespace)).summary("Retrieve output").newTask(), false, true);
                        ProcessTaskWrapper<String> retrieveExitCode = runTask(entity, newSimpleTaskFactory(String.format(PODS_EXIT_CODE_CMD, namespace, kubeJobName)).summary("Retrieve exit code").newTask(), false, true);

                        String newStdout = retrieveOutput.get();
                        result.mainStdout = newStdout; //prevent tag contents from being enormous; suppress in JSON serialization
                        updateStdoutWithNewData(stdout, newStdout);

                        retrieveExitCode.get();
                        String exitCodeS = retrieveExitCode.getStdout();
                        if (Strings.isNonBlank(exitCodeS)) {
                            try {
                                result.mainExitCode = Integer.parseInt(exitCodeS.trim());
                            } catch (NumberFormatException e) {
                                // treat unparseable output like missing output, rather than aborting with a bare
                                // NumberFormatException before waiters on `result` have been notified
                                LOG.warn("Unparseable exit code '"+exitCodeS.trim()+"' for container job "+kubeJobName+"; treating as -1");
                                result.mainExitCode = -1;
                            }
                        } else {
                            // no exit code reported
                            result.mainExitCode = -1;
                        }

                        result.containerEnded = true;
                        // notify again now that the container has ended and the result is populated
                        synchronized (result) { result.notifyAll(); }

                        // NOTE(review): config.get(...) is auto-unboxed here; this assumes REQUIRE_EXIT_CODE_ZERO never resolves to null - confirm
                        if (result.mainExitCode!=0 && config.get(REQUIRE_EXIT_CODE_ZERO)) {
                            LOG.info("Failed container job "+namespace+" (exit code "+result.mainExitCode+") output: "+result.mainStdout);
                            throw new IllegalStateException("Non-zero exit code (" + result.mainExitCode + ") disallowed");
                        }

                        return returnConversion==null ? (RET) result : returnConversion.apply(result);

                    } finally {
                        if (deleteNamespaceHere) {
                            doDeleteNamespace(!namespaceRandom, true); // if a one-off job, namespace has random id in it so can safely be deleted in background (no one else risks reusing it)
                        } else {
                            Boolean devMode = EntityInitializers.resolve(config, KEEP_CONTAINER_FOR_DEBUGGING);
                            if (!Boolean.TRUE.equals(devMode)) {
                                // no big deal if not deleted, job ID will always be unique, so allow to delete in background and not block subsequent tasks;
                                // the submitted task is deliberately not waited on here
                                Entities.submit(entity, BrooklynTaskTags.setTransient(newDeleteJobTask(kubeJobName)
                                        // namespace might have been deleted in parallel so okay if we don't delete the job;
                                        .allowingNonZeroExitCode()
                                        .newTask().asTask()));
                            }
                        }
                        DynamicTasks.waitForLast();
                    }
                } catch (Exception e) {
                    throw Exceptions.propagate(e);
                } finally {
                    // always remove the generated job file if it was a temp file
                    jobYaml.deleteIfTemp();
                }
            });
    return taskBuilder.build();
}