public TaskResult runBlockingCode()

in data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/GenericDataParsingTask.java [75:172]


    /**
     * Fetches the parser definition for {@code getParserId()} from the parser service over a
     * plaintext gRPC channel, stages the parser inputs under the working directory, then calls
     * {@code runContainer} and {@code exportOutputs}.
     *
     * <p>Steps:
     * <ol>
     *   <li>Fetch the parser; the channel is shut down whether or not the fetch succeeds. An
     *       exception thrown by the fetch is not caught here and propagates to the caller.</li>
     *   <li>Ensure the working directory and its {@code inputs} / {@code outputs}
     *       subdirectories exist.</li>
     *   <li>Copy every input declared by the parser into the {@code inputs} directory. A mapped
     *       path starting with {@code $} is treated as a key and resolved through
     *       {@code getUserContent} at {@code Scope.WORKFLOW}.</li>
     *   <li>Run the container and export outputs; on any exception, attempt to remove the temp
     *       data file and the working directory tree before reporting the failure.</li>
     * </ol>
     *
     * @return a {@code COMPLETED} result when every step succeeds; a {@code FAILED} result when
     *         directories cannot be created, an input has no mapping or context value, an input
     *         copy fails, or the container run / output export throws
     */
    public TaskResult runBlockingCode() {

        ParserFetchResponse parserFetchResponse;
        ManagedChannel channel = null;
        try {
            channel = ManagedChannelBuilder.forAddress(getParserServiceHost(), getParserServicePort()).usePlaintext().build();
            DataParserServiceGrpc.DataParserServiceBlockingStub parserClient = DataParserServiceGrpc.newBlockingStub(channel);
            parserFetchResponse = parserClient
                    .fetchParser(ParserFetchRequest.newBuilder()
                            .setParserId(getParserId()).build());

        } finally {
            // Release the channel even when the fetch throws.
            if (channel != null) {
                channel.shutdown();
            }
        }

        DataParser parser = parserFetchResponse.getParser();
        List<DataParserInputInterface> inputInterfaces = parser.getInputInterfacesList();

        String tempInputDir = getWorkingDirectory() + File.separator + "inputs";
        String tempOutputDir = getWorkingDirectory() + File.separator + "outputs";
        logger.info("Using temp working directory {}", getWorkingDirectory());
        try {
            Path[] requiredDirs = {
                    Paths.get(getWorkingDirectory()),
                    Paths.get(tempInputDir),
                    Paths.get(tempOutputDir)
            };
            for (Path requiredDir : requiredDirs) {
                // isDirectory follows symbolic links, so an existing directory (or a link to one)
                // is left untouched; createDirectories also creates any missing parents.
                if (!Files.isDirectory(requiredDir)) {
                    Files.createDirectories(requiredDir);
                }
            }
        } catch (IOException e) {
            logger.error("Failed to create temp working directories in {}", getWorkingDirectory(), e);
            return new TaskResult(TaskResult.Status.FAILED, "Failed to create temp working directories");
        }

        for (DataParserInputInterface dpi : inputInterfaces) {
            String path = getInputMapping().get(dpi.getParserInputInterfaceId());
            if (path == null) {
                logger.error("No value specified for input {}", dpi.getParserInputInterfaceId());
                return new TaskResult(TaskResult.Status.FAILED, "No value specified for input");
            }

            // A leading '$' marks the value as a key to look up in the workflow-scoped context.
            if (path.startsWith("$")) {
                String derivedPath = getUserContent(path.substring(1), Scope.WORKFLOW);
                if (derivedPath == null) {
                    logger.error("No value in context to path {} for {}", path, dpi.getParserInputInterfaceId());
                    return new TaskResult(TaskResult.Status.FAILED, "No value specified in context for path");
                }
                path = derivedPath;
            }

            String target = tempInputDir + File.separator + dpi.getInputName();
            try {
                Files.copy(Paths.get(path), Paths.get(target));
                logger.info("Copied input file from path {} to {}", path, target);
            } catch (IOException e) {
                // Pass the exception so the root cause (missing source, existing target, ...) is logged.
                logger.error("Failed to copy the input from path {} to {}", path, tempInputDir, e);
                return new TaskResult(TaskResult.Status.FAILED, "Failed to copy the input");
            }
        }

        try {
            runContainer(parser, tempInputDir, tempOutputDir, new HashMap<>());
            exportOutputs(parser, tempOutputDir);
        } catch (Exception e) {
            cleanUpWorkingResources();
            logger.error("Failed to execute the container for task {}", getTaskId(), e);
            return new TaskResult(TaskResult.Status.FAILED, "Failed to execute the container");
        }
        return new TaskResult(TaskResult.Status.COMPLETED, "Completed");
    }

    /**
     * Best-effort cleanup after a failed run: deletes the temp data file (when one is set) and
     * then the working directory tree. Never throws; every failure is logged.
     */
    private void cleanUpWorkingResources() {
        // The two steps are guarded separately so that a problem with the temp data file
        // does not prevent the working directory from being removed.
        try {
            String tempFile = tempDataFile.get();
            if (tempFile != null && !tempFile.isEmpty()) {
                logger.info("Deleting resources : " + Paths.get(tempFile));
                Files.deleteIfExists(Paths.get(tempFile));
            }
        } catch (Exception ex) {
            logger.error("Temp data file cleanup failed", ex);
        }

        try {
            Path dir = Paths.get(workingDirectory.get());
            // Files.walk holds an open directory handle until the stream is closed.
            // Fully qualified so no additional import is required.
            try (java.util.stream.Stream<Path> tree = Files.walk(dir)) {
                // Reverse order visits children before their parent directory.
                tree.sorted(Comparator.reverseOrder()).forEach(this::deleteAndLogFailure);
            }
        } catch (Exception ex) {
            logger.error("File cleanup task failed {}", workingDirectory.get(), ex);
        }
    }

    /**
     * Deletes a single file or (empty) directory, logging instead of throwing on failure so the
     * remaining entries are still processed.
     */
    private void deleteAndLogFailure(Path path) {
        try {
            logger.info("Deleting resources : " + path);
            Files.delete(path);
        } catch (IOException ex) {
            logger.error("File cleanup failed for path {}", path, ex);
        }
    }