in internal/backint/restore/restore.go [60:134]
// restore reads restore requests from input one line at a time and writes the
// result of every request to output.
//
// Recognized line prefixes:
//   - "#SOFTWAREID": the line is passed to parse.WriteSoftwareVersion, which
//     writes to output.
//   - "#NULL <file_name> [<dest_name>]": restores file_name using an empty
//     external backup ID.
//   - "#EBID <external_backup_id> <file_name> [<dest_name>]": restores
//     file_name using the given external backup ID.
//
// When dest_name is omitted, file_name is used as the destination. Lines with
// any other prefix are logged and skipped.
//
// Files whose name contains "log_backup" are restored inline, in input order;
// all other files are submitted to a worker pool sized by config.GetThreads().
// The pool is always drained before restore returns, including on error paths,
// so no worker writes to output after the function has returned.
//
// An error is returned for a malformed "#NULL"/"#EBID" line, for a failure in
// parse.WriteSoftwareVersion, or for a scanner error on input.
func restore(ctx context.Context, config *bpb.BackintConfiguration, connectParams *storage.ConnectParameters, input io.Reader, output io.Writer, cloudProps *ipb.CloudProperties) error {
	startTime := time.Now()
	wp := workerpool.New(int(config.GetThreads()))
	// mu serializes every write to output, which is shared between this
	// goroutine and the worker pool.
	var mu sync.Mutex
	var lastFileName string

	// dispatch restores a single file and writes the result to output. It runs
	// inline for log backups and on the worker pool for everything else.
	dispatch := func(fileName, destName, externalBackupID string) {
		restoreFunc := func() {
			// NOTE(review): the second return value of ConnectToBucket is
			// discarded here, as before; confirm restoreFile copes with a
			// failed connection.
			bucketHandle, _ := storage.ConnectToBucket(ctx, connectParams)
			out := restoreFile(ctx, config, connectParams, bucketHandle, io.Copy, fileName, destName, externalBackupID, cloudProps)
			mu.Lock()
			defer mu.Unlock()
			if _, err := output.Write(out); err != nil {
				log.CtxLogger(ctx).Errorw("Failed to write restore result to output", "fileName", fileName, "err", err)
			}
		}
		// Log restores happen in serial, so do not submit to the workerpool.
		if strings.Contains(fileName, "log_backup") {
			restoreFunc()
			return
		}
		wp.Submit(restoreFunc)
	}

	// processInput consumes input until EOF or the first error. It is a
	// separate closure so that the worker pool can be drained exactly once
	// below, no matter which path ends the scan.
	processInput := func() error {
		scanner := bufio.NewScanner(input)
		for scanner.Scan() {
			line := scanner.Text()
			log.CtxLogger(ctx).Infow("Executing restore input", "line", line)
			switch {
			case strings.HasPrefix(line, "#SOFTWAREID"):
				// Hold mu: workers submitted by earlier lines may be writing
				// to output concurrently.
				mu.Lock()
				_, err := parse.WriteSoftwareVersion(line, output)
				mu.Unlock()
				if err != nil {
					return err
				}
			case strings.HasPrefix(line, "#NULL"):
				s := parse.Split(line)
				if len(s) < 2 {
					return fmt.Errorf("malformed restore input line, got: %s, want: #NULL <file_name> [<dest_name>]", line)
				}
				fileName := s[1]
				destName := fileName
				// Destination is an optional parameter.
				if len(s) > 2 {
					destName = s[2]
				}
				lastFileName = fileName
				dispatch(fileName, destName, "")
			case strings.HasPrefix(line, "#EBID"):
				s := parse.Split(line)
				if len(s) < 3 {
					return fmt.Errorf("malformed restore input line, got: %s, want: #EBID <external_backup_id> <file_name> [<dest_name>]", line)
				}
				externalBackupID := parse.TrimAndClean(s[1])
				fileName := s[2]
				destName := fileName
				// Destination is an optional parameter.
				if len(s) > 3 {
					destName = s[3]
				}
				lastFileName = fileName
				dispatch(fileName, destName, externalBackupID)
			default:
				log.CtxLogger(ctx).Infow("Unknown prefix encountered, treated as a comment", "line", line)
			}
		}
		return scanner.Err()
	}

	err := processInput()
	// Always wait for submitted restores, even when the scan failed, so that
	// no goroutine outlives this call or writes to output afterwards.
	wp.StopWait()
	if err != nil {
		return err
	}
	metrics.WriteFileTransferLog(ctx, "restore", parse.TrimAndClean(lastFileName), time.Since(startTime), config, cloudProps)
	return nil
}