func StageLogs()

in oracle/pkg/agents/pitr/pitr.go [1090:1164]


func StageLogs(ctx context.Context, destDir string, include func(entry LogMetadataEntry) bool, logPath string) error {
	metadataStore, err := NewSimpleStore(ctx, logPath)
	if err != nil {
		return fmt.Errorf("failed to create a metadata store %v", err)
	}
	metadata := LogMetadata{}
	if err := metadataStore.Read(ctx, MetadataStorePath, &metadata); err != nil {
		return fmt.Errorf("failed to read metadata: %v", err)
	}

	n := len(metadata.KeyToLogEntry)
	if n == 0 {
		return fmt.Errorf("empty metadata: %v", metadata)
	}

	var toStage []LogMetadataEntry
	var notReplicated []LogMetadataEntry

	for _, v := range metadata.KeyToLogEntry {
		if include(v) {
			if v.ReplicaPath != "" {
				toStage = append(toStage, v)
			} else {
				notReplicated = append(notReplicated, v)
			}
		}
	}

	if len(notReplicated) > 0 {
		return fmt.Errorf("cannot find redo logs in replica location %+v", notReplicated)
	}

	if len(toStage) == 0 {
		klog.InfoS("no logs need to be staged")
		return nil
	}

	destClient := &fsClient{}
	var srcClient storageClient
	if strings.HasPrefix(toStage[0].ReplicaPath, gsPrefix) {
		c, err := newGcsClient(ctx)
		if err != nil {
			return err
		}
		srcClient = c
	} else {
		srcClient = &fsClient{}
	}
	defer func() {
		srcClient.close(ctx)
		destClient.close(ctx)
	}()
	if err := srcClient.mkdirp(ctx, destDir, 0750); err != nil {
		return fmt.Errorf("failed to create the stage dir: %v", err)
	}
	toReplicate := make(chan srcDest)
	group := newReplicationGroup(toReplicate, srcClient, destClient)
	group.runCopy(ctx, replicationThreadCount)
	for _, ts := range toStage {
		toReplicate <- srcDest{
			src:  ts.ReplicaPath,
			dest: filepath.Join(destDir, filepath.Base(ts.SrcPath)),
		}
	}
	// stop group goroutines
	close(toReplicate)
	group.wait()

	if group.errCount > 0 {
		return fmt.Errorf("stage completed with errors error count: %d", group.errCount)
	}
	klog.Info("stage successfully completed")

	return nil
}