public void submitDataParsingWorkflow()

in data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataParsingWorkflowManager.java [106:258]


    /**
     * Builds and submits one data parsing workflow per source resource listed in the request.
     *
     * <p>For each source resource id the file metadata is fetched from MFT, the registered parsing
     * jobs are listed from the data parser service, and the jobs whose input selection queries all
     * evaluate to {@code true} against that metadata are selected. The submitted workflow consists of
     * a download task, one parsing task per selected parser, a metadata persist task per "JSON"
     * output of that parser, and a single clean-up task. Resources with no matching parsing job are
     * skipped with a warning.
     *
     * @param request workflow invocation request carrying the workflow message
     * @throws IllegalStateException if no "JavaScript" script engine is available
     * @throws Exception if a remote call or the workflow submission fails
     */
    public void submitDataParsingWorkflow(WorkflowInvocationRequest request) throws Exception {

        WorkflowMessage workflowMessage = request.getMessage();

        // NOTE(review): the parser service address is hard-coded here, while the parsing tasks below
        // are pointed at orchHost/orchPort - confirm whether the configured values should be used.
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 6566).usePlaintext().build();
        try {
            DataParserServiceGrpc.DataParserServiceBlockingStub parserClient =
                    DataParserServiceGrpc.newBlockingStub(channel);

            for (String sourceResourceId : workflowMessage.getSourceResourceIdsList()) {
                submitParsingWorkflowForResource(workflowMessage, sourceResourceId, parserClient);
            }
        } finally {
            // The channel was previously created per resource and never released.
            channel.shutdown();
        }
    }

    /**
     * Selects the parsing jobs matching a single source resource and submits the resulting workflow.
     */
    private void submitParsingWorkflowForResource(WorkflowMessage workflowMessage, String sourceResourceId,
                                                  DataParserServiceGrpc.DataParserServiceBlockingStub parserClient)
            throws Exception {

        logger.info("Processing parsing workflow for resource {}", sourceResourceId);

        FileMetadataResponse metadata = fetchSourceResourceMetadata(workflowMessage, sourceResourceId);

        ParsingJobListResponse parsingJobs = parserClient.listParsingJobs(ParsingJobListRequest.newBuilder().build());

        String tempDownloadPath = baseWorkingDir + UUID.randomUUID().toString();

        // Both maps are keyed by parser id. When several matching jobs share a parser id, the input
        // mapping of the last job and the outputs of the first job are used.
        Map<String, StringMap> parserInputMappings = new HashMap<>();
        Map<String, DataParsingJob> selectedJobs = new HashMap<>();
        for (DataParsingJob parsingJob : parsingJobs.getParsersList()) {
            StringMap inputMapping = new StringMap();
            boolean match = true;
            for (DataParsingJobInput jobInput : parsingJob.getDataParsingJobInputsList()) {
                if (!matchesSelectionQuery(parsingJob, jobInput, metadata)) {
                    match = false;
                    break;
                }
                // Every input of a selected job reads from the downloaded copy of the resource.
                inputMapping.put(jobInput.getDataParserInputInterfaceId(), tempDownloadPath);
            }

            if (match) {
                parserInputMappings.put(parsingJob.getParserId(), inputMapping);
                selectedJobs.putIfAbsent(parsingJob.getParserId(), parsingJob);
            }
        }

        if (selectedJobs.isEmpty()) {
            logger.warn("No parsing jobs available for resource {} with path {}. So ignoring the workflow",
                    sourceResourceId, metadata.getResourcePath());
            return;
        }

        Map<String, AbstractTask> taskMap = new HashMap<>();

        SyncLocalDataDownloadTask downloadTask = new SyncLocalDataDownloadTask();
        downloadTask.setTaskId("SLDT-" + UUID.randomUUID().toString());
        downloadTask.setMftClientId(mftClientId);
        downloadTask.setMftClientSecret(mftClientSecret);
        downloadTask.setUserId(workflowMessage.getUsername());
        downloadTask.setTenantId(workflowMessage.getTenantId());
        downloadTask.setMftHost(mftHost);
        downloadTask.setMftPort(mftPort);
        downloadTask.setSourceResourceId(sourceResourceId);
        downloadTask.setSourceCredToken(workflowMessage.getSourceCredentialToken());
        downloadTask.setDownloadPath(tempDownloadPath);
        taskMap.put(downloadTask.getTaskId(), downloadTask);

        DataParsingWorkflowResourceCleanUpTask cleanUpTask = new DataParsingWorkflowResourceCleanUpTask();
        cleanUpTask.setDownloadPath(tempDownloadPath);
        cleanUpTask.setTaskId("DPT-" + UUID.randomUUID().toString());
        taskMap.put(cleanUpTask.getTaskId(), cleanUpTask);

        for (Map.Entry<String, StringMap> mapping : parserInputMappings.entrySet()) {
            String parserId = mapping.getKey();

            String parserWorkingDir = baseWorkingDir + UUID.randomUUID();
            GenericDataParsingTask dataParsingTask = new GenericDataParsingTask();
            dataParsingTask.setTaskId("DPT-" + UUID.randomUUID().toString());
            dataParsingTask.setParserId(parserId);
            dataParsingTask.setParserServiceHost(orchHost);
            dataParsingTask.setParserServicePort(orchPort);
            dataParsingTask.setInputMapping(mapping.getValue());
            dataParsingTask.setWorkingDirectory(parserWorkingDir);
            dataParsingTask.setTempDataFile(tempDownloadPath);
            taskMap.put(dataParsingTask.getTaskId(), dataParsingTask);

            // NOTE(review): this is called once per parser on the single clean-up task; unless the
            // setter accumulates values, only the last parser working directory is retained - confirm.
            cleanUpTask.setParsingDir(parserWorkingDir);

            // The download task fans out to every parsing task.
            OutPort downloadOut = new OutPort();
            downloadOut.setNextTaskId(dataParsingTask.getTaskId());
            downloadTask.addOutPort(downloadOut);

            DataParsingJob dataParsingJob = selectedJobs.get(parserId);
            ParserFetchResponse parser = parserClient.fetchParser(
                    ParserFetchRequest.newBuilder().setParserId(parserId).build());

            MetadataPersistTask finalTask = null;
            for (DataParserOutputInterface outputInterface : parser.getParser().getOutputInterfacesList()) {

                Optional<DataParsingJobOutput> jobOutput = dataParsingJob.getDataParsingJobOutputsList().stream()
                        .filter(o -> o.getDataParserOutputInterfaceId()
                                .equals(outputInterface.getParserOutputInterfaceId()))
                        .findFirst();

                // Only outputs declared as JSON by the parsing job are persisted as metadata.
                if (jobOutput.isPresent() && "JSON".equals(jobOutput.get().getOutputType())) {
                    MetadataPersistTask mpt = new MetadataPersistTask();
                    mpt.setTaskId("MPT-" + UUID.randomUUID().toString());
                    mpt.setDrmsHost(drmsHost);
                    mpt.setDrmsPort(drmsPort);
                    mpt.setTenant(workflowMessage.getTenantId());
                    mpt.setUser(workflowMessage.getUsername());
                    mpt.setServiceAccountKey(mftClientId);
                    mpt.setServiceAccountSecret(mftClientSecret);
                    mpt.setResourceId(sourceResourceId);
                    mpt.setJsonFile(parserWorkingDir
                            + File.separator + "outputs" + File.separator + outputInterface.getOutputName());

                    OutPort parsingOut = new OutPort();
                    parsingOut.setNextTaskId(mpt.getTaskId());
                    dataParsingTask.addOutPort(parsingOut);
                    taskMap.put(mpt.getTaskId(), mpt);
                    finalTask = mpt;
                }
            }

            // Chain the clean-up task after the last persist task of this parser, or directly after
            // the parsing task when the parser has no JSON output.
            OutPort cleanUpOut = new OutPort();
            cleanUpOut.setNextTaskId(cleanUpTask.getTaskId());
            if (finalTask != null) {
                finalTask.addOutPort(cleanUpOut);
            } else {
                dataParsingTask.addOutPort(cleanUpOut);
            }
        }

        String[] startTaskIds = {downloadTask.getTaskId()};
        String workflowId = workflowOperator.buildAndRunWorkflow(taskMap, startTaskIds);

        logger.info("Submitted workflow {} to parse resource {} with path {}", workflowId,
                sourceResourceId, metadata.getResourcePath());
    }

    /**
     * Fetches the file metadata of a source resource from MFT using delegated authentication.
     */
    private FileMetadataResponse fetchSourceResourceMetadata(WorkflowMessage workflowMessage,
                                                             String sourceResourceId) throws Exception {
        try (MFTApiClient mftClient = new MFTApiClient(mftHost, mftPort)) {
            MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClientStub = mftClient.get();

            DelegateAuth delegateAuth = DelegateAuth.newBuilder()
                    .setUserId(workflowMessage.getUsername())
                    .setClientId(mftClientId)
                    .setClientSecret(mftClientSecret)
                    .putProperties("TENANT_ID", workflowMessage.getTenantId()).build();

            return mftClientStub.getFileResourceMetadata(FetchResourceMetadataRequest.newBuilder()
                    .setResourceType("SCP")
                    .setResourceId(sourceResourceId)
                    .setResourceToken(workflowMessage.getSourceCredentialToken())
                    .setMftAuthorizationToken(AuthToken.newBuilder().setDelegateAuth(delegateAuth).build()).build());
        }
    }

    /**
     * Evaluates the selection query of a parsing job input with the resource metadata bound as
     * {@code metadata}. Returns {@code true} only when the script yields boolean {@code true};
     * script failures and non-boolean results are logged and treated as "no match".
     */
    private boolean matchesSelectionQuery(DataParsingJob parsingJob, DataParsingJobInput jobInput,
                                          FileMetadataResponse metadata) {
        // A fresh engine per query keeps script state from leaking between evaluations.
        ScriptEngine engine = new ScriptEngineManager().getEngineByName("JavaScript");
        if (engine == null) {
            throw new IllegalStateException("No JavaScript script engine is available to evaluate parsing job "
                    + parsingJob.getDataParsingJobId());
        }

        // NOTE(review): selection queries are evaluated with full host access, so they must only
        // come from a trusted source.
        Bindings bindings = engine.getBindings(ScriptContext.ENGINE_SCOPE);
        bindings.put("polyglot.js.allowHostAccess", true);
        bindings.put("polyglot.js.allowHostClassLookup", (Predicate<String>) s -> true);
        bindings.put("metadata", metadata);

        try {
            Object result = engine.eval(jobInput.getSelectionQuery());
            if (result instanceof Boolean) {
                return (Boolean) result;
            }
            logger.error("Selection query of parsing job {} returned non-boolean result {}",
                    parsingJob.getDataParsingJobId(), result);
            return false;
        } catch (ScriptException e) {
            logger.error("Failed to evaluate parsing job {}", parsingJob.getDataParsingJobId(), e);
            return false;
        }
    }