public void transferSingleThread()

in agent/service/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java [66:248]


    /**
     * Performs one transfer from the endpoint described by {@code srcCC} to the endpoint
     * described by {@code dstCC}, blocking until the transfer has finished or failed.
     *
     * <p>Connectors are resolved from the storage case names of both endpoints. If a chunked
     * connector is available on both sides the resource is split into fixed-size chunks that are
     * submitted to the chunked executor; otherwise, if a streaming connector is available on both
     * sides, the bytes are copied on the calling thread while a monitor task periodically reports
     * progress. If neither pairing is available the transfer fails.
     *
     * <p>Exceptions raised while transferring are caught here: they are logged, reported as a
     * {@code FAILED} state through {@code onStatusCallback}, and signalled through
     * {@code exitingCallback} with {@code false}.
     *
     * @param transferId       identifier used in log messages and passed to {@code exitingCallback}
     * @param srcCC            configuration of the source endpoint
     * @param dstCC            configuration of the destination endpoint
     * @param transportCache   cache handed to {@code ConnectorResolver} when resolving connectors
     * @param onStatusCallback receives {@code RUNNING}, {@code COMPLETED} and {@code FAILED} states
     *                         together with the source/destination paths
     * @param exitingCallback  invoked once at the end with the transfer id and {@code true} on
     *                         success or {@code false} on failure
     */
    public void transferSingleThread(String transferId,
                                     ConnectorConfig srcCC,
                                     ConnectorConfig dstCC,
                                     TransportClassLoaderCache transportCache,
                                     BiConsumer<EndpointPaths, TransferState> onStatusCallback,
                                     BiConsumer<String, Boolean> exitingCallback) {

        // Polled by the streaming progress monitor; cleared as soon as the copy is over.
        final AtomicBoolean transferInProgress = new AtomicBoolean(true);

        EndpointPaths endpointPath = EndpointPaths.newBuilder()
                .setSourcePath(srcCC.getResourcePath())
                .setDestinationPath(dstCC.getResourcePath()).build();
        try {

            logger.info("Starting transfer {}", transferId);

            Optional<IncomingStreamingConnector> inStreamingConnectorOp = ConnectorResolver
                    .resolveIncomingStreamingConnector(srcCC.getStorage().getStorageCase().name(), transportCache);
            Optional<OutgoingStreamingConnector> outStreamingConnectorOp = ConnectorResolver
                    .resolveOutgoingStreamingConnector(dstCC.getStorage().getStorageCase().name(), transportCache);

            Optional<IncomingChunkedConnector> inChunkedConnectorOp = ConnectorResolver
                    .resolveIncomingChunkedConnector(srcCC.getStorage().getStorageCase().name(), transportCache);
            Optional<OutgoingChunkedConnector> outChunkedConnectorOp = ConnectorResolver
                    .resolveOutgoingChunkedConnector(dstCC.getStorage().getStorageCase().name(), transportCache);

            onStatusCallback.accept(endpointPath, new TransferState()
                    .setPercentage(0)
                    .setState("RUNNING")
                    .setUpdateTimeMils(System.currentTimeMillis())
                    .setDescription("Transfer is ongoing"));

            long start = System.currentTimeMillis();

            // Give priority for chunked transfers.
            // TODO: Provide a preference at the API level
            if (inChunkedConnectorOp.isPresent() && outChunkedConnectorOp.isPresent()) {

                logger.info("Starting the chunked transfer for transfer {}", transferId);

                // chunkedSize is expressed in MiB; convert to bytes.
                long chunkSize = chunkedSize * 1024 * 1024L;

                CompletionService<Integer> completionService = new ExecutorCompletionService<>(chunkedExecutorService);

                long fileLength = srcCC.getMetadata().getFile().getResourceSize();
                long uploadLength = 0L;
                int chunkIdx = 0;

                IncomingChunkedConnector inConnector = inChunkedConnectorOp
                        .orElseThrow(() -> new Exception("Could not find an in chunked connector for type " + srcCC.getStorage().getStorageCase().name()));

                OutgoingChunkedConnector outConnector = outChunkedConnectorOp
                        .orElseThrow(() -> new Exception("Could not find an out chunked connector for type " + dstCC.getStorage().getStorageCase().name()));

                inConnector.init(srcCC);
                outConnector.init(dstCC);

                try {
                    // Submit one mover per [uploadLength, endPos) range; the last range is clipped
                    // to the resource size.
                    while (uploadLength < fileLength) {

                        long endPos = uploadLength + chunkSize;
                        if (endPos > fileLength) {
                            endPos = fileLength;
                        }

                        completionService.submit(new ChunkMover(inConnector,
                                outConnector, uploadLength, endPos, chunkIdx,
                                transferId, doChunkStreaming));

                        uploadLength = endPos;
                        chunkIdx++;
                    }

                    // Wait for every submitted chunk; get() rethrows the first failure observed.
                    // NOTE(review): chunks still queued or running are not cancelled when one fails.
                    for (int i = 0; i < chunkIdx; i++) {
                        Future<Integer> future = completionService.take();
                        future.get();
                    }

                    inConnector.complete();
                    outConnector.complete();
                    logger.info("Completed chunked transfer for transfer {}", transferId);

                } catch (Exception e) {
                    // Notify both connectors even if one notification fails, and keep the
                    // original failure as the primary exception.
                    try {
                        inConnector.failed();
                    } catch (Exception cleanupError) {
                        e.addSuppressed(cleanupError);
                    }
                    try {
                        outConnector.failed();
                    } catch (Exception cleanupError) {
                        e.addSuppressed(cleanupError);
                    }
                    throw e;
                }
            } else if (inStreamingConnectorOp.isPresent() && outStreamingConnectorOp.isPresent()) {

                logger.info("Starting streaming transfer for transfer {}", transferId);
                IncomingStreamingConnector inConnector = inStreamingConnectorOp
                        .orElseThrow(() -> new Exception("Could not find an in streaming connector for type " + srcCC.getStorage().getStorageCase().name()));

                OutgoingStreamingConnector outConnector = outStreamingConnectorOp
                        .orElseThrow(() -> new Exception("Could not find an out streaming connector for type " + dstCC.getStorage().getStorageCase().name()));

                inConnector.init(srcCC);
                outConnector.init(dstCC);

                try {

                    InputStream inputStream = inConnector.fetchInputStream();
                    OutputStream outputStream = outConnector.fetchOutputStream();

                    // Bytes written so far; shared with the monitor task below.
                    final AtomicLong countAtomic = new AtomicLong();

                    monitorPool.submit(() -> {
                        while (true) {
                            try {
                                Thread.sleep(2000);
                            } catch (InterruptedException e) {
                                // Honour the interruption request instead of looping on.
                                Thread.currentThread().interrupt();
                                logger.info("Status monitor is exiting for transfer {}", transferId);
                                break;
                            }
                            if (!transferInProgress.get()) {
                                logger.info("Status monitor is exiting for transfer {}", transferId);
                                break;
                            }
                            long totalBytes = srcCC.getMetadata().getFile().getResourceSize();
                            // A zero-length resource would otherwise yield NaN/Infinity.
                            double transferPercentage = totalBytes > 0
                                    ? countAtomic.get() * 100.0 / totalBytes
                                    : 0;
                            logger.info("Transfer percentage for transfer {} {}", transferId, transferPercentage);
                            onStatusCallback.accept(endpointPath, new TransferState()
                                    .setPercentage(transferPercentage)
                                    .setState("RUNNING")
                                    .setUpdateTimeMils(System.currentTimeMillis())
                                    .setDescription("Transfer Progress Updated"));
                        }
                    });

                    int n;
                    byte[] buffer = new byte[128 * 1024];
                    while (-1 != (n = inputStream.read(buffer))) {
                        outputStream.write(buffer, 0, n);
                        // Count the bytes only after they were written so the monitor never
                        // lags a whole buffer behind.
                        countAtomic.addAndGet(n);
                    }

                    inConnector.complete();
                    outConnector.complete();

                    logger.info("Completed streaming transfer for transfer {}", transferId);
                } catch (Exception e) {
                    // Notify both connectors even if one notification fails, and keep the
                    // original failure as the primary exception.
                    try {
                        inConnector.failed();
                    } catch (Exception cleanupError) {
                        e.addSuppressed(cleanupError);
                    }
                    try {
                        outConnector.failed();
                    } catch (Exception cleanupError) {
                        e.addSuppressed(cleanupError);
                    }
                    throw e;
                }

            } else {
                throw new Exception("No matching connector found to perform the transfer");
            }

            // Stop the monitor before the terminal state is published; this narrows the window
            // in which a stale RUNNING update could follow COMPLETED.
            transferInProgress.set(false);

            long endTime = System.currentTimeMillis();

            double time = (endTime - start) / 1000.0;

            logger.info("Transfer {} completed. Time {} S.  Speed {} MB/s", transferId, time,
                    (srcCC.getMetadata().getFile().getResourceSize() * 1.0 / time) / (1024 * 1024));

            onStatusCallback.accept(endpointPath, new TransferState()
                    .setPercentage(100)
                    .setState("COMPLETED")
                    .setUpdateTimeMils(endTime)
                    .setDescription("Transfer successfully completed"));

            exitingCallback.accept(transferId, true);
        } catch (Exception e) {

            // Stop the monitor before the terminal state is published (see above).
            transferInProgress.set(false);

            logger.error("Transfer {} failed with error", transferId, e);

            onStatusCallback.accept(endpointPath, new TransferState()
                    .setPercentage(0)
                    .setState("FAILED")
                    .setUpdateTimeMils(System.currentTimeMillis())
                    .setDescription("Transfer failed due to " + ExceptionUtils.getStackTrace(e)));
            exitingCallback.accept(transferId, false);
        } finally {
            // Safety net for any exit path that did not clear the flag above.
            transferInProgress.set(false);
        }

    }