in priam/src/main/java/com/netflix/priam/backup/AbstractFileSystem.java [170:238]
public AbstractBackupPath uploadAndDeleteInternal(
final AbstractBackupPath path, Instant target, int retry)
throws RejectedExecutionException, BackupRestoreException {
Path localPath = Paths.get(path.getBackupFile().getAbsolutePath());
File localFile = localPath.toFile();
Preconditions.checkArgument(
localFile.exists(), String.format("Can't upload nonexistent %s", localPath));
Preconditions.checkArgument(
!localFile.isDirectory(),
String.format("Can only upload files %s is a directory", localPath));
Path remotePath = Paths.get(path.getRemotePath());
if (tasksQueued.add(localPath)) {
logger.info("Uploading file: {} to location: {}", localPath, remotePath);
try {
long uploadedFileSize;
// Upload file if it not present at remote location.
if (path.getType() != BackupFileType.SST_V2 || !checkObjectExists(remotePath)) {
backupNotificationMgr.notify(path, UploadStatus.STARTED);
uploadedFileSize =
new BoundedExponentialRetryCallable<Long>(
500 /* minSleep */, 10000 /* maxSleep */, retry) {
@Override
public Long retriableCall() throws Exception {
return uploadFileImpl(path, target);
}
}.call();
// Add to cache after successful upload.
// We only add SST_V2 as other file types are usually not checked, so no point
// evicting our SST_V2 results.
if (path.getType() == BackupFileType.SST_V2) addObjectCache(remotePath);
backupMetrics.recordUploadRate(uploadedFileSize);
backupMetrics.incrementValidUploads();
path.setCompressedFileSize(uploadedFileSize);
backupNotificationMgr.notify(path, UploadStatus.SUCCESS);
} else {
// file is already uploaded to remote file system.
logger.info("File: {} already present on remoteFileSystem.", remotePath);
}
logger.info(
"Successfully uploaded file: {} to location: {}", localPath, remotePath);
if (!FileUtils.deleteQuietly(localFile))
logger.warn(
String.format(
"Failed to delete local file %s.",
localFile.getAbsolutePath()));
} catch (Exception e) {
backupMetrics.incrementInvalidUploads();
logger.error(
"Error while uploading file: {} to location: {}. Exception: Msg: [{}], Trace: {}",
localPath,
remotePath,
e.getMessage(),
e);
backupNotificationMgr.notify(path, UploadStatus.FAILED);
throw new BackupRestoreException(e.getMessage());
} finally {
// Remove the task from the list so if we try to upload file ever again, we can.
tasksQueued.remove(localPath);
}
} else logger.info("Already in queue, no-op. File: {}", localPath);
return path;
}