public void processTransfer()

in agent/service/src/main/java/org/apache/airavata/mft/agent/TransferOrchestrator.java [112:218]


    /**
     * Prepares and launches the transfer of a single file from the source endpoint to the destination endpoint.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Moves one transfer from the pending counter to the running counter and publishes the new pending
     *       count through {@code mftConsulClient}.</li>
     *   <li>Publishes a {@code STARTING} state.</li>
     *   <li>Resolves and initialises the source metadata collector and verifies that the source path is a file.</li>
     *   <li>Resolves and initialises the destination metadata collector. When {@code overwriteExisting} is
     *       {@code false} and the destination already holds a file of the same size, publishes {@code COMPLETED}
     *       and returns without transferring anything.</li>
     *   <li>Builds the source and destination connector configurations, publishes {@code STARTED}, registers the
     *       transfer through {@code createTransferHook} and delegates to {@code mediator.transferSingleThread}.</li>
     * </ol>
     *
     * <p>Any {@link Throwable} raised while doing so is logged and reported as a {@code FAILED} state whose
     * description is the stack trace. The running-transfer counter is released exactly once per invocation:
     * either by the mediator's completion callback, or by this method when the transfer is skipped or fails
     * before the mediator takes it over.
     *
     * @param transferId         identifier of the transfer; used for logging and passed to the connector configs
     * @param requestId          identifier of the originating request (not used by this method)
     * @param sourceStorage      storage description of the source endpoint
     * @param destStorage        storage description of the destination endpoint
     * @param sourceSecret       credentials for the source storage
     * @param destSecret         credentials for the destination storage
     * @param endpointPath       source and destination paths of the file to transfer
     * @param transportCache     class loader cache used to resolve metadata collectors and passed to the mediator
     * @param updateStatus       callback receiving every state change of this transfer
     * @param createTransferHook callback invoked with {@code true} right before the transfer is handed to the
     *                           mediator and with {@code false} once the transfer has completed or failed
     */
    public void processTransfer(String transferId, String requestId, StorageWrapper sourceStorage, StorageWrapper destStorage,
                                SecretWrapper sourceSecret,SecretWrapper destSecret, EndpointPaths endpointPath,
                                TransportClassLoaderCache transportCache,
                                BiConsumer<EndpointPaths, TransferState> updateStatus,
                                BiConsumer<EndpointPaths, Boolean> createTransferHook) {

        // Ensures that the running slot taken below is given back exactly once, no matter whether the mediator
        // callback or this method's finally block gets there first (the callback may run on another thread).
        // Fully qualified on purpose so that no additional import is required.
        final java.util.concurrent.atomic.AtomicBoolean slotReleased =
                new java.util.concurrent.atomic.AtomicBoolean(false);
        boolean slotAcquired = false;
        boolean hookCreated = false;
        boolean handedOff = false;

        try {

            long running = totalRunningTransfers.incrementAndGet();
            slotAcquired = true;
            long pending = totalPendingTransfers.decrementAndGet();
            mftConsulClient.updateAgentPendingTransferCount(agentId, pending);

            logger.info("Received request {}. Total Running {}. Total Pending {}", transferId, running, pending);

            updateStatus.accept(endpointPath, new TransferState()
                    .setState("STARTING")
                    .setPercentage(0)
                    .setUpdateTimeMils(System.currentTimeMillis())
                    .setDescription("Starting the transfer"));

            Optional<MetadataCollector> srcMetadataCollectorOp = MetadataCollectorResolver
                    .resolveMetadataCollector(sourceStorage.getStorageCase().name(), transportCache);

            MetadataCollector srcMetadataCollector = srcMetadataCollectorOp.orElseThrow(() -> new Exception("Could not find a metadata collector for source"));
            srcMetadataCollector.init(sourceStorage, sourceSecret);

            // Only single files can be transferred by this method.
            ResourceMetadata srcMetadata = srcMetadataCollector.getResourceMetadata(endpointPath.getSourcePath(), false);
            if (srcMetadata.getMetadataCase() != ResourceMetadata.MetadataCase.FILE) {
                throw new Exception("Expected a file as the source but received " + srcMetadata.getMetadataCase().name());
            }

            Optional<MetadataCollector> dstMetadataCollectorOp = MetadataCollectorResolver
                    .resolveMetadataCollector(destStorage.getStorageCase().name(), transportCache);

            MetadataCollector dstMetadataCollector = dstMetadataCollectorOp.orElseThrow(() -> new Exception("Could not find a metadata collector for destination"));
            dstMetadataCollector.init(destStorage, destSecret);

            // When overwriting is disabled, a destination file of identical size is treated as already transferred.
            if (!overwriteExisting && dstMetadataCollector.isAvailable(endpointPath.getDestinationPath())) {
                ResourceMetadata destinationMetadata = dstMetadataCollector.getResourceMetadata(endpointPath.getDestinationPath(), false);
                if (destinationMetadata.getMetadataCase() == ResourceMetadata.MetadataCase.FILE &&
                        destinationMetadata.getFile().getResourceSize() == srcMetadata.getFile().getResourceSize()) {
                    logger.info("Ignoring the transfer of file {} as it is available in the destination", endpointPath.getSourcePath());
                    updateStatus.accept(endpointPath, new TransferState()
                            .setPercentage(100)
                            .setState("COMPLETED")
                            .setUpdateTimeMils(System.currentTimeMillis())
                            .setDescription("Ignoring transfer as the file is available in destination"));

                    // The running slot is released in the finally block.
                    return;
                }
            }

            ConnectorConfig srcCC = ConnectorConfig.ConnectorConfigBuilder.newBuilder()
                    .withTransferId(transferId)
                    .withSecret(sourceSecret)
                    .withStorage(sourceStorage)
                    .withResourcePath(endpointPath.getSourcePath())
                    .withChunkSize(chunkedSize)
                    .withTransportConfig(transportConfig.getTransport())
                    .withMetadata(srcMetadata).build();

            // NOTE(review): the destination config carries the SOURCE metadata, exactly as before; presumably the
            // receiving side needs the size of the incoming file - confirm against the connector implementations.
            ConnectorConfig dstCC = ConnectorConfig.ConnectorConfigBuilder.newBuilder()
                    .withTransferId(transferId)
                    .withStorage(destStorage)
                    .withSecret(destSecret)
                    .withResourcePath(endpointPath.getDestinationPath())
                    .withChunkSize(chunkedSize)
                    .withTransportConfig(transportConfig.getTransport())
                    .withMetadata(srcMetadata).build();

            updateStatus.accept(endpointPath, new TransferState()
                    .setState("STARTED")
                    .setPercentage(0)
                    .setUpdateTimeMils(System.currentTimeMillis())
                    .setDescription("Started the transfer"));

            // Save transfer metadata in scheduled path to recover in case of an Agent failures. Recovery is done from controller
            createTransferHook.accept(endpointPath, true);
            hookCreated = true;

            mediator.transferSingleThread(transferId, srcCC, dstCC, transportCache, updateStatus,
                    (id, transferSuccess) -> {
                        // Skip if this method already cleaned up after a failed hand-off.
                        if (!slotReleased.compareAndSet(false, true)) {
                            return;
                        }
                        try {
                            // Delete scheduled key as the transfer completed / failed if it was placed in current session
                            createTransferHook.accept(endpointPath,false);
                        } catch (Exception e) {
                            logger.error("Failed while deleting scheduled path for transfer {}", id, e);
                        } finally {
                            // Always give the slot back, even when the scheduled key could not be deleted.
                            long runningAfter = totalRunningTransfers.decrementAndGet();
                            logger.info("Removed transfer {} from queue with transfer success = {}. Total running {}",
                                    id, transferSuccess, runningAfter);
                        }
                    });

            // From here on the mediator's completion callback owns the cleanup.
            handedOff = true;

        } catch (Throwable e) {
            logger.error("Error in submitting transfer {}", transferId, e);

            updateStatus.accept(endpointPath, new TransferState()
                    .setState("FAILED")
                    .setPercentage(0)
                    .setUpdateTimeMils(System.currentTimeMillis())
                    .setDescription(ExceptionUtils.getStackTrace(e)));


        } finally {
            // Skipped or failed before a normal hand-off to the mediator: nobody else will release the running
            // slot, so do it here (unless the completion callback already did).
            if (slotAcquired && !handedOff && slotReleased.compareAndSet(false, true)) {
                try {
                    if (hookCreated) {
                        // The scheduled key was written in this invocation but the transfer did not run.
                        createTransferHook.accept(endpointPath, false);
                    }
                } catch (Exception e) {
                    logger.error("Failed while deleting scheduled path for transfer {}", transferId, e);
                } finally {
                    long runningAfter = totalRunningTransfers.decrementAndGet();
                    logger.info("Released running slot of transfer {} without running it. Total running {}",
                            transferId, runningAfter);
                }
            }
        }
    }