in internal/backint/restore/restore.go [139:212]
// restoreFile downloads the newest bucket object matching fileName (and
// externalBackupID, when provided) into destName and returns the Backint
// response line for the request:
//
//   - "#RESTORED <externalBackupID> <fileName>" when the download succeeds,
//   - "#NOTFOUND <fileName>" when no object matches the computed prefix,
//   - "#ERROR <fileName>" when listing, opening, stat-ing or downloading fails.
//
// Failures are logged here and reported to the caller only through the
// returned response line.
func restoreFile(ctx context.Context, config *bpb.BackintConfiguration, connectParams *storage.ConnectParameters, bucketHandle *store.BucketHandle, copier storage.IOFileCopier, fileName, destName, externalBackupID string, cloudProps *ipb.CloudProperties) []byte {
	prefix := parse.CreateObjectPath(config, parse.TrimAndClean(fileName), "", "")
	if externalBackupID != "" {
		// Narrow the listing to the object of the requested backup.
		prefix += fmt.Sprintf("%s.bak", externalBackupID)
	}
	log.CtxLogger(ctx).Infow("Restoring file", "userID", config.GetUserId(), "fileName", fileName, "destName", destName, "prefix", prefix, "externalBackupID", externalBackupID)

	objects, err := storage.ListObjects(ctx, bucketHandle, prefix, "", config.GetRetries())
	if err != nil {
		log.CtxLogger(ctx).Errorw("Error listing objects", "fileName", fileName, "prefix", prefix, "err", err, "externalBackupID", externalBackupID)
		return []byte(fmt.Sprintf("#ERROR %s\n", fileName))
	}
	if len(objects) == 0 {
		log.CtxLogger(ctx).Warnw("No objects found", "fileName", fileName, "prefix", prefix, "externalBackupID", externalBackupID)
		return []byte(fmt.Sprintf("#NOTFOUND %s\n", fileName))
	}

	// The objects will be sorted by creation date, latest first.
	// Thus, the requested restore will always be the first in the objects slice.
	object := objects[0]
	// From the specification, files are created by Backint and pipes are created by the database.
	log.CtxLogger(ctx).Infow("Restoring object from bucket", "fileName", fileName, "prefix", prefix, "externalBackupID", externalBackupID, "obj", object.Name, "fileType", object.Metadata["X-Backup-Type"], "destName", destName, "objSize", object.Size)

	var destFile *os.File
	if object.Metadata["X-Backup-Type"] == "FILE" {
		destFile, err = os.Create(parse.TrimAndClean(destName))
	} else {
		// Anything that is not a FILE is opened write-only and non-blocking
		// instead of being created; presumably this is the pipe case, with the
		// open retried up to the configured file read timeout — confirm against
		// parse.OpenFileWithRetries.
		destFile, err = parse.OpenFileWithRetries(parse.TrimAndClean(destName), os.O_WRONLY|syscall.O_NONBLOCK, 0, config.GetFileReadTimeoutMs())
	}
	if err != nil {
		log.CtxLogger(ctx).Errorw("Error opening dest file", "destName", destName, "err", err, "fileType", object.Metadata["X-Backup-Type"])
		return []byte(fmt.Sprintf("#ERROR %s\n", fileName))
	}
	defer func() {
		if err := destFile.Close(); err != nil {
			log.CtxLogger(ctx).Errorw("Error closing dest file", "destName", destName, "err", err)
			return
		}
		log.CtxLogger(ctx).Infow("Dest file closed successfully", "destName", destName)
	}()

	// Stat failure and missing write permission are handled separately: when
	// Stat fails the returned FileInfo is nil, so calling Mode() on it (even
	// just to log it) would panic instead of producing an #ERROR response.
	fileInfo, err := destFile.Stat()
	if err != nil {
		log.CtxLogger(ctx).Errorw("Error getting dest file info", "destName", destName, "err", err)
		return []byte(fmt.Sprintf("#ERROR %s\n", fileName))
	}
	if fileInfo.Mode()&0222 == 0 {
		log.CtxLogger(ctx).Errorw("Dest file does not have writeable permissions", "destName", destName, "fileInfo", fileInfo.Mode(), "err", err)
		return []byte(fmt.Sprintf("#ERROR %s\n", fileName))
	}

	rw := storage.ReadWriter{
		Writer:                        destFile,
		Copier:                        copier,
		BucketHandle:                  bucketHandle,
		BucketName:                    config.GetBucket(),
		ChunkSizeMb:                   config.GetBufferSizeMb(),
		ObjectName:                    object.Name,
		TotalBytes:                    object.Size,
		LogDelay:                      time.Duration(config.GetLogDelaySec()) * time.Second,
		RateLimitBytes:                config.GetRateLimitMb() * 1024 * 1024,
		EncryptionKey:                 config.GetEncryptionKey(),
		KMSKey:                        config.GetKmsKey(),
		MaxRetries:                    config.GetRetries(),
		RetryBackoffInitial:           time.Duration(config.GetRetryBackoffInitial()) * time.Second,
		RetryBackoffMax:               time.Duration(config.GetRetryBackoffMax()) * time.Second,
		RetryBackoffMultiplier:        float64(config.GetRetryBackoffMultiplier()),
		ParallelDownloadWorkers:       config.GetParallelRecoveryStreams(),
		ParallelDownloadConnectParams: connectParams,
	}
	startTime := time.Now()
	bytesWritten, err := rw.Download(ctx)
	downloadTime := time.Since(startTime)
	// Arguments of a deferred call are evaluated here, so the metric captures
	// the download outcome (err == nil) as of this point; only the send itself
	// is postponed until the function returns.
	defer metrics.SendToCloudMonitoring(ctx, "restore", fileName, bytesWritten, downloadTime, config, err == nil, cloudProps, cloudmonitoring.NoBackOff(), metrics.DefaultMetricClient)
	if err != nil {
		log.CtxLogger(ctx).Errorw("Error downloading file", "bucket", config.GetBucket(), "destName", destName, "obj", object.Name, "err", err)
		return []byte(fmt.Sprintf("#ERROR %s\n", fileName))
	}
	log.CtxLogger(ctx).Infow("File restored", "bucket", config.GetBucket(), "destName", destName, "obj", object.Name, "bytesWritten", bytesWritten, "downloadTimeSec", downloadTime.Round(time.Millisecond))

	// Report the ID of the object actually restored, derived from its name,
	// rather than echoing the (possibly empty) ID from the request.
	externalBackupID = strings.TrimSuffix(filepath.Base(object.Name), ".bak")
	return []byte(fmt.Sprintf("#RESTORED %q %s\n", externalBackupID, fileName))
}