oracle/pkg/agents/pitr/pitr.go

// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pitr

import (
	"context"
	"encoding/base64"
	"encoding/binary"
	"encoding/gob"
	"encoding/json"
	"errors"
	"fmt"
	"hash/crc32"
	"io"
	"io/fs"
	"os"
	"path/filepath"
	"reflect"
	"sort"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"cloud.google.com/go/storage"
	dbdpb "github.com/GoogleCloudPlatform/elcarro-oracle-operator/oracle/pkg/agents/oracle"
	"k8s.io/klog/v2"
)

var (
	syncInterval    = time.Minute
	syncTimeout     = time.Hour
	cleanupInterval = time.Hour * 24
	castagnoli      = crc32.MakeTable(crc32.Castagnoli)

	replicationThreadCount = 4
	timeNow                = time.Now
)

const (
	gsPrefix = "gs://"
	// MetadataStorePath is the path to the log metadata catalog.
	MetadataStorePath = "catalog"
	archiveLagParam   = "archive_lag_target"
	archiveLagVal     = 600 // 10 min
)

// storageClient abstracts the operations PITR needs from a log store, so that
// a local filesystem and GCS can be used interchangeably.
type storageClient interface {
	// hash returns the CRC32C hash of a file.
	hash(ctx context.Context, path string) (h []byte, retErr error)
	// mtime returns the time that the data was last modified.
	mtime(ctx context.Context, path string) (time.Time, error)
	// mkdirp creates a directory named path, along with any necessary parents.
	mkdirp(ctx context.Context, path string, mode os.FileMode) error
	// read opens the named path and returns the reader.
	read(ctx context.Context, path string) (io.ReadCloser, error)
	// write opens the named path and returns the writer.
	write(ctx context.Context, path string) (io.WriteCloser, error)
	// delete deletes the named path.
	delete(ctx context.Context, path string, ignoreNotExists bool) error
	// close closes the client.
	close(ctx context.Context) error
}
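// The two concrete implementations below are gcsClient (for gs:// URIs) and
// fsClient (for local paths). A minimal sketch of how a caller could pick one,
// under the same prefix convention used throughout this file (illustrative
// only; NewSimpleStore and RunLogReplication below contain the real selection
// logic):
//
//	func clientFor(ctx context.Context, path string) (storageClient, error) {
//		if strings.HasPrefix(path, gsPrefix) {
//			return newGcsClient(ctx) // GCS-backed, path is "gs://bucket/object"
//		}
//		return &fsClient{}, nil // local filesystem
//	}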
type srcDest struct {
	src  string
	dest string
}

type gcsClient struct {
	c *storage.Client
}

func (g *gcsClient) hash(ctx context.Context, path string) (h []byte, retErr error) {
	bucket, name, err := g.splitURI(path)
	if err != nil {
		return nil, err
	}
	// Note: this method creates a dedicated client for the duration of the call.
	c, err := storage.NewClient(ctx)
	if err != nil {
		return nil, err
	}
	defer func() {
		if err := c.Close(); err != nil {
			if retErr == nil {
				retErr = err
			} else {
				retErr = fmt.Errorf("%v; %v", retErr, err)
			}
		}
	}()
	b := c.Bucket(bucket)
	// Check that the object exists and is accessible.
	a, err := b.Object(name).Attrs(ctx)
	if err != nil {
		return nil, err
	}
	hashBytes := make([]byte, 4)
	binary.BigEndian.PutUint32(hashBytes, a.CRC32C)
	return hashBytes, nil
}

func (g *gcsClient) mtime(ctx context.Context, path string) (t time.Time, retErr error) {
	bucket, name, err := g.splitURI(path)
	if err != nil {
		return time.Time{}, err
	}
	b := g.c.Bucket(bucket)
	// Check that the bucket exists and is accessible.
	if _, err := b.Attrs(ctx); err != nil {
		return time.Time{}, err
	}
	r, err := b.Object(name).NewReader(ctx)
	if err != nil {
		return time.Time{}, err
	}
	defer func() {
		if err := r.Close(); err != nil {
			if retErr == nil {
				retErr = err
			} else {
				retErr = fmt.Errorf("%v; %v", retErr, err)
			}
		}
	}()
	return r.Attrs.LastModified, nil
}

func (g *gcsClient) mkdirp(context.Context, string, os.FileMode) error {
	// GCS has no real directories; object names simply contain slashes.
	// https://cloud.google.com/storage/docs/folders
	return nil
}

func (g *gcsClient) read(ctx context.Context, path string) (closer io.ReadCloser, retErr error) {
	bucket, name, err := g.splitURI(path)
	if err != nil {
		return nil, err
	}
	b := g.c.Bucket(bucket)
	// Check that the bucket exists and is accessible.
	if _, err := b.Attrs(ctx); err != nil {
		return nil, err
	}
	r, err := b.Object(name).NewReader(ctx)
	if err != nil {
		return nil, err
	}
	return r, nil
}

func (g *gcsClient) write(ctx context.Context, path string) (closer io.WriteCloser, retErr error) {
	bucket, name, err := g.splitURI(path)
	if err != nil {
		return nil, err
	}
	b := g.c.Bucket(bucket)
	// Check that the bucket exists and is accessible.
	if _, err := b.Attrs(ctx); err != nil {
		return nil, err
	}
	return b.Object(name).NewWriter(ctx), nil
}

func (g *gcsClient) delete(ctx context.Context, path string, ignoreNotExists bool) error {
	bucket, name, err := g.splitURI(path)
	if err != nil {
		return err
	}
	o := g.c.Bucket(bucket).Object(name)
	if err := o.Delete(ctx); err != nil && !(ignoreNotExists && errors.Is(err, storage.ErrObjectNotExist)) {
		return fmt.Errorf("Bucket(%q).Object(%q).Delete: %w", bucket, name, err)
	}
	return nil
}

func (g *gcsClient) close(context.Context) error {
	return g.c.Close()
}

// splitURI splits a "gs://bucket/object" URI into its bucket and object name.
func (g *gcsClient) splitURI(url string) (bucket, name string, err error) {
	u := strings.TrimPrefix(url, gsPrefix)
	if u == url {
		return "", "", fmt.Errorf("URL %q is missing the %q prefix", url, gsPrefix)
	}
	if i := strings.Index(u, "/"); i >= 2 {
		return u[:i], u[i+1:], nil
	}
	return "", "", fmt.Errorf("URL %q does not specify a bucket and a name", url)
}

func newGcsClient(ctx context.Context) (*gcsClient, error) {
	c, err := storage.NewClient(ctx)
	if err != nil {
		return nil, err
	}
	return &gcsClient{c: c}, nil
}
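// A quick illustration of splitURI (values are hypothetical):
//
//	bucket, name, err := g.splitURI("gs://mybucket/mydb/archivelog/o1_mf_1_1.arc")
//	// bucket == "mybucket"
//	// name   == "mydb/archivelog/o1_mf_1_1.arc"
//	// err    == nil; a missing "gs://" prefix or bucket yields an error instead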
type fsClient struct{}

func (f *fsClient) read(_ context.Context, path string) (io.ReadCloser, error) {
	return os.Open(path)
}

func (f *fsClient) write(_ context.Context, path string) (io.WriteCloser, error) {
	dir := filepath.Dir(path)
	if _, err := os.Stat(dir); os.IsNotExist(err) {
		if err := os.MkdirAll(dir, 0750); err != nil {
			return nil, err
		}
	}
	return os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0640)
}

func (f *fsClient) hash(_ context.Context, path string) (h []byte, retErr error) {
	file, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer func() {
		if err := file.Close(); err != nil {
			if retErr == nil {
				retErr = err
			} else {
				retErr = fmt.Errorf("%v; %v", retErr, err)
			}
		}
	}()
	hash := crc32.New(castagnoli)
	if _, err := io.Copy(hash, file); err != nil {
		return nil, err
	}
	return hash.Sum(nil), nil
}

func (f *fsClient) mkdirp(_ context.Context, path string, mode os.FileMode) error {
	return os.MkdirAll(path, mode)
}

func (f *fsClient) mtime(_ context.Context, path string) (time.Time, error) {
	info, err := os.Stat(path)
	if err != nil {
		return time.Time{}, err
	}
	return info.ModTime(), nil
}

func (f *fsClient) delete(_ context.Context, path string, ignoreNotExists bool) error {
	if err := os.Remove(path); err != nil && !(ignoreNotExists && os.IsNotExist(err)) {
		return err
	}
	return nil
}

func (f *fsClient) close(context.Context) error {
	return nil
}

// replicationGroup runs a pool of worker goroutines that copy files described
// by srcDest pairs from srcClient to destClient.
type replicationGroup struct {
	wg          *sync.WaitGroup
	errCount    uint64
	toReplicate <-chan srcDest
	srcClient   storageClient
	destClient  storageClient
}

func newReplicationGroup(toReplicate <-chan srcDest, srcClient storageClient, destClient storageClient) *replicationGroup {
	return &replicationGroup{
		wg:          &sync.WaitGroup{},
		errCount:    0,
		toReplicate: toReplicate,
		srcClient:   srcClient,
		destClient:  destClient,
	}
}

func (g *replicationGroup) runSync(ctx context.Context, threadCount int, hashStore *SimpleStore) {
	for i := 0; i < threadCount; i++ {
		g.wg.Add(1)
		go func() {
			defer g.wg.Done()
			for {
				select {
				case <-ctx.Done():
					klog.Info("exiting goroutine as context canceled/timed out...")
					return
				case sd, ok := <-g.toReplicate:
					if !ok {
						klog.Info("exiting sync goroutine as replicate channel closed")
						return
					}
					g.sync(ctx, sd, hashStore)
				}
			}
		}()
	}
}

func (g *replicationGroup) copy(ctx context.Context, sd srcDest) (sizeBytes int64, retErr error) {
	sr, err := g.srcClient.read(ctx, sd.src)
	if err != nil {
		return 0, err
	}
	defer func() {
		if err := sr.Close(); err != nil {
			if retErr == nil {
				retErr = err
			} else {
				retErr = fmt.Errorf("%v; %v", retErr, err)
			}
		}
	}()
	dw, err := g.destClient.write(ctx, sd.dest)
	if err != nil {
		return 0, err
	}
	defer func() {
		if err := dw.Close(); err != nil {
			if retErr == nil {
				retErr = err
			} else {
				retErr = fmt.Errorf("%v; %v", retErr, err)
			}
		}
	}()
	// TODO: a TeeReader can copy and calculate the hash in one pass.
	size, err := io.Copy(dw, sr)
	if err != nil {
		return 0, err
	}
	return size, nil
}
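// One way the TODO above could be addressed: io.TeeReader lets the copy feed
// the CRC32C hash as bytes stream through, avoiding the separate re-read of the
// source that validateHash currently performs. A minimal sketch (not wired in):
//
//	hasher := crc32.New(castagnoli)
//	size, err := io.Copy(dw, io.TeeReader(sr, hasher))
//	if err != nil {
//		return 0, err
//	}
//	srcHash := hasher.Sum(nil) // CRC32C of the bytes actually copied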
func (g *replicationGroup) sync(ctx context.Context, sd srcDest, hashStore *SimpleStore) {
	klog.InfoS("syncing", "src", sd.src, "dest", sd.dest)
	changed, err := g.changed(ctx, sd.src, hashStore)
	// An error in change detection is not added to errCount.
	if err == nil && !changed {
		klog.InfoS("skip sync as the src was replicated and mtime unchanged", "src", sd.src)
		return
	}
	// If change detection failed or a change was detected, continue with the copy.
	start := time.Now()
	sizeBytes, err := g.copy(ctx, sd)
	if err != nil {
		atomic.AddUint64(&g.errCount, 1)
		klog.ErrorS(err, "failed to copy a file", "src", sd.src, "dest", sd.dest)
		return
	}
	rate := float64(sizeBytes) / time.Since(start).Seconds()
	klog.InfoS("copy", "src", sd.src, "dest", sd.dest, "throughput", fmt.Sprintf("%f MB/s", rate/1024/1024))
	hash, err := g.validateHash(ctx, sd.src, sd.dest)
	if err != nil {
		atomic.AddUint64(&g.errCount, 1)
		klog.ErrorS(err, "failed to validate the hash of a file", "src", sd.src, "dest", sd.dest)
		return
	}
	t, err := g.srcClient.mtime(ctx, sd.src)
	if err != nil {
		atomic.AddUint64(&g.errCount, 1)
		klog.ErrorS(err, "failed to read mtime from a file", "src", sd.src)
	}
	hashStore.Lock()
	defer hashStore.UnLock()
	if err := hashStore.write(ctx, sd.src, LogHashEntry{
		Crc32cHash:  hash,
		ReplicaPath: sd.dest,
		ModTime:     t,
	}); err != nil {
		atomic.AddUint64(&g.errCount, 1)
		klog.ErrorS(err, "failed to store hash in metadata", "src", sd.src, "dest", sd.dest, "hash", hash)
	}
	klog.InfoS("syncing done", "src", sd.src, "dest", sd.dest)
}

func (g *replicationGroup) validateHash(ctx context.Context, src, dest string) (string, error) {
	srcHash, err := g.srcClient.hash(ctx, src)
	if err != nil {
		return "", err
	}
	destHash, err := g.destClient.hash(ctx, dest)
	if err != nil {
		return "", err
	}
	srcEncoded := base64.StdEncoding.EncodeToString(srcHash)
	destEncoded := base64.StdEncoding.EncodeToString(destHash)
	if srcEncoded != destEncoded {
		return "", fmt.Errorf("hash mismatched src %q=%s, dest %q=%s", src, srcEncoded, dest, destEncoded)
	}
	return destEncoded, nil
}

func (g *replicationGroup) changed(ctx context.Context, src string, hashStore *SimpleStore) (bool, error) {
	storedHash := LogHashEntry{}
	hashStore.Lock()
	err := hashStore.Read(ctx, src, &storedHash)
	hashStore.UnLock()
	// Either the entry is corrupted or the file has not been replicated yet.
	if err != nil || storedHash.ReplicaPath == "" {
		return true, err
	}
	klog.InfoS("found existing hash", "src", src, "storedHash", storedHash)
	currentModTime, err := g.srcClient.mtime(ctx, src)
	if err != nil {
		return true, err
	}
	klog.InfoS("mod time", "src", src, "stored mtime", storedHash.ModTime, "current mtime", currentModTime)
	return !currentModTime.Equal(storedHash.ModTime), nil
}

func (g *replicationGroup) runCopy(ctx context.Context, threadCount int) {
	for i := 0; i < threadCount; i++ {
		g.wg.Add(1)
		go func() {
			defer g.wg.Done()
			for {
				select {
				case <-ctx.Done():
					klog.Info("exiting goroutine as context canceled/timed out...")
					return
				case sd, ok := <-g.toReplicate:
					if !ok {
						klog.Info("exiting copy goroutine as replicate channel closed")
						return
					}
					start := time.Now()
					sizeBytes, err := g.copy(ctx, sd)
					if err != nil {
						atomic.AddUint64(&g.errCount, 1)
						klog.ErrorS(err, "failed to copy a file", "src", sd.src, "dest", sd.dest)
						continue
					}
					rate := float64(sizeBytes) / time.Since(start).Seconds()
					klog.InfoS("copy", "src", sd.src, "dest", sd.dest, "throughput", fmt.Sprintf("%f MB/s", rate/1024/1024))
				}
			}
		}()
	}
}

func (g *replicationGroup) wait() {
	g.wg.Wait()
}
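// The replicationGroup contract, which runReplication below follows: workers
// consume from the channel until it is closed, errors are tallied in errCount,
// and wait() joins the pool. A minimal sketch with hypothetical paths:
//
//	ch := make(chan srcDest)
//	group := newReplicationGroup(ch, &fsClient{}, &fsClient{})
//	group.runCopy(ctx, replicationThreadCount)
//	ch <- srcDest{src: "/u02/archivelog/a.arc", dest: "/backup/a.arc"}
//	close(ch) // signals the workers to drain and exit
//	group.wait()
//	// errCount is safe to read here, after all workers have exited.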
func runReplication(ctx context.Context, srcDir, destDir string, localClient *fsClient, remoteClient storageClient, hashStore *SimpleStore) error {
	start := time.Now()
	defer func() {
		klog.InfoS("runReplication", "used time", time.Since(start))
	}()
	ctx, cancel := context.WithDeadline(ctx, time.Now().Add(syncTimeout))
	defer cancel()
	toReplicate := make(chan srcDest)
	group := newReplicationGroup(toReplicate, localClient, remoteClient)
	group.runSync(ctx, replicationThreadCount, hashStore)

	err := filepath.WalkDir(srcDir, func(path string, d fs.DirEntry, err error) error {
		if err != nil {
			klog.ErrorS(err, "failed to access a path", "path", path)
			return err
		}
		info, err := d.Info()
		if err != nil {
			klog.ErrorS(err, "failed to read a file info", "dest", d)
			return err
		}
		sub, err := filepath.Rel(srcDir, path)
		if err != nil {
			klog.ErrorS(err, "failed to find a rel path", "srcDir", srcDir, "path", path)
			return err
		}
		dest := destDir + sub
		if !strings.HasSuffix(destDir, "/") {
			dest = destDir + "/" + sub
		}
		if d.IsDir() {
			if err := remoteClient.mkdirp(ctx, dest, info.Mode()); err != nil {
				klog.ErrorS(err, "failed to create a dir", "dest", dest)
				return err
			}
			return nil
		}
		toReplicate <- srcDest{
			src:  path,
			dest: dest,
		}
		return nil
	})
	// Stop the group goroutines.
	close(toReplicate)
	group.wait()
	if group.errCount > 0 || err != nil {
		return fmt.Errorf("replication completed with errors: sync error count: %d, walk dir error: %v", group.errCount, err)
	}
	klog.Info("replication successfully completed")
	return nil
}

// LogMetadata stores metadata information for redo logs.
type LogMetadata struct {
	// KeyToLogEntry stores redo log information in a map.
	// The key is used for deduplication.
	KeyToLogEntry map[string]LogMetadataEntry
}

// LogMetadataEntry stores metadata information for a redo log.
type LogMetadataEntry struct {
	// LogHashEntry stores hash information for a redo log.
	LogHashEntry
	// SrcPath stores the path of a redo log in the original environment.
	SrcPath string
	// FirstChange stores the first SCN (inclusive) of a redo log.
	FirstChange string // SCN inclusive
	// NextChange stores the next SCN (exclusive) of a redo log.
	NextChange string
	// FirstTime stores the first timestamp (inclusive) of a redo log.
	// We use TO_CHAR(DATE, 'YYYY-MM-DD"T"HH24:MI:SS"Z"').
	FirstTime time.Time
	// NextTime stores the next timestamp (inclusive) of a redo log.
	// We use TO_CHAR(DATE, 'YYYY-MM-DD"T"HH24:MI:SS"Z"').
	NextTime time.Time
	// CompletionTime stores the timestamp when a redo log was archived.
	CompletionTime string
	// Sequence stores the sequence of an archived redo log.
	Sequence string
	// Incarnation stores the incarnation of an archived redo log.
	Incarnation string
	// Thread stores the redo thread number.
	Thread string
}

// LogHashEntry stores hash information for a redo log.
type LogHashEntry struct {
	// Crc32cHash stores the CRC32C hash of a redo log.
	Crc32cHash string
	// ReplicaPath stores the path of a replicated redo log.
	ReplicaPath string
	// ModTime stores the mod time of a redo log after replication.
	ModTime time.Time
}
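// How entries are keyed in KeyToLogEntry (see metadataUpdate below): the key is
// "<thread>-<incarnation>-<sequence>", which identifies an archived log across
// RESETLOGS. A hypothetical entry for illustration:
//
//	entry := LogMetadataEntry{
//		Thread:      "1",
//		Incarnation: "2",
//		Sequence:    "42",
//		SrcPath:     "/u02/fast_recovery_area/MYDB/archivelog/2021_09_13/o1_mf_1_42_x_.arc",
//	}
//	key := fmt.Sprintf("%s-%s-%s", entry.Thread, entry.Incarnation, entry.Sequence)
//	// key == "1-2-42"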
// SimpleStore implements a simple data store to read and write golang objects.
type SimpleStore struct {
	mu      sync.Mutex
	c       storageClient
	dataDir string
}

// NewSimpleStore returns a SimpleStore that stores data under the specified dataDir.
// A '/' is appended to dataDir if it does not already end with one.
func NewSimpleStore(ctx context.Context, dataDir string) (*SimpleStore, error) {
	var c storageClient
	if strings.HasPrefix(dataDir, gsPrefix) {
		gc, err := newGcsClient(ctx)
		if err != nil {
			return nil, err
		}
		c = gc
	} else {
		c = &fsClient{}
	}
	if !strings.HasSuffix(dataDir, "/") {
		dataDir = dataDir + "/"
	}
	return &SimpleStore{
		mu:      sync.Mutex{},
		c:       c,
		dataDir: dataDir,
	}, nil
}

// Close closes the storage client of the store.
func (s *SimpleStore) Close(ctx context.Context) error {
	return s.c.close(ctx)
}

// Read retrieves a golang object from this SimpleStore and decodes it into data.
// This method is unsafe for concurrent use. It is the caller's responsibility to call
// SimpleStore.Lock()/SimpleStore.UnLock() for synchronization between goroutines.
func (s *SimpleStore) Read(ctx context.Context, path string, data interface{}) (retErr error) {
	if data == nil {
		return fmt.Errorf("invalid input %v", data)
	}
	value := reflect.ValueOf(data)
	if value.Type().Kind() != reflect.Ptr {
		return errors.New("attempt to store the retrieved data into a non-pointer")
	}
	dataPath := s.dataDir + path
	r, err := s.c.read(ctx, dataPath)
	if err != nil {
		return err
	}
	defer func() {
		if err := r.Close(); err != nil {
			if retErr == nil {
				retErr = err
			} else {
				retErr = fmt.Errorf("%v; %v", retErr, err)
			}
		}
	}()
	dec := gob.NewDecoder(r)
	return dec.Decode(data)
}

// write writes a golang object into this SimpleStore.
// This method is unsafe for concurrent use. It is the caller's responsibility to call
// SimpleStore.Lock()/SimpleStore.UnLock() for synchronization between goroutines.
func (s *SimpleStore) write(ctx context.Context, path string, data interface{}) (retErr error) {
	w, err := s.c.write(ctx, s.dataDir+path)
	if err != nil {
		return err
	}
	defer func() {
		if err := w.Close(); err != nil {
			if retErr == nil {
				retErr = err
			} else {
				retErr = fmt.Errorf("%v; %v", retErr, err)
			}
		}
	}()
	enc := gob.NewEncoder(w)
	return enc.Encode(data)
}

// Lock locks this store.
func (s *SimpleStore) Lock() {
	s.mu.Lock()
}

// UnLock unlocks this store.
func (s *SimpleStore) UnLock() {
	s.mu.Unlock()
}

// delete deletes the named path and its associated golang object.
func (s *SimpleStore) delete(ctx context.Context, path string) (retErr error) {
	return s.c.delete(ctx, s.dataDir+path, true)
}
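// A minimal SimpleStore round trip (illustrative; the path and values are
// hypothetical). Values are gob-encoded, so Read must be given a pointer to
// the same concrete type that was written:
//
//	store, err := NewSimpleStore(ctx, "gs://mybucket/pitr-metadata")
//	if err != nil { /* handle error */ }
//	defer store.Close(ctx)
//	store.Lock()
//	err = store.write(ctx, "hashes/log1", LogHashEntry{Crc32cHash: "abc=", ReplicaPath: "gs://mybucket/log1"})
//	store.UnLock()
//	got := LogHashEntry{}
//	store.Lock()
//	err = store.Read(ctx, "hashes/log1", &got)
//	store.UnLock()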
type logSyncer struct {
	dest         string
	dbdClient    dbdpb.DatabaseDaemonClient
	localClient  *fsClient
	remoteClient storageClient
	hashStore    *SimpleStore
}

func (l *logSyncer) run(ctx context.Context) error {
	src, err := getArchivedLogDir(ctx, l.dbdClient)
	if err != nil {
		// Cannot get the log dir required to start the sync.
		return err
	}
	err = runReplication(ctx, src, l.dest, l.localClient, l.remoteClient, l.hashStore)
	if err != nil {
		klog.ErrorS(err, "initial sync failed")
	}
	ticker := time.NewTicker(syncInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			klog.Info("Exiting syncer goroutine as context canceled/timed out...")
			return nil
		case <-ticker.C:
			err = runReplication(ctx, src, l.dest, l.localClient, l.remoteClient, l.hashStore)
			if err != nil {
				klog.ErrorS(err, "sync failed")
			} else {
				// Check whether the log location changed.
				if newSrc, err := getArchivedLogDir(ctx, l.dbdClient); err == nil {
					src = newSrc
				}
			}
		}
	}
}

func getArchivedLogDir(ctx context.Context, dbdClient dbdpb.DatabaseDaemonClient) (string, error) {
	resp, err := dbdClient.RunSQLPlusFormatted(ctx, &dbdpb.RunSQLPlusCMDRequest{Commands: []string{"select name from v$recovery_file_dest"}, Suppress: false})
	if err != nil {
		return "", err
	}
	if len(resp.GetMsg()) <= 0 {
		return "", fmt.Errorf("getArchivedLogDir: failed to find the recovery_file_dest from %v", resp)
	}
	row := make(map[string]string)
	if err := json.Unmarshal([]byte(resp.GetMsg()[0]), &row); err != nil {
		return "", err
	}
	return row["NAME"], nil
}

// RunLogReplication starts redo log replication.
// It repeatedly runs the steps below:
// read the archived redo log location with dbdClient (at the very beginning and
// after each successful sync), then sync redo logs to the specified dest location.
func RunLogReplication(ctx context.Context, dbdClient dbdpb.DatabaseDaemonClient, dest string, hashStore *SimpleStore) error {
	local := &fsClient{}
	var remote storageClient
	if strings.HasPrefix(dest, gsPrefix) {
		c, err := newGcsClient(ctx)
		if err != nil {
			return err
		}
		remote = c
	} else {
		remote = local
	}
	defer func() {
		local.close(ctx)
		remote.close(ctx)
	}()
	syncer := &logSyncer{
		dest:         dest,
		dbdClient:    dbdClient,
		localClient:  local,
		remoteClient: remote,
		hashStore:    hashStore,
	}
	return syncer.run(ctx)
}
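// RunLogReplication blocks until its context ends, so callers typically run it
// in its own goroutine. A minimal sketch (the dbdClient wiring is assumed to
// exist elsewhere; the GCS paths are hypothetical):
//
//	ctx, cancel := context.WithCancel(context.Background())
//	defer cancel()
//	hashStore, err := NewSimpleStore(ctx, "gs://mybucket/mydb/hashes")
//	if err != nil { /* handle error */ }
//	go func() {
//		if err := RunLogReplication(ctx, dbdClient, "gs://mybucket/mydb/archivelog", hashStore); err != nil {
//			klog.ErrorS(err, "log replication stopped")
//		}
//	}()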
// SetArchiveLag sets the archive lag target parameter in the database if the current value is 0.
func SetArchiveLag(ctx context.Context, dbdClient dbdpb.DatabaseDaemonClient) error {
	resp, err := dbdClient.RunSQLPlusFormatted(
		ctx,
		&dbdpb.RunSQLPlusCMDRequest{Commands: []string{fmt.Sprintf("select value from v$parameter where name='%s'", archiveLagParam)}, Suppress: false},
	)
	if err != nil {
		return fmt.Errorf("SetArchiveLag: failed to get archive lag value: %v", err)
	}
	lag := "0"
	if len(resp.GetMsg()) > 0 {
		row := make(map[string]string)
		if err := json.Unmarshal([]byte(resp.GetMsg()[0]), &row); err != nil {
			return fmt.Errorf("SetArchiveLag: failed to parse %v", resp)
		}
		lag = row["VALUE"]
	}
	if lag != "0" {
		klog.InfoS("SetArchiveLag: found archive lag parameter set, skip update", "value", lag)
		return nil
	}
	cmd := fmt.Sprintf("alter system set %s=%d scope=both", archiveLagParam, archiveLagVal)
	_, err = dbdClient.RunSQLPlus(ctx, &dbdpb.RunSQLPlusCMDRequest{
		Commands: []string{cmd},
		Suppress: false,
	})
	if err != nil {
		return fmt.Errorf("SetArchiveLag: failed to execute parameter command %q: %v", cmd, err)
	}
	return nil
}

// RunMetadataUpdate starts the metadata update loop.
// It repeatedly reads the archived redo log view with dbdClient and
// cumulatively updates the log metadata in metadataStore.
func RunMetadataUpdate(ctx context.Context, dbdClient dbdpb.DatabaseDaemonClient, hashStore *SimpleStore, metadataStore *SimpleStore) error {
	if err := metadataUpdate(ctx, dbdClient, hashStore, metadataStore); err != nil {
		klog.ErrorS(err, "failed to update log metadata")
	}
	ticker := time.NewTicker(syncInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			klog.Info("Exiting log metadata update goroutine as context canceled/timed out...")
			return nil
		case <-ticker.C:
			if err := metadataUpdate(ctx, dbdClient, hashStore, metadataStore); err != nil {
				klog.ErrorS(err, "failed to update log metadata")
			}
		}
	}
}
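// metadataUpdate below assumes RunSQLPlusFormatted returns one JSON object per
// row, keyed by column name, with times rendered by TO_CHAR in an RFC 3339
// shape. A hypothetical row and how it parses:
//
//	msg := `{"NAME":"/u02/.../o1_mf_1_42_x_.arc","FIRST_TIME":"2021-09-13T10:00:00Z","SEQUENCE#":"42"}`
//	row := make(map[string]string)
//	_ = json.Unmarshal([]byte(msg), &row)
//	firstTime, _ := time.Parse(time.RFC3339, row["FIRST_TIME"]) // 2021-09-13 10:00:00 +0000 UTC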
func metadataUpdate(ctx context.Context, dbdClient dbdpb.DatabaseDaemonClient, hashStore *SimpleStore, metadataStore *SimpleStore) error {
	archiveDir, err := getArchivedLogDir(ctx, dbdClient)
	if err != nil {
		return err
	}
	// TODO: based on retention, keep track of the last successful update timestamp;
	// we could select logs with COMPLETION_TIME >= NOW - RETENTION to reduce the size of the result.
	// Assume the El Carro instance date is in UTC.
	query := "select " +
		"v$archived_log.NAME, " +
		"v$archived_log.FIRST_CHANGE#, " +
		"TO_CHAR(v$archived_log.FIRST_TIME, 'YYYY-MM-DD\"T\"HH24:MI:SS\"Z\"') as FIRST_TIME, " +
		"v$archived_log.NEXT_CHANGE#, " +
		"TO_CHAR(v$archived_log.NEXT_TIME, 'YYYY-MM-DD\"T\"HH24:MI:SS\"Z\"') as NEXT_TIME, " +
		"v$archived_log.SEQUENCE#, " +
		"TO_CHAR(v$archived_log.COMPLETION_TIME, 'YYYY-MM-DD\"T\"HH24:MI:SS\"Z00:00\"') as COMPLETION_TIME, " +
		"v$database_incarnation.INCARNATION#, " +
		"v$archived_log.THREAD# " +
		"from v$archived_log left join v$database_incarnation on v$archived_log.RESETLOGS_ID=v$database_incarnation.RESETLOGS_ID " +
		"where v$archived_log.COMPLETION_TIME >= (SYSDATE - 30) AND v$archived_log.NAME LIKE '" + archiveDir + "%'"
	resp, err := dbdClient.RunSQLPlusFormatted(ctx,
		&dbdpb.RunSQLPlusCMDRequest{
			Commands: []string{query},
			Suppress: true,
		},
	)
	if err != nil {
		return err
	}
	// Read and update the metadata catalog.
	metadataStore.Lock()
	defer metadataStore.UnLock()
	metadata := &LogMetadata{}
	// TODO: better retry, or implement retry in the store.
	for i := 0; i <= 5; i++ {
		if err := metadataStore.Read(ctx, MetadataStorePath, metadata); err != nil {
			klog.ErrorS(err, "failed to load metadata", "attempt", i)
		} else {
			break
		}
	}
	keys := []string{"INCARNATION#", "SEQUENCE#", "NAME", "FIRST_TIME", "NEXT_TIME", "FIRST_CHANGE#", "NEXT_CHANGE#", "COMPLETION_TIME", "THREAD#"}
	for _, msg := range resp.GetMsg() {
		row := make(map[string]string)
		if err := json.Unmarshal([]byte(msg), &row); err == nil {
			vals := make([]string, len(keys))
			for i, key := range keys {
				v, ok := row[key]
				if !ok {
					klog.Errorf("cannot find %s from view %+v", key, row)
				}
				vals[i] = v
			}
			startTime, err := time.Parse(time.RFC3339, vals[3])
			if err != nil {
				klog.ErrorS(err, "failed to parse the start time")
				continue
			}
			nextTime, err := time.Parse(time.RFC3339, vals[4])
			if err != nil {
				klog.ErrorS(err, "failed to parse the end time")
				continue
			}
			if metadata.KeyToLogEntry == nil {
				metadata.KeyToLogEntry = make(map[string]LogMetadataEntry)
			}
			key := fmt.Sprintf("%s-%s-%s", vals[8], vals[0], vals[1])
			if existingEntry, ok := metadata.KeyToLogEntry[key]; ok {
				if existingEntry.ReplicaPath != "" {
					// Already included in the metadata.
					continue
				}
			}
			// vals order: "INCARNATION#", "SEQUENCE#", "NAME", "FIRST_TIME", "NEXT_TIME", "FIRST_CHANGE#", "NEXT_CHANGE#", "COMPLETION_TIME", "THREAD#"
			log := LogMetadataEntry{
				Incarnation:    vals[0],
				Sequence:       vals[1],
				SrcPath:        vals[2],
				FirstTime:      startTime,
				NextTime:       nextTime,
				FirstChange:    vals[5],
				NextChange:     vals[6],
				CompletionTime: vals[7],
				Thread:         vals[8],
			}
			hashEntry := LogHashEntry{}
			hashStore.Lock()
			if err := hashStore.Read(ctx, vals[2], &hashEntry); err == nil {
				log.LogHashEntry = hashEntry
			}
			hashStore.UnLock()
			metadata.KeyToLogEntry[key] = log
		}
	}
	return metadataStore.write(ctx, MetadataStorePath, metadata)
}
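// The fixed six-attempt read loop above is what the retry TODO refers to. One
// shape a stdlib-only replacement could take, with exponential backoff and
// context awareness (a hypothetical sketch, not wired in):
//
//	func readWithBackoff(ctx context.Context, s *SimpleStore, path string, data interface{}) error {
//		var err error
//		for delay := time.Second; delay < time.Minute; delay *= 2 {
//			if err = s.Read(ctx, path, data); err == nil {
//				return nil
//			}
//			select {
//			case <-ctx.Done():
//				return ctx.Err()
//			case <-time.After(delay):
//			}
//		}
//		return err
//	}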
// RunLogRetention starts the redo log cleanup loop.
func RunLogRetention(ctx context.Context, retentionDays int, metadataStore *SimpleStore, hashStore *SimpleStore) error {
	ticker := time.NewTicker(cleanupInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			klog.Info("Exiting log retention goroutine as context canceled/timed out...")
			return nil
		case <-ticker.C:
			c, err := newGcsClient(ctx)
			if err != nil {
				klog.ErrorS(err, "failed to create a GCS client")
				continue
			}
			if err := cleanUpLogs(ctx, c, retentionDays, metadataStore, hashStore); err != nil {
				klog.ErrorS(err, "failed to clean up expired logs")
			}
			if err := c.close(ctx); err != nil {
				klog.ErrorS(err, "failed to close a GCS client")
			}
		}
	}
}

func cleanUpLogs(ctx context.Context, c storageClient, retentionDays int, metadataStore *SimpleStore, hashStore *SimpleStore) error {
	// Read and update the metadata catalog.
	metadataStore.Lock()
	defer metadataStore.UnLock()
	metadata := &LogMetadata{}
	if err := metadataStore.Read(ctx, MetadataStorePath, metadata); err != nil {
		return err
	}
	newMetadata := &LogMetadata{}
	newMetadata.KeyToLogEntry = make(map[string]LogMetadataEntry)
	e := timeNow().AddDate(0, 0, -retentionDays)
	// Round the expire time down to midnight:
	// for a timestamp with day=X and hour in [0, 24), logs before day=(X-retention) are cleaned up,
	// i.e. the expire time is day=(X-retention) hour=0.
	expire := time.Date(e.Year(), e.Month(), e.Day(), 0, 0, 0, 0, e.Location())
	klog.Info(fmt.Sprintf("Cleaning up logs. Logs with next time before expiration time %v will be deleted.", expire))
	for k, v := range metadata.KeyToLogEntry {
		deleted := false
		if expire.After(v.NextTime) {
			if err := deleteLog(ctx, v, expire, c, hashStore); err != nil {
				klog.ErrorS(err, "failed to delete the expired log")
			} else {
				deleted = true
			}
		}
		if !deleted {
			newMetadata.KeyToLogEntry[k] = v
		}
	}
	return metadataStore.write(ctx, MetadataStorePath, newMetadata)
}

func deleteLog(ctx context.Context, entry LogMetadataEntry, expire time.Time, c storageClient, hashStore *SimpleStore) error {
	// As a precaution against deleting unexpected logs, double-check the date in the dir.
	// E.g. gs://test/mydb/MYDB_USCENTRAL1A/archivelog/2021_09_13/o1_mf_1_1_jmz1gbon_.arc yields 2021_09_13.
	dateDir := filepath.Base(filepath.Dir(entry.ReplicaPath))
	layout := "2006_01_02"
	tFromDir, err := time.Parse(layout, dateDir)
	if err != nil {
		return fmt.Errorf("failed to parse time from dir %v: %v", dateDir, err)
	}
	// Only delete logs before the expire date.
	if !tFromDir.Before(expire) {
		return fmt.Errorf("skip deletion, date in the path(%q) not expired", entry.ReplicaPath)
	}
	klog.Info(fmt.Sprintf("Cleaning up an expired log %+v", entry))
	if err := c.delete(ctx, entry.ReplicaPath, true); err != nil {
		return fmt.Errorf("failed to delete %v: %v", entry.ReplicaPath, err)
	}
	hashStore.Lock()
	defer hashStore.UnLock()
	return hashStore.delete(ctx, entry.SrcPath)
}
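// A worked retention example (hypothetical clock): with retentionDays=7 and
// timeNow() == 2021-09-20 15:04 UTC, cleanUpLogs computes
//
//	e := timeNow().AddDate(0, 0, -7)                                             // 2021-09-13 15:04 UTC
//	expire := time.Date(e.Year(), e.Month(), e.Day(), 0, 0, 0, 0, e.Location())  // 2021-09-13 00:00 UTC
//
// so a replica under .../archivelog/2021_09_12/... parses to 2021-09-12 00:00,
// is Before(expire), and is eligible for deletion, while 2021_09_13 is kept.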
// Merge reads all metadata and merges the time ranges covered by replicated redo logs.
func Merge(metadata LogMetadata) [][]string {
	if len(metadata.KeyToLogEntry) == 0 {
		return nil
	}
	keys := make([]string, len(metadata.KeyToLogEntry))
	i := 0
	for k := range metadata.KeyToLogEntry {
		keys[i] = k
		i++
	}
	sort.Slice(keys, func(i, j int) bool {
		return metadata.KeyToLogEntry[keys[i]].FirstTime.Before(metadata.KeyToLogEntry[keys[j]].FirstTime)
	})
	kToEntry := metadata.KeyToLogEntry
	var windows [][]string
	var currStartKey string
	var currEndKey string
	for _, k := range keys {
		if kToEntry[k].ReplicaPath == "" {
			// Not replicated.
			continue
		}
		if currStartKey == "" {
			// Start a new range.
			currStartKey = k
			currEndKey = k
			continue
		}
		// TODO: can we assume the end time of the previous log must equal the first time of the next log?
		if kToEntry[currEndKey].NextTime.Equal(kToEntry[k].FirstTime) {
			// Merge the range.
			currEndKey = k
		} else {
			windows = append(windows, []string{currStartKey, currEndKey})
			currStartKey = k
			currEndKey = k
		}
	}
	if currStartKey != "" {
		windows = append(windows, []string{currStartKey, currEndKey})
	}
	return windows
}

// StageLogs copies redo logs from their replica location to destDir.
func StageLogs(ctx context.Context, destDir string, include func(entry LogMetadataEntry) bool, logPath string) error {
	metadataStore, err := NewSimpleStore(ctx, logPath)
	if err != nil {
		return fmt.Errorf("failed to create a metadata store %v", err)
	}
	metadata := LogMetadata{}
	if err := metadataStore.Read(ctx, MetadataStorePath, &metadata); err != nil {
		return fmt.Errorf("failed to read metadata: %v", err)
	}
	n := len(metadata.KeyToLogEntry)
	if n == 0 {
		return fmt.Errorf("empty metadata: %v", metadata)
	}
	var toStage []LogMetadataEntry
	var notReplicated []LogMetadataEntry
	for _, v := range metadata.KeyToLogEntry {
		if include(v) {
			if v.ReplicaPath != "" {
				toStage = append(toStage, v)
			} else {
				notReplicated = append(notReplicated, v)
			}
		}
	}
	if len(notReplicated) > 0 {
		return fmt.Errorf("cannot find redo logs in replica location %+v", notReplicated)
	}
	if len(toStage) == 0 {
		klog.InfoS("no logs need to be staged")
		return nil
	}
	destClient := &fsClient{}
	var srcClient storageClient
	if strings.HasPrefix(toStage[0].ReplicaPath, gsPrefix) {
		c, err := newGcsClient(ctx)
		if err != nil {
			return err
		}
		srcClient = c
	} else {
		srcClient = &fsClient{}
	}
	defer func() {
		srcClient.close(ctx)
		destClient.close(ctx)
	}()
	if err := destClient.mkdirp(ctx, destDir, 0750); err != nil {
		return fmt.Errorf("failed to create the stage dir: %v", err)
	}
	toReplicate := make(chan srcDest)
	group := newReplicationGroup(toReplicate, srcClient, destClient)
	group.runCopy(ctx, replicationThreadCount)
	for _, ts := range toStage {
		toReplicate <- srcDest{
			src:  ts.ReplicaPath,
			dest: filepath.Join(destDir, filepath.Base(ts.SrcPath)),
		}
	}
	// Stop the group goroutines.
	close(toReplicate)
	group.wait()
	if group.errCount > 0 {
		return fmt.Errorf("stage completed with errors, error count: %d", group.errCount)
	}
	klog.Info("stage successfully completed")
	return nil
}
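// A worked Merge example (hypothetical entries): three replicated logs where
// log A covers [10:00, 10:10), log B covers [10:10, 10:20), and log C covers
// [10:30, 10:40). A and B chain (A.NextTime == B.FirstTime) while C does not, so
//
//	windows := Merge(metadata)
//	// windows == [][]string{{keyA, keyB}, {keyC, keyC}}
//
// i.e. each window is a [startKey, endKey] pair of contiguous replicated coverage.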