in python-cli/mft_cli/airavata_mft_cli/operations.py [0:0]
def copy(source, destination):
    """Submit a transfer from ``source`` to ``destination`` and wait for it.

    Both arguments are "/"-separated paths whose first component is handed
    to ``fetch_storage_and_secret_ids`` to obtain the storage and secret ids
    (presumably a storage name or id -- confirm against that helper).

    If the source metadata is a directory, every entry produced by
    ``flatten_directories`` is transferred and ``destination`` must end with
    "/". If it is a single file and ``destination`` ends with "/", the file's
    friendly name is appended to the destination path.

    The user is asked to confirm before the transfer is submitted; afterwards
    the transfer state summary is polled once per second while a progress bar
    is updated, and a completed/failed summary is printed at the end.

    Args:
        source: Source path, ``<storage>/<path...>``.
        destination: Destination path, ``<storage>/<path...>``.

    Raises:
        typer.Abort: If the source is a directory and the destination does
            not end with "/", if the source metadata reports an error, if the
            user declines the confirmation, or if the transfer state becomes
            "FAILED".
    """
    source_storage_id, source_secret_id = fetch_storage_and_secret_ids(source.split("/")[0])
    dest_storage_id, dest_secret_id = fetch_storage_and_secret_ids(destination.split("/")[0])

    # TODO: Check agent availability and deploy cloud agents if required

    file_list = []
    source_metadata = get_resource_metadata(source)
    endpoint_paths = []
    total_volume = 0

    transfer_request = MFTTransferApi_pb2.TransferApiRequest(
        sourceStorageId=source_storage_id,
        sourceSecretId=source_secret_id,
        destinationStorageId=dest_storage_id,
        destinationSecretId=dest_secret_id,
        optimizeTransferPath=False)

    metadata_kind = source_metadata.WhichOneof('metadata')

    if metadata_kind == 'directory':
        # endswith() is also safe for an empty destination string, where
        # indexing with [-1] would raise IndexError.
        if not destination.endswith("/"):
            print("Source is a directory path so destination path should end with /")
            raise typer.Abort()

        # Everything after the first "/" is the path inside the destination
        # storage; the leading component only selects the storage.
        destination_root = destination.partition("/")[2]

        flatten_directories(source_metadata.directory, "", file_list)
        for file_metadata, relative_path in file_list:
            endpoint_paths.append(MFTTransferApi_pb2.EndpointPaths(
                sourcePath=file_metadata.resourcePath,
                destinationPath=destination_root + relative_path))
            total_volume += file_metadata.resourceSize

    elif metadata_kind == 'file':
        source_file = source_metadata.file
        file_list.append((source_file, source_file.friendlyName))

        # A trailing "/" means "copy into this directory": keep the source
        # file's name.
        if destination.endswith("/"):
            destination = destination + source_file.friendlyName

        endpoint_paths.append(MFTTransferApi_pb2.EndpointPaths(
            sourcePath=source_file.resourcePath,
            destinationPath=destination.partition("/")[2]))
        total_volume += source_file.resourceSize

    elif metadata_kind == 'error':
        print("Failed while fetching source details")
        # The error lives on the metadata response fetched above.
        print(source_metadata.error)
        raise typer.Abort()

    # NOTE(review): if no 'metadata' oneof is set, execution continues with
    # zero endpoint paths, same as before -- confirm whether that should abort.

    transfer_request.endpointPaths.extend(endpoint_paths)

    confirm = typer.confirm(
        f"Total number of {len(endpoint_paths)} files to be transferred. "
        f"Total volume is {total_volume} bytes. Do you want to start the transfer? ",
        True)

    if not confirm:
        raise typer.Abort()

    client = mft_client.MFTClient(
        transfer_api_port=configcli.transfer_api_port,
        transfer_api_secured=configcli.transfer_api_secured,
        resource_service_host=configcli.resource_service_host,
        resource_service_port=configcli.resource_service_port,
        resource_service_secured=configcli.resource_service_secured,
        secret_service_host=configcli.secret_service_host,
        secret_service_port=configcli.secret_service_port)

    transfer_resp = client.transfer_api.submitTransfer(transfer_request)
    transfer_id = transfer_resp.transferId

    state_request = MFTTransferApi_pb2.TransferStateApiRequest(transferId=transfer_id)

    # TODO: This has to be optimized and avoid frequent polling of all transfer ids in each iteration
    # Possible fix is to introduce a parent batch transfer id at the API level and fetch child transfer id
    # summaries in a single API call

    completed = 0
    failed = 0
    # progress.update() takes an increment, so remember the last percentage
    # reported to the bar; it must exist before the first loop iteration.
    prev_percentage = 0

    with typer.progressbar(length=100) as progress:
        while True:
            state_resp = client.transfer_api.getTransferStateSummary(state_request)

            progress_percentage = int(state_resp.percentage * 100)
            progress.update(progress_percentage - prev_percentage)
            prev_percentage = progress_percentage

            if state_resp.percentage == 1.0:
                completed = len(state_resp.completed)
                failed = len(state_resp.failed)
                break

            if state_resp.state == "FAILED":
                print("Transfer failed. Reason: " + state_resp.description)
                raise typer.Abort()

            time.sleep(1)

    print(f"Processed {completed + failed} files. Completed {completed}, Failed {failed}.")