public void run()

in data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java [178:358]


    /**
     * Processes the notification held by this processor.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>registers a {@code DATA_ORCH_RECEIVED} status for the notification;</li>
     *   <li>rejects the notification unless its resource type is {@code "FOLDER"} and the path below the
     *       base path has at least two segments whose users pass {@code verifyUser}; every rejection is
     *       also recorded through {@code drmsConnector.createUnverifiedResource};</li>
     *   <li>resolves the active transfer mapping and the source/destination storage preferences;</li>
     *   <li>creates the source resource hierarchy, shares it, and scans it for child resources;</li>
     *   <li>creates the destination resource and invokes the workflow with the collected resource ids;</li>
     *   <li>registers a {@code DISPATCHED_TO_WORFLOW_ENGING} status.</li>
     * </ol>
     *
     * <p>Any exception raised along the way is logged and reported as an {@code ERRORED} status instead of
     * being propagated. The event cache entry keyed by {@code resourcePath + ":" + hostName} is removed
     * whether processing succeeds or fails.
     */
    public void run() {
        logger.info("Processing resource path {} on storage {}", notification.getResourcePath(),
                notification.getBasePath());

        long start = System.currentTimeMillis();

        Map<String, GenericResource> resourceCache = new HashMap<>();
        try {

            publishNotificationStatus(NotificationStatus.StatusType.DATA_ORCH_RECEIVED, "Notification Received");

            if (!"FOLDER".equals(notification.getResourceType())) {
                logger.error("Resource {} should be a Folder type but got {}",
                        notification.getResourcePath(),
                        notification.getResourceType());
                this.drmsConnector.createUnverifiedResource(notification.getAuthToken(), notification.getTenantId(),
                        notification.getNotificationId(), notification.getResourcePath(), notification.getResourceType(),
                        Constants.ERROR_CODE_INVALID_TYPE, " Invalid  type :" + notification.getResourceType(), null);
                throw new Exception("Resource should be a Folder type");
            }

            // Path relative to the storage base; its first two segments carry the admin user and the owner.
            String removeBasePath = notification.getResourcePath().substring(notification.getBasePath().length());
            String[] splitted = removeBasePath.split("/");

            if (splitted.length < 2) {
                logger.error("Invalid path. Need at least two folder levels from base. {}", removeBasePath);
                this.drmsConnector.createUnverifiedResource(notification.getAuthToken(), notification.getTenantId(),
                        notification.getNotificationId(), notification.getResourcePath(), notification.getResourceType(),
                        Constants.ERROR_CODE_INVALID_PATH, " Invalid  path ", null);
                throw new Exception("Invalid path. Need at least two folder levels from base");
            }

            Optional<String> adminUserOp = verifyUser(splitted[0]);
            // Only the part of the second segment before the first '_' is checked as the owner's user name.
            Optional<String> ownerOp = verifyUser(splitted[1].split("_")[0]);
            if (adminUserOp.isEmpty()) {
                this.drmsConnector.createUnverifiedResource(notification.getAuthToken(), notification.getTenantId(),
                        notification.getNotificationId(), notification.getResourcePath(), notification.getResourceType(),
                        Constants.ERROR_CODE_INVALID_USERNAME, " User not verified ", splitted[0]);
                logger.error("Invalid user. User should be verified users {} . {}", splitted[0], removeBasePath);
                throw new Exception("Invalid user. User " + splitted[0] + " should be registered users");
            }

            if (ownerOp.isEmpty()) {
                this.drmsConnector.createUnverifiedResource(notification.getAuthToken(), notification.getTenantId(),
                        notification.getNotificationId(), notification.getResourcePath(), notification.getResourceType(),
                        Constants.ERROR_CODE_INVALID_USERNAME, " User not verified ", splitted[1]);
                logger.error("Invalid user. User should be verified users {} . {}", splitted[1], removeBasePath);
                throw new Exception("Invalid user. User " + splitted[1] + " should be registered users");
            }

            String adminUser = adminUserOp.get();
            String owner = ownerOp.get();

            Optional<TransferMapping> optionalTransferMapping = drmsConnector.getActiveTransferMapping(
                    notification.getAuthToken(),
                    notification.getTenantId(), adminUser,
                    notification.getHostName());

            if (optionalTransferMapping.isEmpty()) {
                logger.error("Could not find a transfer mapping for user {} and host {}", adminUser, notification.getHostName());
                throw new Exception("Could not find a transfer mapping");
            }

            TransferMapping transferMapping = optionalTransferMapping.get();

            String sourceStorageId = transferMapping.getSourceStorage().getSshStorage().getStorageId();
            String sourceHostName = transferMapping.getSourceStorage().getSshStorage().getHostName();
            String destinationStorageId = transferMapping.getDestinationStorage().getSshStorage().getStorageId();
            String destinationHostName = transferMapping.getDestinationStorage().getSshStorage().getHostName();

            // Creating parent resource
            List<GenericResource> resourceList = createResourceWithParentDirectories(sourceHostName, sourceStorageId,
                    notification.getBasePath(),
                    notification.getResourcePath(),
                    "COLLECTION", adminUser, resourceCache);

            // The last entry is the resource for the notified path itself; the first is the top-most one created.
            GenericResource resourceObj = resourceList.get(resourceList.size() - 1);

            shareResourcesWithUsers(Collections.singletonList(resourceObj), adminUser, owner, "VIEWER");

            shareResourcesWithGroups(Collections.singletonList(resourceList.get(0)), adminUser,
                    configuration.getTenantConfigs().getAdminGroup(),
                    "EDITOR");

            Optional<AnyStoragePreference> sourceSPOp = this.drmsConnector.getStoragePreference(
                    notification.getAuthToken(), adminUser,
                    notification.getTenantId(), sourceStorageId);

            if (sourceSPOp.isEmpty()) {
                logger.error("No storage preference found for source storage {} and user {}", sourceStorageId, adminUser);
                throw new Exception("No storage preference found for source storage");
            }

            Optional<AnyStoragePreference> destSPOp = this.drmsConnector.getStoragePreference(
                    notification.getAuthToken(), adminUser,
                    notification.getTenantId(), destinationStorageId);

            if (destSPOp.isEmpty()) {
                // Report the destination storage id here (the source id was logged previously by mistake).
                logger.error("No storage preference found for destination storage {} and user {}",
                        destinationStorageId, adminUser);
                throw new Exception("No storage preference found for destination storage");
            }

            AnyStoragePreference sourceSP = sourceSPOp.get();
            AnyStoragePreference destSP = destSPOp.get();

            // Decode with an explicit charset so the result does not depend on the JVM's platform default.
            String decodedAuth = new String(Base64.getDecoder().decode(notification.getAuthToken()),
                    java.nio.charset.StandardCharsets.UTF_8);
            // NOTE(review): a token whose secret itself contains ':' yields more than two parts and is rejected.
            String[] authParts = decodedAuth.split(":");

            if (authParts.length != 2) {
                throw new Exception("Could not decode auth token to work with MFT");
            }

            DelegateAuth delegateAuth = DelegateAuth.newBuilder()
                    .setUserId(adminUser)
                    .setClientId(authParts[0])
                    .setClientSecret(authParts[1])
                    .putProperties("TENANT_ID", notification.getTenantId()).build();

            AuthToken mftAuth = AuthToken.newBuilder().setDelegateAuth(delegateAuth).build();
            List<String> resourceIDsToProcess = new ArrayList<>();

            // Fetching file list for parent resource
            // NOTE(review): the trailing 4 is presumably a scan depth limit — confirm against
            // scanResourceForChildResources.
            scanResourceForChildResources(resourceObj, mftAuth, sourceSP, sourceStorageId, sourceHostName,
                    adminUser, resourceIDsToProcess, resourceCache, 4);

            logger.info("Creating destination zip resource for directory {}", notification.getResourcePath());
            resourceList = createResourceWithParentDirectories(destinationHostName, destinationStorageId, notification.getBasePath(),
                    notification.getResourcePath(), "FILE", adminUser, resourceCache);

            GenericResource destinationResource = resourceList.get(resourceList.size() - 1);

            logger.info("Submitting resources to workflow manager");
            this.workflowServiceConnector.invokeWorkflow(notification.getAuthToken(), adminUser,
                    notification.getTenantId(), resourceIDsToProcess, sourceSP.getSshStoragePreference().getStoragePreferenceId(),
                    destinationResource.getResourceId(), destSP.getSshStoragePreference().getStoragePreferenceId());

            publishNotificationStatus(NotificationStatus.StatusType.DISPATCHED_TO_WORFLOW_ENGING,
                    "Notification successfully processed at the orchestrator. " +
                            "Sending to workflow manager");

            logger.info("Completed processing path {}. Time taken {} ms",
                    notification.getResourcePath(), System.currentTimeMillis() - start);

        } catch (Exception e) {
            logger.error("Failed to process event for resource path {}", notification.getResourcePath(), e);
            try {
                publishNotificationStatus(NotificationStatus.StatusType.ERRORED,
                        "Notification failed due to : " + e.getMessage());
            } catch (RuntimeException statusError) {
                // The original failure is already logged above; a failure while reporting it must not
                // escape run() and hide that first error.
                logger.error("Failed to register ERRORED status for notification {}",
                        notification.getNotificationId(), statusError);
            }
        } finally {
            this.eventCache.remove(notification.getResourcePath() + ":" + notification.getHostName());
        }
    }

    /**
     * Registers a new status entry for the current notification through the notification client.
     *
     * @param statusType  status value to record
     * @param description human readable description stored with the status
     */
    private void publishNotificationStatus(NotificationStatus.StatusType statusType, String description) {
        this.notificationClient.get().registerNotificationStatus(NotificationStatusRegisterRequest.newBuilder()
                .setStatus(NotificationStatus.newBuilder()
                        .setStatusId(UUID.randomUUID().toString())
                        .setNotificationId(notification.getNotificationId())
                        .setStatus(statusType)
                        .setDescription(description)
                        .setPublishedTime(System.currentTimeMillis())
                        .build()).build());
    }