in data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java [360:423]
/**
 * Lists the direct children of {@code resourceObj} through the MFT API, registers each child
 * file and directory, and descends into child directories while {@code scanDepth} allows.
 *
 * <p>Every file found is registered with type {@code "FILE"} and its resource id is appended to
 * {@code resourceIDsToProcess}. Every directory found is registered with type
 * {@code "COLLECTION"}; directory ids are not appended to {@code resourceIDsToProcess}.
 *
 * @param resourceObj          resource whose id is sent to MFT as the directory to list
 * @param mftAuth              authorization token attached to the MFT metadata request
 * @param sourceSP             storage preference; its storage case selects the MFT resource type
 *                             ("SCP" or "S3") and the resource token
 * @param sourceStorageId      storage id passed through to resource creation and used in log output
 * @param sourceHostName       host name passed through to resource creation
 * @param adminUser            user passed through to resource creation
 * @param resourceIDsToProcess output list; receives the resource id of every registered file
 * @param resourceCache        cache handed to {@code createResourceWithParentDirectories}
 * @param scanDepth            remaining recursion budget; child directories are scanned only while
 *                             this is greater than zero, and each level is called with
 *                             {@code scanDepth - 1}
 * @throws Exception if the MFT metadata call fails (logged, then rethrown unchanged) or if
 *                   resource registration throws
 */
private void scanResourceForChildResources(GenericResource resourceObj, AuthToken mftAuth, AnyStoragePreference sourceSP,
                                           String sourceStorageId, String sourceHostName, String adminUser,
                                           List<String> resourceIDsToProcess, Map<String, GenericResource> resourceCache,
                                           int scanDepth)
        throws Exception {

    FetchResourceMetadataRequest.Builder resourceMetadataReq = FetchResourceMetadataRequest.newBuilder()
            .setMftAuthorizationToken(mftAuth)
            .setResourceId(resourceObj.getResourceId());

    switch (sourceSP.getStorageCase()) {
        case SSH_STORAGE_PREFERENCE:
            resourceMetadataReq.setResourceType("SCP");
            resourceMetadataReq.setResourceToken(sourceSP.getSshStoragePreference().getStoragePreferenceId());
            break;
        case S3_STORAGE_PREFERENCE:
            resourceMetadataReq.setResourceType("S3");
            resourceMetadataReq.setResourceToken(sourceSP.getS3StoragePreference().getStoragePreferenceId());
            break;
        default:
            // Control flow is unchanged from the original (the request is still sent without a
            // resource type/token); the warning only makes the unsupported case visible.
            logger.warn("Unsupported storage preference type {} for resource {}; "
                            + "metadata request will carry no resource type or token",
                    sourceSP.getStorageCase(), resourceObj.getResourceId());
            break;
    }

    DirectoryMetadataResponse directoryResourceMetadata;
    // A client is opened per call and closed by try-with-resources once the listing is fetched.
    try (MFTApiClient mftApiClient = new MFTApiClient(
            this.configuration.getOutboundEventProcessor().getMftHost(),
            this.configuration.getOutboundEventProcessor().getMftPort())) {
        MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClientStub = mftApiClient.get();
        directoryResourceMetadata = mftClientStub.getDirectoryResourceMetadata(resourceMetadataReq.build());
    } catch (Exception e) {
        logger.error("Failed to fetch dir metadata for resource {} with path {}",
                resourceObj.getResourceId(), resourceObj.getResourcePath(), e);
        throw e;
    }

    for (FileMetadataResponse fileMetadata : directoryResourceMetadata.getFilesList()) {
        logger.info("Registering file {} for source storage {}", fileMetadata.getResourcePath(), sourceStorageId);
        long start = System.currentTimeMillis();

        List<GenericResource> resourceList = createResourceWithParentDirectories(sourceHostName, sourceStorageId,
                notification.getBasePath(), fileMetadata.getResourcePath(), "FILE", adminUser, resourceCache);
        // The last element of the returned list is taken as the resource for the requested path.
        GenericResource fileResource = resourceList.get(resourceList.size() - 1);
        resourceIDsToProcess.add(fileResource.getResourceId());

        logger.info("Completed registering the file {} for source storage {}. Time taken {} ms",
                fileMetadata.getResourcePath(), sourceStorageId, System.currentTimeMillis() - start);
    }

    for (DirectoryMetadataResponse directoryMetadata : directoryResourceMetadata.getDirectoriesList()) {
        logger.info("Registering directory {} for source storage {}", directoryMetadata.getResourcePath(), sourceStorageId);
        long start = System.currentTimeMillis();

        List<GenericResource> createResources = createResourceWithParentDirectories(sourceHostName, sourceStorageId,
                notification.getBasePath(), directoryMetadata.getResourcePath(), "COLLECTION", adminUser, resourceCache);
        GenericResource dirResource = createResources.get(createResources.size() - 1);

        if (scanDepth > 0) {
            // Scanning the directories recursively, spending one level of the depth budget.
            scanResourceForChildResources(dirResource, mftAuth, sourceSP, sourceStorageId, sourceHostName, adminUser,
                    resourceIDsToProcess, resourceCache, scanDepth - 1);
        }

        // Elapsed time includes the recursive scan of this directory's subtree.
        // Fixed: sourceStorageId was previously passed twice, so the "Time taken" placeholder
        // printed the storage id and the elapsed time was dropped.
        logger.info("Completed registering directory {} for source storage {}. Time taken {} ms",
                directoryMetadata.getResourcePath(), sourceStorageId, System.currentTimeMillis() - start);
    }
}