in agent/service/src/main/java/org/apache/airavata/mft/agent/TransferOrchestrator.java [112:218]
/**
 * Validates a single file transfer, reports its state through {@code updateStatus}, and hands it to
 * the mediator for execution.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>Claims a running slot ({@code totalRunningTransfers}) and gives up a pending slot
 *       ({@code totalPendingTransfers}); the new pending count is published through the Consul client.</li>
 *   <li>Reports {@code STARTING}.</li>
 *   <li>Resolves and initialises the source metadata collector; the source must resolve to a FILE.</li>
 *   <li>Resolves and initialises the destination metadata collector. When {@code overwriteExisting} is
 *       false and the destination already holds a file of the same size, the transfer is reported as
 *       {@code COMPLETED} and nothing is copied.</li>
 *   <li>Builds the source and destination connector configs, reports {@code STARTED}, invokes
 *       {@code createTransferHook} with {@code true}, and calls the mediator.</li>
 * </ol>
 *
 * <p>Any {@link Throwable} raised along the way is logged and reported as {@code FAILED} with the stack
 * trace as description; it is not rethrown.
 *
 * <p>The running slot claimed in step 1 is released exactly once: by the mediator completion callback,
 * or by this method itself when the transfer is skipped or fails before the mediator call returns.
 *
 * @param transferId         identifier of the transfer, used for logging and connector configs
 * @param requestId          not used by this method
 * @param sourceStorage      storage descriptor of the source endpoint
 * @param destStorage        storage descriptor of the destination endpoint
 * @param sourceSecret       credentials for the source endpoint
 * @param destSecret         credentials for the destination endpoint
 * @param endpointPath       source and destination paths of the file to transfer
 * @param transportCache     cache passed to the metadata collector resolver and to the mediator
 * @param updateStatus       receives every state change of this transfer
 * @param createTransferHook invoked with {@code true} right before the transfer is handed to the
 *                           mediator and with {@code false} from the mediator completion callback
 */
