in oracle/pkg/agents/pitr/pitr.go [1090:1164]
func StageLogs(ctx context.Context, destDir string, include func(entry LogMetadataEntry) bool, logPath string) error {
metadataStore, err := NewSimpleStore(ctx, logPath)
if err != nil {
return fmt.Errorf("failed to create a metadata store %v", err)
}
metadata := LogMetadata{}
if err := metadataStore.Read(ctx, MetadataStorePath, &metadata); err != nil {
return fmt.Errorf("failed to read metadata: %v", err)
}
n := len(metadata.KeyToLogEntry)
if n == 0 {
return fmt.Errorf("empty metadata: %v", metadata)
}
var toStage []LogMetadataEntry
var notReplicated []LogMetadataEntry
for _, v := range metadata.KeyToLogEntry {
if include(v) {
if v.ReplicaPath != "" {
toStage = append(toStage, v)
} else {
notReplicated = append(notReplicated, v)
}
}
}
if len(notReplicated) > 0 {
return fmt.Errorf("cannot find redo logs in replica location %+v", notReplicated)
}
if len(toStage) == 0 {
klog.InfoS("no logs need to be staged")
return nil
}
destClient := &fsClient{}
var srcClient storageClient
if strings.HasPrefix(toStage[0].ReplicaPath, gsPrefix) {
c, err := newGcsClient(ctx)
if err != nil {
return err
}
srcClient = c
} else {
srcClient = &fsClient{}
}
defer func() {
srcClient.close(ctx)
destClient.close(ctx)
}()
if err := srcClient.mkdirp(ctx, destDir, 0750); err != nil {
return fmt.Errorf("failed to create the stage dir: %v", err)
}
toReplicate := make(chan srcDest)
group := newReplicationGroup(toReplicate, srcClient, destClient)
group.runCopy(ctx, replicationThreadCount)
for _, ts := range toStage {
toReplicate <- srcDest{
src: ts.ReplicaPath,
dest: filepath.Join(destDir, filepath.Base(ts.SrcPath)),
}
}
// stop group goroutines
close(toReplicate)
group.wait()
if group.errCount > 0 {
return fmt.Errorf("stage completed with errors error count: %d", group.errCount)
}
klog.Info("stage successfully completed")
return nil
}