private void runContainer()

in data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/GenericDataParsingTask.java [182:278]


    /**
     * Pulls the parser's Docker image, runs it as a container with the given host
     * directories bind-mounted, waits for it to exit and always removes the container.
     *
     * @param parser            supplies the image name, the command line to run and the
     *                          in-container mount points for input and output
     * @param inputPath         host path mounted at {@code parser.getInputPath()}
     * @param outputPath        host path mounted at {@code parser.getOutputPath()}
     * @param environmentValues variables passed to the container as {@code KEY=VALUE}
     * @throws InterruptedException if the thread is interrupted while pulling the image or
     *                              while reading the container output
     * @throws Exception            if the container exits with a non-zero status code, or
     *                              if any Docker client call fails
     */
    private void runContainer(DataParser parser, String inputPath, String outputPath, Map<String, String> environmentValues)
            throws Exception {

        DefaultDockerClientConfig.Builder config = DefaultDockerClientConfig.createDefaultConfigBuilder();

        // The client is closed on every exit path so its underlying connections are not leaked.
        try (DockerClient dockerClient = DockerClientBuilder.getInstance(config.build()).build()) {

            pullImage(dockerClient, parser.getDockerImage());

            // Random name so that concurrent runs of the same parser do not clash.
            String containerName = UUID.randomUUID().toString();
            // NOTE(review): naive split on single spaces; quoted arguments are not supported.
            String[] commands = parser.getExecCommand().split(" ");

            CreateContainerResponse containerResponse = dockerClient.createContainerCmd(parser.getDockerImage())
                    .withCmd(commands)
                    .withName(containerName)
                    .withBinds(Bind.parse(inputPath + ":" + parser.getInputPath()),
                            Bind.parse(outputPath + ":" + parser.getOutputPath()))
                    .withTty(true)
                    .withAttachStdin(true)
                    .withAttachStdout(true)
                    .withEnv(environmentValues.entrySet()
                            .stream()
                            .map(entry -> entry.getKey() + "=" + entry.getValue())
                            .collect(Collectors.toList()))
                    .exec();

            logger.info("Created the container with id " + containerResponse.getId());
            try {
                String[] warnings = containerResponse.getWarnings();
                if (warnings != null && warnings.length > 0) {
                    // NOTE(review): a container that was created with warnings is never started
                    // and the method returns normally. Behaviour kept as-is; confirm it is intended.
                    StringBuilder warningStr = new StringBuilder();
                    for (String w : warnings) {
                        warningStr.append(w).append(",");
                    }
                    logger.warn("Container " + containerResponse.getId() + " warnings : " + warningStr);
                    return;
                }

                logger.info("Starting container with id " + containerResponse.getId());
                dockerClient.startContainerCmd(containerResponse.getId()).exec();

                final StringBuilder dockerLogs = new StringBuilder();
                // Follow the stream so output is collected until the container stops, rather
                // than only what had been produced at the moment of the call.
                LogContainerCmd logContainerCmd = dockerClient.logContainerCmd(containerResponse.getId())
                        .withStdOut(true)
                        .withStdErr(true)
                        .withFollowStream(true);

                try {
                    logContainerCmd.exec(new ResultCallback.Adapter<Frame>() {
                        @Override
                        public void onNext(Frame item) {
                            logger.info("Got frame: {}", item);
                            StreamType type = item.getStreamType();
                            // The container is created with a TTY, in which case output is
                            // expected to arrive as RAW frames instead of STDOUT/STDERR.
                            if (type == StreamType.STDOUT || type == StreamType.STDERR || type == StreamType.RAW) {
                                dockerLogs.append(new String(item.getPayload(), StandardCharsets.UTF_8));
                                dockerLogs.append("\n");
                            }
                            super.onNext(item);
                        }

                        @Override
                        public void onError(Throwable throwable) {
                            logger.error("Errored while running the container {}", containerName, throwable);
                            super.onError(throwable);
                        }

                        @Override
                        public void onComplete() {
                            logger.info("Container {} successfully completed", containerName);
                            super.onComplete();
                        }
                    }).awaitCompletion();
                } catch (InterruptedException e) {
                    // Pass the exception itself so the stack trace is kept.
                    logger.error("Interrupted while reading container log", e);
                    throw e;
                }

                logger.info("Waiting for the container to stop");

                Integer statusCode = dockerClient.waitContainerCmd(containerResponse.getId())
                        .exec(new WaitContainerResultCallback())
                        .awaitStatusCode();
                logger.info("Container " + containerResponse.getId() + " exited with status code " + statusCode);
                if (statusCode != 0) {
                    // The captured output is most useful on failure, so log it before throwing.
                    logger.error("Container logs " + dockerLogs);
                    logger.error("Failing as non zero status code was returned");
                    throw new Exception("Failing as non zero status code was returned: " + statusCode);
                }

                logger.info("Container logs " + dockerLogs);
            } finally {
                // Forced so that a container which is still running (e.g. after an interrupt)
                // is removed too, instead of the removal failing and masking the original error.
                dockerClient.removeContainerCmd(containerResponse.getId()).withForce(true).exec();
            }
        }
    }

    /**
     * Pulls {@code image} and blocks until the pull has finished.
     */
    private void pullImage(DockerClient dockerClient, String image) throws InterruptedException {
        String[] reference = splitImageReference(image);

        logger.info("Pulling image " + image);
        try {
            dockerClient.pullImageCmd(reference[0])
                    .withTag(reference[1])
                    .exec(new PullImageResultCallback()).awaitCompletion();
        } catch (InterruptedException e) {
            logger.error("Interrupted while pulling image", e);
            throw e;
        }
        logger.info("Successfully pulled image " + image);
    }

    /**
     * Splits an image reference into {@code {repository, tag}}, using the tag
     * {@code latest} when the reference carries none.
     */
    private static String[] splitImageReference(String image) {
        int tagSeparator = image.lastIndexOf(':');
        // A ':' that precedes the last '/' belongs to a registry host:port, not to a tag.
        if (tagSeparator < 0 || tagSeparator < image.lastIndexOf('/')) {
            return new String[]{image, "latest"};
        }
        String tag = image.substring(tagSeparator + 1);
        return new String[]{image.substring(0, tagSeparator), tag.isEmpty() ? "latest" : tag};
    }