in oracle/pkg/agents/pitr/pitr.go [849:953]
func metadataUpdate(ctx context.Context, dbdClient dbdpb.DatabaseDaemonClient, hashStore *SimpleStore, metadataStore *SimpleStore) error {
archiveDir, err := getArchivedLogDir(ctx, dbdClient)
if err != nil {
return err
}
// TODO based on retention/ keep track the last success status update timestamp,
// we can select log COMPLETION_TIME >= NOW - RETENTION to reduce the size of result.
// Assume El Carro instance date is in UTC
query := "select " +
"v$archived_log.NAME, " +
"v$archived_log.FIRST_CHANGE#, " +
"TO_CHAR(v$archived_log.FIRST_TIME, 'YYYY-MM-DD\"T\"HH24:MI:SS\"Z\"') as FIRST_TIME, " +
"v$archived_log.NEXT_CHANGE#, " +
"TO_CHAR(v$archived_log.NEXT_TIME, 'YYYY-MM-DD\"T\"HH24:MI:SS\"Z\"') as NEXT_TIME, " +
"v$archived_log.SEQUENCE#, " +
"TO_CHAR(v$archived_log.COMPLETION_TIME, 'YYYY-MM-DD\"T\"HH24:MI:SS\"Z00:00\"') as COMPLETION_TIME, " +
"v$database_incarnation.INCARNATION#, " +
"v$archived_log.THREAD# " +
"from v$archived_log left join v$database_incarnation on v$archived_log.RESETLOGS_ID=v$database_incarnation.RESETLOGS_ID " +
"where v$archived_log.COMPLETION_TIME >= (SYSDATE - 30) AND v$archived_log.NAME LIKE '" + archiveDir + "%'"
resp, err := dbdClient.RunSQLPlusFormatted(ctx,
&dbdpb.RunSQLPlusCMDRequest{
Commands: []string{query},
Suppress: true,
},
)
if err != nil {
return err
}
// read and update metadata catalog
metadataStore.Lock()
defer metadataStore.UnLock()
metadata := &LogMetadata{}
// TODO better retry or implement retry in store.
for i := 0; i <= 5; i++ {
if err := metadataStore.Read(ctx, MetadataStorePath, metadata); err != nil {
klog.ErrorS(err, "failed to load metadata", "attempt", i)
} else {
break
}
}
keys := []string{"INCARNATION#", "SEQUENCE#", "NAME", "FIRST_TIME", "NEXT_TIME", "FIRST_CHANGE#", "NEXT_CHANGE#", "COMPLETION_TIME", "THREAD#"}
for _, msg := range resp.GetMsg() {
row := make(map[string]string)
if err := json.Unmarshal([]byte(msg), &row); err == nil {
vals := make([]string, len(keys))
for i, key := range keys {
v, ok := row[key]
if !ok {
klog.Errorf("cannot find %s from view %+v", key, row)
}
vals[i] = v
}
startTime, err := time.Parse(time.RFC3339, vals[3])
if err != nil {
klog.Error(err, "failed to parse the start time")
continue
}
nextTime, err := time.Parse(time.RFC3339, vals[4])
if err != nil {
klog.Error(err, "failed to parse the end time")
continue
}
if metadata.KeyToLogEntry == nil {
metadata.KeyToLogEntry = make(map[string]LogMetadataEntry)
}
key := fmt.Sprintf("%s-%s-%s", vals[8], vals[0], vals[1])
if existingEntry, ok := metadata.KeyToLogEntry[key]; ok {
if existingEntry.ReplicaPath != "" {
// already included in metadata
continue
}
}
// vals "INCARNATION#", "SEQUENCE#", "NAME", "FIRST_TIME", "NEXT_TIME", "FIRST_CHANGE#", "NEXT_CHANGE#", "COMPLETION_TIME", "THREAD#"
log := LogMetadataEntry{
Incarnation: vals[0],
Sequence: vals[1],
SrcPath: vals[2],
FirstTime: startTime,
NextTime: nextTime,
FirstChange: vals[5],
NextChange: vals[6],
CompletionTime: vals[7],
Thread: vals[8],
}
hashEntry := LogHashEntry{}
hashStore.Lock()
if err := hashStore.Read(ctx, vals[2], &hashEntry); err == nil {
log.LogHashEntry = hashEntry
}
hashStore.UnLock()
metadata.KeyToLogEntry[key] = log
}
}
return metadataStore.write(ctx, MetadataStorePath, metadata)
}