in internal/backint/backup/backup.go [166:237]
// backupFile uploads a single file (or pipe) to the configured GCS bucket and
// returns the Backint protocol response line: "#SAVED ..." on success or
// "#ERROR <fileName>" on any failure. If p.reader is nil, the file named by
// p.fileName is opened (with retries) and used as the upload source.
// Upload metrics are always sent to Cloud Monitoring via the deferred call,
// regardless of success or failure.
func backupFile(ctx context.Context, p parameters) string {
	fileNameTrim := parse.TrimAndClean(p.fileName)
	object := parse.CreateObjectPath(p.config, fileNameTrim, p.externalBackupID, p.extension)
	// Build a local copy of the configured metadata rather than writing into
	// the map owned by p.config — mutating the config's map would leak the
	// "X-Backup-Type" entry into subsequent calls and other config readers.
	metadata := make(map[string]string, len(p.config.GetMetadata())+1)
	for k, v := range p.config.GetMetadata() {
		metadata[k] = v
	}
	metadata["X-Backup-Type"] = strings.ReplaceAll(p.fileType, "#", "")
	log.CtxLogger(ctx).Infow("Backing up file", "fileType", p.fileType, "fileName", p.fileName, "obj", object, "fileSize", p.fileSize, "storageClass", p.config.GetStorageClass().String(), "metadata", metadata)
	if p.reader == nil {
		f, err := parse.OpenFileWithRetries(fileNameTrim, os.O_RDONLY, 0, p.config.GetFileReadTimeoutMs())
		if err != nil {
			log.CtxLogger(ctx).Errorw("Error opening backup file", "fileName", p.fileName, "obj", object, "fileType", p.fileType, "err", err)
			return fmt.Sprintf("#ERROR %s\n", p.fileName)
		}
		defer f.Close()
		fileInfo, err := f.Stat()
		// Reject files where no read bit (owner/group/other) is set.
		if err != nil || fileInfo.Mode()&0444 == 0 {
			log.CtxLogger(ctx).Errorw("Backup file does not have readable permissions", "fileName", p.fileName, "obj", object, "fileType", p.fileType, "err", err)
			return fmt.Sprintf("#ERROR %s\n", p.fileName)
		}
		// For plain files with no size supplied by the caller, take the size
		// from the filesystem so progress/verification has a total to work with.
		if p.fileSize == 0 && p.fileType == "#FILE" {
			p.fileSize = fileInfo.Size()
		}
		p.reader = f
	}
	storageClass := p.config.GetStorageClass().String()
	if storageClass == "STORAGE_CLASS_UNSPECIFIED" {
		// An empty string allows the bucket default to be used.
		storageClass = ""
	}
	rw := storage.ReadWriter{
		Reader:                     p.reader,
		Copier:                     p.copier,
		BucketHandle:               p.bucketHandle,
		BucketName:                 p.config.GetBucket(),
		ChunkSizeMb:                p.config.GetBufferSizeMb(),
		ObjectName:                 object,
		TotalBytes:                 p.fileSize,
		LogDelay:                   time.Duration(p.config.GetLogDelaySec()) * time.Second,
		Compress:                   p.config.GetCompress(),
		StorageClass:               storageClass,
		DumpData:                   p.config.GetDumpData(),
		RateLimitBytes:             p.config.GetRateLimitMb() * 1024 * 1024,
		EncryptionKey:              p.config.GetEncryptionKey(),
		KMSKey:                     p.config.GetKmsKey(),
		MaxRetries:                 p.config.GetRetries(),
		VerifyUpload:               true,
		Metadata:                   metadata,
		RetryBackoffInitial:        time.Duration(p.config.GetRetryBackoffInitial()) * time.Second,
		RetryBackoffMax:            time.Duration(p.config.GetRetryBackoffMax()) * time.Second,
		RetryBackoffMultiplier:     float64(p.config.GetRetryBackoffMultiplier()),
		// XML multipart is only useful when more than one parallel stream is configured.
		XMLMultipartUpload:         p.config.GetXmlMultipartUpload() && p.config.GetParallelStreams() > 1,
		XMLMultipartWorkers:        p.config.GetParallelStreams(),
		XMLMultipartServiceAccount: p.config.GetServiceAccountKey(),
		XMLMultipartEndpoint:       p.config.GetClientEndpoint(),
		CustomTime:                 parse.CustomTime(ctx, "custom_time", p.config.GetCustomTime(), time.Now().UTC()),
		ObjectRetentionMode:        p.config.GetObjectRetentionMode(),
		ObjectRetentionTime:        parse.CustomTime(ctx, "object_retention_time", p.config.GetObjectRetentionTime(), time.Now().UTC()),
	}
	startTime := time.Now()
	bytesWritten, err := rw.Upload(ctx)
	uploadTime := time.Since(startTime)
	// Deferred so metrics are emitted on both the error and success paths;
	// arguments (including err == nil) are evaluated here, after Upload.
	defer metrics.SendToCloudMonitoring(ctx, "backup", p.fileName, bytesWritten, uploadTime, p.config, err == nil, p.cloudProps, cloudmonitoring.NoBackOff(), metrics.DefaultMetricClient)
	if err != nil {
		log.CtxLogger(ctx).Errorw("Error uploading file", "bucket", p.config.GetBucket(), "fileName", p.fileName, "obj", object, "fileType", p.fileType, "err", err)
		return fmt.Sprintf("#ERROR %s\n", p.fileName)
	}
	log.CtxLogger(ctx).Infow("File uploaded", "bucket", p.config.GetBucket(), "fileName", p.fileName, "obj", object, "bytesWritten", bytesWritten, "fileSize", p.fileSize, "fileType", p.fileType, "uploadTimeSec", uploadTime.Round(time.Millisecond))
	return fmt.Sprintf("#SAVED %q %s %s\n", p.externalBackupID, p.fileName, strconv.FormatInt(bytesWritten, 10))
}