func restoreFile()

in internal/backint/restore/restore.go [139:212]


// restoreFile downloads a single backup object from the bucket into destName
// and returns the Backint response line describing the outcome.
//
// The object is looked up by a prefix built from config and fileName; when
// externalBackupID is non-empty, "<externalBackupID>.bak" is appended to that
// prefix so only that backup matches. The first listed object is restored.
//
// The returned bytes are one of:
//   - "#RESTORED <quoted backup ID> <fileName>\n" on success,
//   - "#NOTFOUND <fileName>\n" when no object matches the prefix,
//   - "#ERROR <fileName>\n" when listing, opening, stat-ing, or downloading fails.
//
// Download metrics are reported through metrics.SendToCloudMonitoring once a
// download has been attempted, regardless of whether it succeeded.
func restoreFile(ctx context.Context, config *bpb.BackintConfiguration, connectParams *storage.ConnectParameters, bucketHandle *store.BucketHandle, copier storage.IOFileCopier, fileName, destName, externalBackupID string, cloudProps *ipb.CloudProperties) []byte {
	prefix := parse.CreateObjectPath(config, parse.TrimAndClean(fileName), "", "")
	if externalBackupID != "" {
		prefix += fmt.Sprintf("%s.bak", externalBackupID)
	}

	log.CtxLogger(ctx).Infow("Restoring file", "userID", config.GetUserId(), "fileName", fileName, "destName", destName, "prefix", prefix, "externalBackupID", externalBackupID)
	objects, err := storage.ListObjects(ctx, bucketHandle, prefix, "", config.GetRetries())
	if err != nil {
		log.CtxLogger(ctx).Errorw("Error listing objects", "fileName", fileName, "prefix", prefix, "err", err, "externalBackupID", externalBackupID)
		return []byte(fmt.Sprintf("#ERROR %s\n", fileName))
	}
	if len(objects) == 0 {
		log.CtxLogger(ctx).Warnw("No objects found", "fileName", fileName, "prefix", prefix, "externalBackupID", externalBackupID)
		return []byte(fmt.Sprintf("#NOTFOUND %s\n", fileName))
	}
	// The objects will be sorted by creation date, latest first.
	// Thus, the requested restore will always be the first in the objects slice.
	object := objects[0]
	fileType := object.Metadata["X-Backup-Type"]

	// From the specification, files are created by Backint and pipes are created by the database.
	log.CtxLogger(ctx).Infow("Restoring object from bucket", "fileName", fileName, "prefix", prefix, "externalBackupID", externalBackupID, "obj", object.Name, "fileType", fileType, "destName", destName, "objSize", object.Size)
	var destFile *os.File
	if fileType == "FILE" {
		destFile, err = os.Create(parse.TrimAndClean(destName))
	} else {
		// Any other type is opened rather than created: write-only and
		// non-blocking, retried using the configured file read timeout.
		destFile, err = parse.OpenFileWithRetries(parse.TrimAndClean(destName), os.O_WRONLY|syscall.O_NONBLOCK, 0, config.GetFileReadTimeoutMs())
	}
	if err != nil {
		log.CtxLogger(ctx).Errorw("Error opening dest file", "destName", destName, "err", err, "fileType", fileType)
		return []byte(fmt.Sprintf("#ERROR %s\n", fileName))
	}
	defer func() {
		if err := destFile.Close(); err != nil {
			log.CtxLogger(ctx).Errorw("Error closing dest file", "destName", destName, "err", err)
			return
		}
		log.CtxLogger(ctx).Infow("Dest file closed successfully", "destName", destName)
	}()

	// Stat returns a nil FileInfo on failure, so the error must be handled
	// before the mode is inspected (or logged) to avoid a nil dereference.
	fileInfo, err := destFile.Stat()
	if err != nil {
		log.CtxLogger(ctx).Errorw("Error getting dest file info", "destName", destName, "err", err)
		return []byte(fmt.Sprintf("#ERROR %s\n", fileName))
	}
	// 0222 masks the write bits for owner, group and other.
	if fileInfo.Mode()&0222 == 0 {
		log.CtxLogger(ctx).Errorw("Dest file does not have writeable permissions", "destName", destName, "fileInfo", fileInfo.Mode(), "err", err)
		return []byte(fmt.Sprintf("#ERROR %s\n", fileName))
	}

	rw := storage.ReadWriter{
		Writer:                        destFile,
		Copier:                        copier,
		BucketHandle:                  bucketHandle,
		BucketName:                    config.GetBucket(),
		ChunkSizeMb:                   config.GetBufferSizeMb(),
		ObjectName:                    object.Name,
		TotalBytes:                    object.Size,
		LogDelay:                      time.Duration(config.GetLogDelaySec()) * time.Second,
		RateLimitBytes:                config.GetRateLimitMb() * 1024 * 1024,
		EncryptionKey:                 config.GetEncryptionKey(),
		KMSKey:                        config.GetKmsKey(),
		MaxRetries:                    config.GetRetries(),
		RetryBackoffInitial:           time.Duration(config.GetRetryBackoffInitial()) * time.Second,
		RetryBackoffMax:               time.Duration(config.GetRetryBackoffMax()) * time.Second,
		RetryBackoffMultiplier:        float64(config.GetRetryBackoffMultiplier()),
		ParallelDownloadWorkers:       config.GetParallelRecoveryStreams(),
		ParallelDownloadConnectParams: connectParams,
	}
	startTime := time.Now()
	bytesWritten, err := rw.Download(ctx)
	downloadTime := time.Since(startTime)
	// Arguments of a deferred call are evaluated here, so `err == nil`
	// captures the outcome of the download itself.
	defer metrics.SendToCloudMonitoring(ctx, "restore", fileName, bytesWritten, downloadTime, config, err == nil, cloudProps, cloudmonitoring.NoBackOff(), metrics.DefaultMetricClient)
	if err != nil {
		log.CtxLogger(ctx).Errorw("Error downloading file", "bucket", config.GetBucket(), "destName", destName, "obj", object.Name, "err", err)
		return []byte(fmt.Sprintf("#ERROR %s\n", fileName))
	}
	log.CtxLogger(ctx).Infow("File restored", "bucket", config.GetBucket(), "destName", destName, "obj", object.Name, "bytesWritten", bytesWritten, "downloadTimeSec", downloadTime.Round(time.Millisecond))
	// Report the backup ID taken from the restored object's name (base name
	// without the ".bak" suffix) rather than echoing the caller-supplied value.
	externalBackupID = strings.TrimSuffix(filepath.Base(object.Name), ".bak")
	return []byte(fmt.Sprintf("#RESTORED %q %s\n", externalBackupID, fileName))
}