in data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java [178:358]
/**
 * Processes the notification held by this processor.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>Registers a {@code DATA_ORCH_RECEIVED} status for the notification.</li>
 *   <li>Rejects the notification unless its resource type is {@code "FOLDER"} and the path
 *       below the base path has at least two segments; rejected notifications are recorded
 *       through {@code drmsConnector.createUnverifiedResource}.</li>
 *   <li>Verifies the admin user (first path segment) and the owner (the part of the second
 *       segment before the first {@code '_'}) via {@code verifyUser}.</li>
 *   <li>Resolves the active transfer mapping and the source/destination storage preferences
 *       for the admin user.</li>
 *   <li>Creates the source resources, shares them, scans for child resources, creates the
 *       destination resource and invokes the workflow.</li>
 *   <li>Registers a {@code DISPATCHED_TO_WORFLOW_ENGING} status on success.</li>
 * </ol>
 *
 * <p>Any exception raised while processing is logged and reported as an {@code ERRORED}
 * status. The event cache entry keyed by {@code resourcePath + ":" + hostName} is removed
 * in every case.
 */
public void run() {
    logger.info("Processing resource path {} on storage {}", notification.getResourcePath(),
            notification.getBasePath());
    long start = System.currentTimeMillis();
    // Handed to every createResourceWithParentDirectories / scanResourceForChildResources call below.
    Map<String, GenericResource> resourceCache = new HashMap<>();
    try {
        this.notificationClient.get().registerNotificationStatus(NotificationStatusRegisterRequest.newBuilder()
                .setStatus(NotificationStatus.newBuilder()
                        .setStatusId(UUID.randomUUID().toString())
                        .setNotificationId(notification.getNotificationId())
                        .setStatus(NotificationStatus.StatusType.DATA_ORCH_RECEIVED)
                        .setDescription("Notification Received")
                        .setPublishedTime(System.currentTimeMillis())
                        .build()).build());

        if (!"FOLDER".equals(notification.getResourceType())) {
            logger.error("Resource {} should be a Folder type but got {}",
                    notification.getResourcePath(),
                    notification.getResourceType());
            logger.error("Resource should be a Folder type");
            this.drmsConnector.createUnverifiedResource(notification.getAuthToken(), notification.getTenantId(),
                    notification.getNotificationId(), notification.getResourcePath(), notification.getResourceType(),
                    Constants.ERROR_CODE_INVALID_TYPE, " Invalid type :" + notification.getResourceType(), null);
            throw new Exception("Resource should be a Folder type");
        }

        // Path relative to the base path; segment 0 names the admin user, segment 1 starts with the owner.
        String removeBasePath = notification.getResourcePath().substring(notification.getBasePath().length());
        String[] splitted = removeBasePath.split("/");

        if (splitted.length < 2) {
            logger.error("Invalid path. Need at least two folder levels from base. {}", removeBasePath);
            this.drmsConnector.createUnverifiedResource(notification.getAuthToken(), notification.getTenantId(),
                    notification.getNotificationId(), notification.getResourcePath(), notification.getResourceType(),
                    Constants.ERROR_CODE_INVALID_PATH, " Invalid path ", null);
            throw new Exception("Invalid path. Need at least two folder levels from base");
        }

        Optional<String> adminUserOp = verifyUser(splitted[0]);
        Optional<String> ownerOp = verifyUser(splitted[1].split("_")[0]);

        if (adminUserOp.isEmpty()) {
            this.drmsConnector.createUnverifiedResource(notification.getAuthToken(), notification.getTenantId(),
                    notification.getNotificationId(), notification.getResourcePath(), notification.getResourceType(),
                    Constants.ERROR_CODE_INVALID_USERNAME, " User not verified ", splitted[0]);
            logger.error("Invalid user. User should be verified users {} . {}", splitted[0], removeBasePath);
            throw new Exception("Invalid user. User " + splitted[0] + " should be registered users");
        }

        if (ownerOp.isEmpty()) {
            this.drmsConnector.createUnverifiedResource(notification.getAuthToken(), notification.getTenantId(),
                    notification.getNotificationId(), notification.getResourcePath(), notification.getResourceType(),
                    Constants.ERROR_CODE_INVALID_USERNAME, " User not verified ", splitted[1]);
            logger.error("Invalid user. User should be verified users {} . {}", splitted[1], removeBasePath);
            throw new Exception("Invalid user. User " + splitted[1] + " should be registered users");
        }

        String adminUser = adminUserOp.get();
        String owner = ownerOp.get();

        Optional<TransferMapping> optionalTransferMapping = drmsConnector.getActiveTransferMapping(
                notification.getAuthToken(),
                notification.getTenantId(), adminUser,
                notification.getHostName());

        if (optionalTransferMapping.isEmpty()) {
            logger.error("Could not find a transfer mapping for user {} and host {}", adminUser, notification.getHostName());
            throw new Exception("Could not find a transfer mapping");
        }

        TransferMapping transferMapping = optionalTransferMapping.get();

        String sourceStorageId = transferMapping.getSourceStorage().getSshStorage().getStorageId();
        String sourceHostName = transferMapping.getSourceStorage().getSshStorage().getHostName();

        String destinationStorageId = transferMapping.getDestinationStorage().getSshStorage().getStorageId();
        String destinationHostName = transferMapping.getDestinationStorage().getSshStorage().getHostName();

        // Creating parent resource
        List<GenericResource> resourceList = createResourceWithParentDirectories(sourceHostName, sourceStorageId,
                notification.getBasePath(),
                notification.getResourcePath(),
                "COLLECTION", adminUser, resourceCache);

        // The last list element is shared with the owner, the first with the tenant admin group.
        shareResourcesWithUsers(Collections.singletonList(resourceList.get(resourceList.size() - 1)),
                adminUser, owner, "VIEWER");

        shareResourcesWithGroups(Collections.singletonList(resourceList.get(0)), adminUser,
                configuration.getTenantConfigs().getAdminGroup(),
                "EDITOR");

        GenericResource resourceObj = resourceList.get(resourceList.size() - 1);

        Optional<AnyStoragePreference> sourceSPOp = this.drmsConnector.getStoragePreference(
                notification.getAuthToken(), adminUser,
                notification.getTenantId(), sourceStorageId);

        if (sourceSPOp.isEmpty()) {
            logger.error("No storage preference found for source storage {} and user {}", sourceStorageId, adminUser);
            throw new Exception("No storage preference found for source storage");
        }

        Optional<AnyStoragePreference> destSPOp = this.drmsConnector.getStoragePreference(
                notification.getAuthToken(), adminUser,
                notification.getTenantId(), destinationStorageId);

        if (destSPOp.isEmpty()) {
            // Report the destination storage id here; the lookup above was for the destination.
            logger.error("No storage preference found for destination storage {} and user {}", destinationStorageId, adminUser);
            throw new Exception("No storage preference found for destination storage");
        }

        AnyStoragePreference sourceSP = sourceSPOp.get();
        AnyStoragePreference destSP = destSPOp.get();

        // The auth token is Base64 of "<clientId>:<clientSecret>"; decode with an explicit charset
        // so the result does not depend on the platform default.
        String decodedAuth = new String(Base64.getDecoder().decode(notification.getAuthToken()),
                java.nio.charset.StandardCharsets.UTF_8);
        String[] authParts = decodedAuth.split(":");
        if (authParts.length != 2) {
            throw new Exception("Could not decode auth token to work with MFT");
        }

        DelegateAuth delegateAuth = DelegateAuth.newBuilder()
                .setUserId(adminUser)
                .setClientId(authParts[0])
                .setClientSecret(authParts[1])
                .putProperties("TENANT_ID", notification.getTenantId()).build();

        AuthToken mftAuth = AuthToken.newBuilder().setDelegateAuth(delegateAuth).build();

        // Passed to scanResourceForChildResources and then forwarded to the workflow invocation.
        List<String> resourceIDsToProcess = new ArrayList<>();

        // Fetching file list for parent resource
        // NOTE(review): the trailing 4 is presumably a scan depth limit — confirm against the helper.
        scanResourceForChildResources(resourceObj, mftAuth, sourceSP, sourceStorageId, sourceHostName,
                adminUser, resourceIDsToProcess, resourceCache, 4);

        logger.info("Creating destination zip resource for directory {}", notification.getResourcePath());
        resourceList = createResourceWithParentDirectories(destinationHostName, destinationStorageId, notification.getBasePath(),
                notification.getResourcePath(), "FILE", adminUser, resourceCache);

        GenericResource destinationResource = resourceList.get(resourceList.size() - 1);

        logger.info("Submitting resources to workflow manager");
        this.workflowServiceConnector.invokeWorkflow(notification.getAuthToken(), adminUser,
                notification.getTenantId(), resourceIDsToProcess, sourceSP.getSshStoragePreference().getStoragePreferenceId(),
                destinationResource.getResourceId(), destSP.getSshStoragePreference().getStoragePreferenceId());

        this.notificationClient.get().registerNotificationStatus(NotificationStatusRegisterRequest.newBuilder()
                .setStatus(NotificationStatus.newBuilder()
                        .setStatusId(UUID.randomUUID().toString())
                        .setNotificationId(notification.getNotificationId())
                        .setStatus(NotificationStatus.StatusType.DISPATCHED_TO_WORFLOW_ENGING)
                        .setDescription("Notification successfully processed at the orchestrator. " +
                                "Sending to workflow manager")
                        .setPublishedTime(System.currentTimeMillis())
                        .build()).build());

        logger.info("Completed processing path {}. Time taken {} ms",
                notification.getResourcePath(), System.currentTimeMillis() - start);
    } catch (Exception e) {
        logger.error("Failed to process event for resource path {}", notification.getResourcePath(), e);
        try {
            this.notificationClient.get().registerNotificationStatus(NotificationStatusRegisterRequest.newBuilder()
                    .setStatus(NotificationStatus.newBuilder()
                            .setStatusId(UUID.randomUUID().toString())
                            .setNotificationId(notification.getNotificationId())
                            .setStatus(NotificationStatus.StatusType.ERRORED)
                            .setDescription("Notification failed due to : " + e.getMessage())
                            .setPublishedTime(System.currentTimeMillis())
                            .build()).build());
        } catch (Exception statusError) {
            // Reporting the failure must not raise a second exception out of run();
            // the original failure has already been logged above.
            logger.error("Failed to register ERRORED status for notification {}",
                    notification.getNotificationId(), statusError);
        }
    } finally {
        this.eventCache.remove(notification.getResourcePath() + ":" + notification.getHostName());
    }
}