in data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/GenericDataParsingTask.java [182:278]
private void runContainer(DataParser parser, String inputPath, String outputPath, Map<String, String> environmentValues)
throws Exception {
DefaultDockerClientConfig.Builder config = DefaultDockerClientConfig.createDefaultConfigBuilder();
DockerClient dockerClient = DockerClientBuilder.getInstance(config.build()).build();
logger.info("Pulling image " + parser.getDockerImage());
try {
dockerClient.pullImageCmd(parser.getDockerImage().split(":")[0])
.withTag(parser.getDockerImage().split(":")[1])
.exec(new PullImageResultCallback()).awaitCompletion();
} catch (InterruptedException e) {
logger.error("Interrupted while pulling image", e);
throw e;
}
logger.info("Successfully pulled image " + parser.getDockerImage());
String containerId = UUID.randomUUID().toString();
String commands[] = parser.getExecCommand().split(" ");
CreateContainerResponse containerResponse = dockerClient.createContainerCmd(parser.getDockerImage()).withCmd(commands).withName(containerId)
.withBinds(Bind.parse(inputPath + ":" + parser.getInputPath()),
Bind.parse(outputPath + ":" + parser.getOutputPath()))
.withTty(true)
.withAttachStdin(true)
.withAttachStdout(true).withEnv(environmentValues.entrySet()
.stream()
.map(entry -> entry.getKey() + "=" + entry.getValue())
.collect(Collectors.toList()))
.exec();
logger.info("Created the container with id " + containerResponse.getId());
try {
final StringBuilder dockerLogs = new StringBuilder();
if (containerResponse.getWarnings() != null && containerResponse.getWarnings().length > 0) {
StringBuilder warningStr = new StringBuilder();
for (String w : containerResponse.getWarnings()) {
warningStr.append(w).append(",");
}
logger.warn("Container " + containerResponse.getId() + " warnings : " + warningStr);
} else {
logger.info("Starting container with id " + containerResponse.getId());
dockerClient.startContainerCmd(containerResponse.getId()).exec();
LogContainerCmd logContainerCmd = dockerClient.logContainerCmd(containerResponse.getId()).withStdOut(true).withStdErr(true);
try {
logContainerCmd.exec(new ResultCallback.Adapter<Frame>() {
@Override
public void onNext(Frame item) {
logger.info("Got frame: {}", item);
;
if (item.getStreamType() == StreamType.STDOUT) {
dockerLogs.append(new String(item.getPayload(), StandardCharsets.UTF_8));
dockerLogs.append("\n");
} else if (item.getStreamType() == StreamType.STDERR) {
dockerLogs.append(new String(item.getPayload(), StandardCharsets.UTF_8));
dockerLogs.append("\n");
}
super.onNext(item);
}
@Override
public void onError(Throwable throwable) {
logger.error("Errored while running the container {}", containerId, throwable);
super.onError(throwable);
}
@Override
public void onComplete() {
logger.info("Container {} successfully completed", containerId);
super.onComplete();
}
}).awaitCompletion();
} catch (InterruptedException e) {
logger.info("Successfully removed container with id " + containerResponse.getId());
logger.error("Interrupted while reading container log" + e.getMessage());
throw e;
}
logger.info("Waiting for the container to stop");
Integer statusCode = dockerClient.waitContainerCmd(containerResponse.getId()).exec(new WaitContainerResultCallback()).awaitStatusCode();
logger.info("Container " + containerResponse.getId() + " exited with status code " + statusCode);
if (statusCode != 0) {
logger.error("Failing as non zero status code was returned");
throw new Exception("Failing as non zero status code was returned");
}
logger.info("Container logs " + dockerLogs.toString());
}
} finally {
dockerClient.removeContainerCmd(containerResponse.getId()).exec();
}
}