func restore()

in internal/backint/restore/restore.go [60:134]


func restore(ctx context.Context, config *bpb.BackintConfiguration, connectParams *storage.ConnectParameters, input io.Reader, output io.Writer, cloudProps *ipb.CloudProperties) error {
	startTime := time.Now()
	wp := workerpool.New(int(config.GetThreads()))
	mu := &sync.Mutex{}
	scanner := bufio.NewScanner(input)
	var lastFileName string
	for scanner.Scan() {
		line := scanner.Text()
		log.CtxLogger(ctx).Infow("Executing restore input", "line", line)
		if strings.HasPrefix(line, "#SOFTWAREID") {
			if _, err := parse.WriteSoftwareVersion(line, output); err != nil {
				return err
			}
		} else if strings.HasPrefix(line, "#NULL") {
			s := parse.Split(line)
			if len(s) < 2 {
				return fmt.Errorf("malformed restore input line, got: %s, want: #NULL <file_name> [<dest_name>]", line)
			}
			fileName := s[1]
			destName := fileName
			lastFileName = fileName
			// Destination is an optional parameter.
			if len(s) > 2 {
				destName = s[2]
			}
			restoreFunc := func() {
				bucketHandle, _ := storage.ConnectToBucket(ctx, connectParams)
				out := restoreFile(ctx, config, connectParams, bucketHandle, io.Copy, fileName, destName, "", cloudProps)
				mu.Lock()
				defer mu.Unlock()
				output.Write(out)
			}
			// Log restores happen in serial, so do not submit to the workerpool.
			if strings.Contains(fileName, "log_backup") {
				restoreFunc()
			} else {
				wp.Submit(restoreFunc)
			}
		} else if strings.HasPrefix(line, "#EBID") {
			s := parse.Split(line)
			if len(s) < 3 {
				return fmt.Errorf("malformed restore input line, got: %s, want: #EBID <external_backup_id> <file_name> [<dest_name>]", line)
			}
			externalBackupID := parse.TrimAndClean(s[1])
			fileName := s[2]
			destName := fileName
			lastFileName = fileName
			// Destination is an optional parameter.
			if len(s) > 3 {
				destName = s[3]
			}
			restoreFunc := func() {
				bucketHandle, _ := storage.ConnectToBucket(ctx, connectParams)
				out := restoreFile(ctx, config, connectParams, bucketHandle, io.Copy, fileName, destName, externalBackupID, cloudProps)
				mu.Lock()
				defer mu.Unlock()
				output.Write(out)
			}
			// Log restores happen in serial, so do not submit to the workerpool.
			if strings.Contains(fileName, "log_backup") {
				restoreFunc()
			} else {
				wp.Submit(restoreFunc)
			}
		} else {
			log.CtxLogger(ctx).Infow("Unknown prefix encountered, treated as a comment", "line", line)
		}
	}
	wp.StopWait()
	if err := scanner.Err(); err != nil {
		return err
	}
	metrics.WriteFileTransferLog(ctx, "restore", parse.TrimAndClean(lastFileName), time.Since(startTime), config, cloudProps)
	return nil
}