public void processTransfer(String transferId, String requestId, StorageWrapper sourceStorage, StorageWrapper destStorage,
                            SecretWrapper sourceSecret, SecretWrapper destSecret, EndpointPaths endpointPath,
                            TransportClassLoaderCache transportCache,
                            BiConsumer<EndpointPaths, TransferState> updateStatus,
                            BiConsumer<EndpointPaths, Boolean> createTransferHook) {

    // Ensures the running slot claimed below is given back exactly once, whichever path finishes the transfer.
    final java.util.concurrent.atomic.AtomicBoolean runningSlotReleased =
            new java.util.concurrent.atomic.AtomicBoolean(false);
    // True once the mediator call returned normally; from then on the completion callback owns the slot.
    boolean handedOverToMediator = false;

    long running = totalRunningTransfers.incrementAndGet();
    try {
        long pending = totalPendingTransfers.decrementAndGet();
        mftConsulClient.updateAgentPendingTransferCount(agentId, pending);
        logger.info("Received request {}. Total Running {}. Total Pending {}", transferId, running, pending);

        updateStatus.accept(endpointPath, new TransferState()
                .setState("STARTING")
                .setPercentage(0)
                .setUpdateTimeMils(System.currentTimeMillis())
                .setDescription("Starting the transfer"));

        Optional<MetadataCollector> srcMetadataCollectorOp = MetadataCollectorResolver
                .resolveMetadataCollector(sourceStorage.getStorageCase().name(), transportCache);
        MetadataCollector srcMetadataCollector = srcMetadataCollectorOp
                .orElseThrow(() -> new Exception("Could not find a metadata collector for source"));
        srcMetadataCollector.init(sourceStorage, sourceSecret);

        // Only single files are handled here; any other metadata case is rejected.
        ResourceMetadata srcMetadata = srcMetadataCollector.getResourceMetadata(endpointPath.getSourcePath(), false);
        if (srcMetadata.getMetadataCase() != ResourceMetadata.MetadataCase.FILE) {
            throw new Exception("Expected a file as the source but received " + srcMetadata.getMetadataCase().name());
        }

        Optional<MetadataCollector> dstMetadataCollectorOp = MetadataCollectorResolver
                .resolveMetadataCollector(destStorage.getStorageCase().name(), transportCache);
        MetadataCollector dstMetadataCollector = dstMetadataCollectorOp
                .orElseThrow(() -> new Exception("Could not find a metadata collector for destination"));
        dstMetadataCollector.init(destStorage, destSecret);

        // Without overwrite, a destination file of identical size is treated as already transferred.
        if (!overwriteExisting && dstMetadataCollector.isAvailable(endpointPath.getDestinationPath())) {
            ResourceMetadata destinationMetadata =
                    dstMetadataCollector.getResourceMetadata(endpointPath.getDestinationPath(), false);
            if (destinationMetadata.getMetadataCase() == ResourceMetadata.MetadataCase.FILE &&
                    destinationMetadata.getFile().getResourceSize() == srcMetadata.getFile().getResourceSize()) {
                logger.info("Ignoring the transfer of file {} as it is available in the destination", endpointPath.getSourcePath());
                updateStatus.accept(endpointPath, new TransferState()
                        .setPercentage(100)
                        .setState("COMPLETED")
                        .setUpdateTimeMils(System.currentTimeMillis())
                        .setDescription("Ignoring transfer as the file is available in destination"));
                // The running slot is released by the finally block below.
                return;
            }
        }

        ConnectorConfig srcCC = ConnectorConfig.ConnectorConfigBuilder.newBuilder()
                .withTransferId(transferId)
                .withSecret(sourceSecret)
                .withStorage(sourceStorage)
                .withResourcePath(endpointPath.getSourcePath())
                .withChunkSize(chunkedSize)
                .withTransportConfig(transportConfig.getTransport())
                .withMetadata(srcMetadata).build();

        // The destination config carries the source metadata as well.
        ConnectorConfig dstCC = ConnectorConfig.ConnectorConfigBuilder.newBuilder()
                .withTransferId(transferId)
                .withStorage(destStorage)
                .withSecret(destSecret)
                .withResourcePath(endpointPath.getDestinationPath())
                .withChunkSize(chunkedSize)
                .withTransportConfig(transportConfig.getTransport())
                .withMetadata(srcMetadata).build();

        updateStatus.accept(endpointPath, new TransferState()
                .setState("STARTED")
                .setPercentage(0)
                .setUpdateTimeMils(System.currentTimeMillis())
                .setDescription("Started the transfer"));

        // Save transfer metadata in scheduled path to recover in case of an Agent failures. Recovery is done from controller
        createTransferHook.accept(endpointPath, true);

        mediator.transferSingleThread(transferId, srcCC, dstCC, transportCache, updateStatus,
                (id, transferSuccess) -> {
                    try {
                        // Delete scheduled key as the transfer completed / failed if it was placed in current session
                        createTransferHook.accept(endpointPath, false);
                    } catch (Exception e) {
                        logger.error("Failed while deleting scheduled path for transfer {}", id, e);
                    } finally {
                        // Release the slot even when the hook cleanup failed, otherwise the counter drifts upwards.
                        if (runningSlotReleased.compareAndSet(false, true)) {
                            long runningAfter = totalRunningTransfers.decrementAndGet();
                            logger.info("Removed transfer {} from queue with transfer success = {}. Total running {}",
                                    id, transferSuccess, runningAfter);
                        }
                    }
                });
        handedOverToMediator = true;

    } catch (Throwable e) {
        logger.error("Error in submitting transfer {}", transferId, e);
        // NOTE(review): if the mediator call itself threw before invoking the completion callback, the
        // scheduled entry created by createTransferHook(true) is left in place here - confirm whether the
        // controller is expected to clean it up.
        updateStatus.accept(endpointPath, new TransferState()
                .setState("FAILED")
                .setPercentage(0)
                .setUpdateTimeMils(System.currentTimeMillis())
                .setDescription(ExceptionUtils.getStackTrace(e)));
    } finally {
        // Skipped transfers and failures before the mediator call returned never reach the completion
        // callback, so the running slot has to be given back here.
        if (!handedOverToMediator && runningSlotReleased.compareAndSet(false, true)) {
            long runningAfter = totalRunningTransfers.decrementAndGet();
            logger.info("Released running slot of transfer {} without handing it to the mediator. Total running {}",
                    transferId, runningAfter);
        }
        //logger.info("Deleting key " + consulEntryKey);
        //mftConsulClient.getKvClient().deleteKey(consulEntryKey); // Due to bug in consul https://github.com/hashicorp/consul/issues/571
    }
}