func metadataUpdate()

in oracle/pkg/agents/pitr/pitr.go [849:953]


func metadataUpdate(ctx context.Context, dbdClient dbdpb.DatabaseDaemonClient, hashStore *SimpleStore, metadataStore *SimpleStore) error {
	archiveDir, err := getArchivedLogDir(ctx, dbdClient)
	if err != nil {
		return err
	}
	// TODO: based on retention, keep track of the last successful status update timestamp;
	// we can then select logs with COMPLETION_TIME >= NOW - RETENTION to reduce the size of the result.
	// Assume the El Carro instance date is in UTC.
	query := "select " +
		"v$archived_log.NAME, " +
		"v$archived_log.FIRST_CHANGE#, " +
		"TO_CHAR(v$archived_log.FIRST_TIME, 'YYYY-MM-DD\"T\"HH24:MI:SS\"Z\"') as FIRST_TIME, " +
		"v$archived_log.NEXT_CHANGE#, " +
		"TO_CHAR(v$archived_log.NEXT_TIME, 'YYYY-MM-DD\"T\"HH24:MI:SS\"Z\"') as NEXT_TIME, " +
		"v$archived_log.SEQUENCE#, " +
		"TO_CHAR(v$archived_log.COMPLETION_TIME, 'YYYY-MM-DD\"T\"HH24:MI:SS\"Z00:00\"') as COMPLETION_TIME, " +
		"v$database_incarnation.INCARNATION#, " +
		"v$archived_log.THREAD# " +
		"from v$archived_log left join v$database_incarnation on v$archived_log.RESETLOGS_ID=v$database_incarnation.RESETLOGS_ID " +
		"where v$archived_log.COMPLETION_TIME >= (SYSDATE - 30) AND  v$archived_log.NAME LIKE '" + archiveDir + "%'"
	resp, err := dbdClient.RunSQLPlusFormatted(ctx,
		&dbdpb.RunSQLPlusCMDRequest{
			Commands: []string{query},
			Suppress: true,
		},
	)

	if err != nil {
		return err
	}

	// Read and update the metadata catalog.
	metadataStore.Lock()
	defer metadataStore.UnLock()
	metadata := &LogMetadata{}

	// TODO: improve the retry handling here, or implement retries in the store.
	for i := 0; i <= 5; i++ {
		if err := metadataStore.Read(ctx, MetadataStorePath, metadata); err != nil {
			klog.ErrorS(err, "failed to load metadata", "attempt", i)
		} else {
			break
		}
	}
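	// If every read attempt failed, proceed with an empty catalog.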

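	// keys lists the column aliases selected by the query above; vals is filled in this order for
	// each returned row (every message from RunSQLPlusFormatted is unmarshaled as a JSON object).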
	keys := []string{"INCARNATION#", "SEQUENCE#", "NAME", "FIRST_TIME", "NEXT_TIME", "FIRST_CHANGE#", "NEXT_CHANGE#", "COMPLETION_TIME", "THREAD#"}
	for _, msg := range resp.GetMsg() {
		row := make(map[string]string)
		if err := json.Unmarshal([]byte(msg), &row); err == nil {
			vals := make([]string, len(keys))
			for i, key := range keys {
				v, ok := row[key]
				if !ok {
					klog.Errorf("cannot find %s from view %+v", key, row)
				}
				vals[i] = v
			}
			startTime, err := time.Parse(time.RFC3339, vals[3])
			if err != nil {
				klog.Error(err, "failed to parse the start time")
				continue
			}
			nextTime, err := time.Parse(time.RFC3339, vals[4])
			if err != nil {
				klog.Error(err, "failed to parse the end time")
				continue
			}

			if metadata.KeyToLogEntry == nil {
				metadata.KeyToLogEntry = make(map[string]LogMetadataEntry)
			}

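			// The catalog key is THREAD#-INCARNATION#-SEQUENCE#.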
			key := fmt.Sprintf("%s-%s-%s", vals[8], vals[0], vals[1])

			if existingEntry, ok := metadata.KeyToLogEntry[key]; ok {
				if existingEntry.ReplicaPath != "" {
					// already included in metadata
					continue
				}
			}

			// vals "INCARNATION#", "SEQUENCE#", "NAME", "FIRST_TIME", "NEXT_TIME", "FIRST_CHANGE#", "NEXT_CHANGE#", "COMPLETION_TIME", "THREAD#"
			log := LogMetadataEntry{
				Incarnation:    vals[0],
				Sequence:       vals[1],
				SrcPath:        vals[2],
				FirstTime:      startTime,
				NextTime:       nextTime,
				FirstChange:    vals[5],
				NextChange:     vals[6],
				CompletionTime: vals[7],
				Thread:         vals[8],
			}

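			// Attach the hash entry recorded for this archived log's source path, if the hash store has one.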
			hashEntry := LogHashEntry{}
			hashStore.Lock()
			if err := hashStore.Read(ctx, vals[2], &hashEntry); err == nil {
				log.LogHashEntry = hashEntry
			}
			hashStore.UnLock()
			metadata.KeyToLogEntry[key] = log
		}
	}
	return metadataStore.write(ctx, MetadataStorePath, metadata)
}
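
For reference, the shapes of the types this function reads and writes can be inferred from its call sites. The sketch below is illustrative only, not the package's actual definitions: the field names come from the function body, LogHashEntry's fields are not visible in this excerpt, and the embedded LogHashEntry field could equally be a plain field of the same name. SimpleStore is omitted; the excerpt only shows that it exposes Lock, UnLock, Read, and an unexported write.

// Sketch of the metadata types used by metadataUpdate (inferred from usage, not the package's definitions).
package pitr

import "time"

// LogHashEntry is stored in the hash store keyed by the archived log's source path.
// Its fields are not visible in this excerpt.
type LogHashEntry struct{}

// LogMetadataEntry describes one archived redo log in the catalog.
type LogMetadataEntry struct {
	Incarnation    string
	Sequence       string
	SrcPath        string
	FirstTime      time.Time
	NextTime       time.Time
	FirstChange    string
	NextChange     string
	CompletionTime string
	Thread         string
	ReplicaPath    string // non-empty marks the entry as already populated (see the check above)
	LogHashEntry
}

// LogMetadata is the catalog persisted at MetadataStorePath,
// keyed by "THREAD#-INCARNATION#-SEQUENCE#".
type LogMetadata struct {
	KeyToLogEntry map[string]LogMetadataEntry
}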
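
The retry TODO could be addressed with a small bounded-backoff helper along the lines sketched below. This is one possible approach rather than the package's implementation; retryWithBackoff, its parameters, and the backoff schedule are hypothetical, and the sketch assumes the file's existing context, time, and klog imports.

// retryWithBackoff is a hypothetical helper: it retries fn up to attempts times,
// doubling the sleep between tries and respecting context cancellation.
func retryWithBackoff(ctx context.Context, attempts int, initial time.Duration, fn func() error) error {
	delay := initial
	var err error
	for i := 0; i < attempts; i++ {
		if err = fn(); err == nil {
			return nil
		}
		if i == attempts-1 {
			break
		}
		klog.ErrorS(err, "operation failed, will retry", "attempt", i)
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(delay):
		}
		delay *= 2
	}
	return err
}

The read loop above could then become a single call, for example retryWithBackoff(ctx, 6, time.Second, func() error { return metadataStore.Read(ctx, MetadataStorePath, metadata) }), with the returned error still ignored so the current fall-back-to-an-empty-catalog behavior is preserved.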