in data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataParsingWorkflowManager.java [106:258]
/**
 * Builds and submits one data parsing workflow per source resource named in the request.
 *
 * <p>For every source resource id this method:
 * <ol>
 *   <li>fetches the resource metadata from MFT,</li>
 *   <li>lists the parsing jobs known to the data parser service and keeps those whose
 *       selection queries all evaluate to {@code true} against that metadata,</li>
 *   <li>assembles a task graph: download task, then one parsing task per selected parser,
 *       then optional metadata persist tasks for JSON outputs, then a clean-up task,</li>
 *   <li>hands the graph to {@code workflowOperator.buildAndRunWorkflow}.</li>
 * </ol>
 * Resources for which no parsing job matches are skipped with a warning.
 *
 * @param request invocation request whose message carries the source resource ids, user,
 *                tenant and source credential token
 * @throws Exception if the metadata lookup, a parser service call, or the workflow submission
 *                   fails; a failure aborts the processing of the remaining resources
 */
public void submitDataParsingWorkflow(WorkflowInvocationRequest request) throws Exception {
    WorkflowMessage workflowMessage = request.getMessage();

    // One channel serves every resource in the request and is always shut down in the finally
    // block, including when a resource is skipped or an exception propagates.
    // NOTE(review): the endpoint is hard-coded while the parsing tasks below are pointed at
    // orchHost/orchPort - confirm whether this channel should use the same configuration.
    ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 6566).usePlaintext().build();
    try {
        DataParserServiceGrpc.DataParserServiceBlockingStub parserClient =
                DataParserServiceGrpc.newBlockingStub(channel);

        for (String sourceResourceId : workflowMessage.getSourceResourceIdsList()) {
            logger.info("Processing parsing workflow for resource {}", sourceResourceId);

            FileMetadataResponse metadata;
            try (MFTApiClient mftClient = new MFTApiClient(mftHost, mftPort)) {
                MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClientStub = mftClient.get();
                DelegateAuth delegateAuth = DelegateAuth.newBuilder()
                        .setUserId(workflowMessage.getUsername())
                        .setClientId(mftClientId)
                        .setClientSecret(mftClientSecret)
                        .putProperties("TENANT_ID", workflowMessage.getTenantId())
                        .build();
                metadata = mftClientStub.getFileResourceMetadata(FetchResourceMetadataRequest.newBuilder()
                        .setResourceType("SCP")
                        .setResourceId(sourceResourceId)
                        .setResourceToken(workflowMessage.getSourceCredentialToken())
                        .setMftAuthorizationToken(AuthToken.newBuilder().setDelegateAuth(delegateAuth).build())
                        .build());
            }

            ParsingJobListResponse parsingJobs =
                    parserClient.listParsingJobs(ParsingJobListRequest.newBuilder().build());
            String tempDownloadPath = baseWorkingDir + UUID.randomUUID().toString();

            // Input mapping per parser id: when several matching jobs share a parser, the last
            // job's mapping is kept, while the first matching job supplies the output definitions.
            Map<String, StringMap> parserInputMappings = new HashMap<>();
            Map<String, DataParsingJob> jobsByParserId = new HashMap<>();
            for (DataParsingJob pj : parsingJobs.getParsersList()) {
                StringMap stringMap = new StringMap();
                // A job without inputs has nothing to reject it and is therefore selected.
                boolean match = true;
                for (DataParsingJobInput pji : pj.getDataParsingJobInputsList()) {
                    try {
                        match = evaluateSelectionQuery(pji.getSelectionQuery(), metadata);
                    } catch (ScriptException e) {
                        logger.error("Failed to evaluate parsing job {}", pj.getDataParsingJobId(), e);
                        match = false;
                    }
                    if (!match) {
                        // Every input has to match, so the remaining queries cannot change the outcome.
                        break;
                    }
                    stringMap.put(pji.getDataParserInputInterfaceId(), tempDownloadPath);
                }
                if (match) {
                    parserInputMappings.put(pj.getParserId(), stringMap);
                    jobsByParserId.putIfAbsent(pj.getParserId(), pj);
                }
            }

            if (parserInputMappings.isEmpty()) {
                logger.warn("No parsing jobs available for resource {} with path {}. So ignoring the workflow",
                        sourceResourceId, metadata.getResourcePath());
                continue;
            }

            Map<String, AbstractTask> taskMap = new HashMap<>();

            // Entry point of the workflow: downloads the source resource to the temporary path.
            SyncLocalDataDownloadTask downloadTask = new SyncLocalDataDownloadTask();
            downloadTask.setTaskId("SLDT-" + UUID.randomUUID().toString());
            downloadTask.setMftClientId(mftClientId);
            downloadTask.setMftClientSecret(mftClientSecret);
            downloadTask.setUserId(workflowMessage.getUsername());
            downloadTask.setTenantId(workflowMessage.getTenantId());
            downloadTask.setMftHost(mftHost);
            downloadTask.setMftPort(mftPort);
            downloadTask.setSourceResourceId(sourceResourceId);
            downloadTask.setSourceCredToken(workflowMessage.getSourceCredentialToken());
            downloadTask.setDownloadPath(tempDownloadPath);
            taskMap.put(downloadTask.getTaskId(), downloadTask);

            // Single clean-up task that the parser branches below are chained to.
            DataParsingWorkflowResourceCleanUpTask cleanUpTask = new DataParsingWorkflowResourceCleanUpTask();
            cleanUpTask.setDownloadPath(tempDownloadPath);
            cleanUpTask.setTaskId("DPT-" + UUID.randomUUID().toString());
            taskMap.put(cleanUpTask.getTaskId(), cleanUpTask);

            for (Map.Entry<String, StringMap> mapping : parserInputMappings.entrySet()) {
                String parserId = mapping.getKey();
                String parserWorkingDir = baseWorkingDir + UUID.randomUUID();

                GenericDataParsingTask dataParsingTask = new GenericDataParsingTask();
                dataParsingTask.setTaskId("DPT-" + UUID.randomUUID().toString());
                dataParsingTask.setParserId(parserId);
                dataParsingTask.setParserServiceHost(orchHost);
                dataParsingTask.setParserServicePort(orchPort);
                dataParsingTask.setInputMapping(mapping.getValue());
                dataParsingTask.setWorkingDirectory(parserWorkingDir);
                dataParsingTask.setTempDataFile(tempDownloadPath);
                taskMap.put(dataParsingTask.getTaskId(), dataParsingTask);

                // NOTE(review): this setter is invoked once per parser; if it overwrites the
                // previous value (its implementation is not visible here), only the last
                // parser's working directory is handed to the clean-up task - confirm.
                cleanUpTask.setParsingDir(parserWorkingDir);

                OutPort downloadOut = new OutPort();
                downloadOut.setNextTaskId(dataParsingTask.getTaskId());
                downloadTask.addOutPort(downloadOut);

                DataParsingJob dataParsingJob = jobsByParserId.get(parserId);
                ParserFetchResponse parser = parserClient.fetchParser(
                        ParserFetchRequest.newBuilder().setParserId(parserId).build());

                MetadataPersistTask finalTask = null;
                for (DataParserOutputInterface dataParserOutputInterface : parser.getParser().getOutputInterfacesList()) {
                    // Only the first job output bound to this interface is considered, and it
                    // has to be of type JSON to get a metadata persist task.
                    Optional<DataParsingJobOutput> jsonOutput = dataParsingJob.getDataParsingJobOutputsList().stream()
                            .filter(o -> o.getDataParserOutputInterfaceId()
                                    .equals(dataParserOutputInterface.getParserOutputInterfaceId()))
                            .findFirst()
                            .filter(o -> o.getOutputType().equals("JSON"));
                    if (!jsonOutput.isPresent()) {
                        continue;
                    }

                    MetadataPersistTask mpt = new MetadataPersistTask();
                    mpt.setTaskId("MPT-" + UUID.randomUUID().toString());
                    mpt.setDrmsHost(drmsHost);
                    mpt.setDrmsPort(drmsPort);
                    mpt.setTenant(workflowMessage.getTenantId());
                    mpt.setUser(workflowMessage.getUsername());
                    mpt.setServiceAccountKey(mftClientId);
                    mpt.setServiceAccountSecret(mftClientSecret);
                    mpt.setResourceId(sourceResourceId);
                    mpt.setJsonFile(parserWorkingDir
                            + File.separator + "outputs" + File.separator + dataParserOutputInterface.getOutputName());

                    OutPort persistOut = new OutPort();
                    persistOut.setNextTaskId(mpt.getTaskId());
                    dataParsingTask.addOutPort(persistOut);
                    taskMap.put(mpt.getTaskId(), mpt);
                    finalTask = mpt;
                }

                // Chain the clean-up task behind the last metadata persist task of this parser,
                // or directly behind the parsing task when it produced no JSON output.
                // NOTE(review): earlier persist tasks of the same parser get no edge to the
                // clean-up task - confirm this is intended.
                OutPort cleanUpOut = new OutPort();
                cleanUpOut.setNextTaskId(cleanUpTask.getTaskId());
                if (finalTask != null) {
                    finalTask.addOutPort(cleanUpOut);
                } else {
                    dataParsingTask.addOutPort(cleanUpOut);
                }
            }

            String[] startTaskIds = {downloadTask.getTaskId()};
            String workflowId = workflowOperator.buildAndRunWorkflow(taskMap, startTaskIds);
            logger.info("Submitted workflow {} to parse resource {} with path {}", workflowId,
                    sourceResourceId, metadata.getResourcePath());
        }
    } finally {
        channel.shutdown();
    }
}

