in agent/service/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java [66:248]
/**
 * Moves one resource from the source endpoint to the destination endpoint on the calling thread
 * and reports the outcome through the supplied callbacks.
 *
 * <p>Connectors are resolved from the storage case names of both endpoints. When a chunked
 * connector is available on both sides the resource is split into ranges that are submitted to
 * {@code chunkedExecutorService}; otherwise, when a streaming connector is available on both
 * sides, the bytes are copied through a 128 KiB buffer while a task on {@code monitorPool}
 * publishes progress every two seconds. If neither pairing is available the transfer fails.
 *
 * <p>No exception escapes this method: any failure is logged, published as a {@code FAILED}
 * state and signalled through {@code exitingCallback}.
 *
 * @param transferId       identifier used in log messages and handed to {@code exitingCallback}
 * @param srcCC            connector configuration of the source endpoint
 * @param dstCC            connector configuration of the destination endpoint
 * @param transportCache   cache handed to {@code ConnectorResolver} when resolving connectors
 * @param onStatusCallback receives the endpoint paths together with each state update
 *                         ({@code RUNNING}, {@code COMPLETED} or {@code FAILED})
 * @param exitingCallback  receives the transfer id and {@code true} on success or {@code false}
 *                         on failure
 */
public void transferSingleThread(String transferId,
                                 ConnectorConfig srcCC,
                                 ConnectorConfig dstCC,
                                 TransportClassLoaderCache transportCache,
                                 BiConsumer<EndpointPaths, TransferState> onStatusCallback,
                                 BiConsumer<String, Boolean> exitingCallback) {

    // Tells the streaming progress monitor when to stop publishing RUNNING updates.
    final AtomicBoolean transferInProgress = new AtomicBoolean(true);

    EndpointPaths endpointPath = EndpointPaths.newBuilder()
            .setSourcePath(srcCC.getResourcePath())
            .setDestinationPath(dstCC.getResourcePath()).build();

    try {
        logger.info("Starting transfer {}", transferId);

        Optional<IncomingStreamingConnector> inStreamingConnectorOp = ConnectorResolver
                .resolveIncomingStreamingConnector(srcCC.getStorage().getStorageCase().name(), transportCache);
        Optional<OutgoingStreamingConnector> outStreamingConnectorOp = ConnectorResolver
                .resolveOutgoingStreamingConnector(dstCC.getStorage().getStorageCase().name(), transportCache);

        Optional<IncomingChunkedConnector> inChunkedConnectorOp = ConnectorResolver
                .resolveIncomingChunkedConnector(srcCC.getStorage().getStorageCase().name(), transportCache);
        Optional<OutgoingChunkedConnector> outChunkedConnectorOp = ConnectorResolver
                .resolveOutgoingChunkedConnector(dstCC.getStorage().getStorageCase().name(), transportCache);

        onStatusCallback.accept(endpointPath, new TransferState()
                .setPercentage(0)
                .setState("RUNNING")
                .setUpdateTimeMils(System.currentTimeMillis())
                .setDescription("Transfer is ongoing"));

        long start = System.currentTimeMillis();

        // Give priority for chunked transfers.
        // TODO: Provide a preference at the API level
        if (inChunkedConnectorOp.isPresent() && outChunkedConnectorOp.isPresent()) {

            logger.info("Starting the chunked transfer for transfer {}", transferId);

            long chunkSize = chunkedSize * 1024L * 1024L;
            if (chunkSize <= 0) {
                // A non-positive chunk size would never advance the offset in the submit loop below.
                throw new IllegalStateException("Chunk size must be positive but was " + chunkSize + " bytes");
            }

            CompletionService<Integer> completionService = new ExecutorCompletionService<>(chunkedExecutorService);
            long fileLength = srcCC.getMetadata().getFile().getResourceSize();

            // Presence of both connectors was verified by the branch condition.
            IncomingChunkedConnector inConnector = inChunkedConnectorOp.get();
            OutgoingChunkedConnector outConnector = outChunkedConnectorOp.get();

            inConnector.init(srcCC);
            outConnector.init(dstCC);

            // Kept so that outstanding chunks can be cancelled once one of them fails.
            java.util.List<Future<Integer>> submittedChunks = new java.util.ArrayList<>();

            try {
                long uploadLength = 0L;
                int chunkIdx = 0;

                while (uploadLength < fileLength) {
                    long endPos = Math.min(uploadLength + chunkSize, fileLength);

                    submittedChunks.add(completionService.submit(new ChunkMover(inConnector,
                            outConnector, uploadLength, endPos, chunkIdx,
                            transferId, doChunkStreaming)));

                    uploadLength = endPos;
                    chunkIdx++;
                }

                // Wait for every chunk; get() rethrows the failure of a chunk, if any.
                for (int i = 0; i < chunkIdx; i++) {
                    completionService.take().get();
                }

                inConnector.complete();
                outConnector.complete();
                logger.info("Completed chunked transfer for transfer {}", transferId);

            } catch (Exception e) {
                // Stop chunks that are still queued or running before the connectors are told about the failure.
                for (Future<Integer> chunk : submittedChunks) {
                    chunk.cancel(true);
                }
                // Notify both connectors even if one notification fails, and keep the root cause.
                try {
                    inConnector.failed();
                } catch (Exception cleanupError) {
                    e.addSuppressed(cleanupError);
                }
                try {
                    outConnector.failed();
                } catch (Exception cleanupError) {
                    e.addSuppressed(cleanupError);
                }
                throw e;
            }

        } else if (inStreamingConnectorOp.isPresent() && outStreamingConnectorOp.isPresent()) {

            logger.info("Starting streaming transfer for transfer {}", transferId);

            // Presence of both connectors was verified by the branch condition.
            IncomingStreamingConnector inConnector = inStreamingConnectorOp.get();
            OutgoingStreamingConnector outConnector = outStreamingConnectorOp.get();

            inConnector.init(srcCC);
            outConnector.init(dstCC);

            try {
                // NOTE(review): the streams are not closed here; presumably the connectors release
                // them in complete()/failed() - confirm against the connector implementations.
                InputStream inputStream = inConnector.fetchInputStream();
                OutputStream outputStream = outConnector.fetchOutputStream();

                final AtomicLong transferredBytes = new AtomicLong(0L);

                monitorPool.submit(() -> {
                    while (true) {
                        boolean interrupted = false;
                        try {
                            Thread.sleep(2000);
                        } catch (InterruptedException ie) {
                            // Honour the interruption instead of looping on: restore the flag and stop.
                            Thread.currentThread().interrupt();
                            interrupted = true;
                        }
                        if (interrupted || !transferInProgress.get()) {
                            logger.info("Status monitor is exiting for transfer {}", transferId);
                            break;
                        }
                        long totalBytes = srcCC.getMetadata().getFile().getResourceSize();
                        // Avoid NaN for a resource whose reported size is not positive.
                        double transferPercentage = totalBytes > 0
                                ? transferredBytes.get() * 100.0 / totalBytes
                                : 0.0;
                        logger.info("Transfer percentage for transfer {} {}", transferId, transferPercentage);
                        onStatusCallback.accept(endpointPath, new TransferState()
                                .setPercentage(transferPercentage)
                                .setState("RUNNING")
                                .setUpdateTimeMils(System.currentTimeMillis())
                                .setDescription("Transfer Progress Updated"));
                    }
                });

                byte[] buffer = new byte[128 * 1024];
                int n;
                while ((n = inputStream.read(buffer)) != -1) {
                    outputStream.write(buffer, 0, n);
                    // Count the bytes only after they have been written so progress is never behind.
                    transferredBytes.addAndGet(n);
                }

                // Stop the monitor before finalising so that it is far less likely to publish
                // RUNNING after the terminal state has been reported.
                transferInProgress.set(false);

                inConnector.complete();
                outConnector.complete();
                logger.info("Completed streaming transfer for transfer {}", transferId);

            } catch (Exception e) {
                transferInProgress.set(false);
                // Notify both connectors even if one notification fails, and keep the root cause.
                try {
                    inConnector.failed();
                } catch (Exception cleanupError) {
                    e.addSuppressed(cleanupError);
                }
                try {
                    outConnector.failed();
                } catch (Exception cleanupError) {
                    e.addSuppressed(cleanupError);
                }
                throw e;
            }

        } else {
            throw new Exception("No matching connector found to perform the transfer");
        }

        long endTime = System.currentTimeMillis();
        double time = (endTime - start) / 1000.0;

        logger.info("Transfer {} completed. Time {} S. Speed {} MB/s", transferId, time,
                (srcCC.getMetadata().getFile().getResourceSize() * 1.0 / time) / (1024 * 1024));

        onStatusCallback.accept(endpointPath, new TransferState()
                .setPercentage(100)
                .setState("COMPLETED")
                .setUpdateTimeMils(endTime)
                .setDescription("Transfer successfully completed"));

        exitingCallback.accept(transferId, true);

    } catch (Exception e) {
        logger.error("Transfer {} failed with error", transferId, e);

        onStatusCallback.accept(endpointPath, new TransferState()
                .setPercentage(0)
                .setState("FAILED")
                .setUpdateTimeMils(System.currentTimeMillis())
                .setDescription("Transfer failed due to " + ExceptionUtils.getStackTrace(e)));

        exitingCallback.accept(transferId, false);

        if (e instanceof InterruptedException) {
            // Restored only after the failure has been reported so the callbacks are not
            // disturbed by a pending interrupt.
            Thread.currentThread().interrupt();
        }
    } finally {
        transferInProgress.set(false);
    }
}