in data-orchestrator/workflow-engine/workflow-engine-core/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/GenericDataParsingTask.java [75:172]
/**
 * Runs a data parser against the task's mapped inputs.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>Fetch the parser definition for {@code getParserId()} from the parser service over a
 *       plaintext gRPC channel; the channel is shut down once the response is received.</li>
 *   <li>Create the working directory together with its {@code inputs} and {@code outputs}
 *       sub-directories.</li>
 *   <li>Copy each parser input into the {@code inputs} directory under the input's name. A mapped
 *       value starting with {@code $} is resolved through {@code getUserContent} with
 *       {@code Scope.WORKFLOW}.</li>
 *   <li>Call {@code runContainer} and then {@code exportOutputs}.</li>
 * </ol>
 *
 * <p>If step 4 throws, the temp data file (when set) and the whole working directory tree are
 * deleted on a best-effort basis before the failure is reported.
 *
 * @return a {@code COMPLETED} result when every step succeeds; a {@code FAILED} result when the
 *         directories cannot be created, an input mapping or context value is missing, an input
 *         cannot be copied, or the container run / output export throws. Exceptions raised while
 *         fetching the parser are not caught here and propagate to the caller.
 */
public TaskResult runBlockingCode() {
    ParserFetchResponse parserFetchResponse;
    ManagedChannel channel = null;
    try {
        channel = ManagedChannelBuilder.forAddress(getParserServiceHost(), getParserServicePort())
                .usePlaintext()
                .build();
        DataParserServiceGrpc.DataParserServiceBlockingStub parserClient =
                DataParserServiceGrpc.newBlockingStub(channel);
        parserFetchResponse = parserClient.fetchParser(
                ParserFetchRequest.newBuilder().setParserId(getParserId()).build());
    } finally {
        if (channel != null) {
            channel.shutdown();
        }
    }

    DataParser parser = parserFetchResponse.getParser();
    List<DataParserInputInterface> inputInterfaces = parser.getInputInterfacesList();

    String tempInputDir = getWorkingDirectory() + File.separator + "inputs";
    String tempOutputDir = getWorkingDirectory() + File.separator + "outputs";
    logger.info("Using temp working directory {}", getWorkingDirectory());

    try {
        // createDirectories is a no-op for directories that already exist and also creates any
        // missing parents, so no separate (and racy) exists() check is needed.
        Files.createDirectories(Paths.get(getWorkingDirectory()));
        Files.createDirectories(Paths.get(tempInputDir));
        Files.createDirectories(Paths.get(tempOutputDir));
    } catch (IOException e) {
        logger.error("Failed to create temp working directories in {}", getWorkingDirectory(), e);
        return new TaskResult(TaskResult.Status.FAILED, "Failed to create temp working directories");
    }

    for (DataParserInputInterface dpi : inputInterfaces) {
        String path = getInputMapping().get(dpi.getParserInputInterfaceId());
        if (path == null) {
            logger.error("No value specified for input {}", dpi.getParserInputInterfaceId());
            return new TaskResult(TaskResult.Status.FAILED, "No value specified for input");
        }

        // A leading '$' marks the value as a key into the workflow-scoped user content
        // rather than a literal file path.
        if (path.startsWith("$")) {
            String derivedPath = getUserContent(path.substring(1), Scope.WORKFLOW);
            if (derivedPath == null) {
                logger.error("No value in context to path {} for {}", path, dpi.getParserInputInterfaceId());
                return new TaskResult(TaskResult.Status.FAILED, "No value specified in context for path");
            }
            path = derivedPath;
        }

        String targetPath = tempInputDir + File.separator + dpi.getInputName();
        try {
            Files.copy(Paths.get(path), Paths.get(targetPath));
            logger.info("Copied input file from path {} to {}", path, targetPath);
        } catch (IOException e) {
            // Pass the exception so the underlying cause is not lost.
            logger.error("Failed to copy the input from path {} to {}", path, tempInputDir, e);
            return new TaskResult(TaskResult.Status.FAILED, "Failed to copy the input");
        }
    }

    try {
        runContainer(parser, tempInputDir, tempOutputDir, new HashMap<>());
        exportOutputs(parser, tempOutputDir);
    } catch (Exception e) {
        Path dir = Paths.get(workingDirectory.get());

        // Best-effort removal of the temp data file. Kept in its own try block so that a failure
        // here does not prevent the working directory from being cleaned up below.
        try {
            String tempFile = tempDataFile.get();
            if (tempFile != null && !tempFile.isEmpty()) {
                logger.info("Deleting resources : " + Paths.get(tempFile));
                Files.delete(Paths.get(tempFile));
            }
        } catch (Exception ex) {
            logger.error("File cleanup task failed " + dir, ex);
        }

        // Files.walk keeps directory handles open until the stream is closed, so it must be
        // used inside try-with-resources.
        try (java.util.stream.Stream<Path> entries = Files.walk(dir)) {
            entries
                    // Reverse order puts children before their parent directories.
                    .sorted(Comparator.reverseOrder())
                    .forEach(entry -> {
                        try {
                            logger.info("Deleting resources : " + entry);
                            Files.delete(entry);
                        } catch (IOException ex) {
                            // Report the entry that actually failed, not the root directory.
                            logger.error("File cleanup failed for path " + entry, ex);
                        }
                    });
        } catch (Exception ex) {
            logger.error("File cleanup task failed " + dir, ex);
        }

        logger.error("Failed to execute the container for task {}", getTaskId(), e);
        return new TaskResult(TaskResult.Status.FAILED, "Failed to execute the container");
    }

    return new TaskResult(TaskResult.Status.COMPLETED, "Completed");
}