/**
 * Evaluates one selection query as JavaScript with the resource metadata bound to the
 * variable {@code metadata}.
 *
 * <p>A fresh script engine is created per evaluation so that no state is shared between
 * queries. Only a result that is exactly {@link Boolean#TRUE} counts as a match; a
 * {@code null} or non-boolean result is treated as "no match" instead of failing the
 * whole submission.
 *
 * @param selectionQuery script source of the selection query
 * @param metadata       resource metadata exposed to the script
 * @return {@code true} only if the script evaluated to boolean {@code true}
 * @throws ScriptException       if the script cannot be evaluated
 * @throws IllegalStateException if no script engine named "JavaScript" is available
 */
private static boolean evaluateSelectionQuery(String selectionQuery, FileMetadataResponse metadata)
        throws ScriptException {
    ScriptEngine engine = new ScriptEngineManager().getEngineByName("JavaScript");
    if (engine == null) {
        throw new IllegalStateException("No JavaScript script engine is available to evaluate selection queries");
    }
    Bindings bindings = engine.getBindings(ScriptContext.ENGINE_SCOPE);
    // NOTE(review): these look like GraalVM JavaScript options that open host access and host
    // class lookup to the script, so selection queries must come from a trusted source - confirm.
    bindings.put("polyglot.js.allowHostAccess", true);
    bindings.put("polyglot.js.allowHostClassLookup", (Predicate<String>) s -> true);
    bindings.put("metadata", metadata);
    return Boolean.TRUE.equals(engine.eval(selectionQuery));
}