func FixPersistent()

in util/queue/persist.go [44:160]


// FixPersistent returns a Fixer that synchronizes the queue with the object
// stored at path, doing one unit of work each time tick fires.
//
// Until a load succeeds, each tick attempts to read the stored state and apply
// it to the queue; a missing object counts as a successful (empty) load. After
// that, each tick writes the queue's current state back to path. The
// load-completed flag lives outside the returned Fixer, so invoking the same
// Fixer again does not reload state it has already loaded.
//
// The returned Fixer runs until parentCtx is done and then returns
// parentCtx.Err(). Reads and writes use a derived context that stays alive for
// up to five seconds after parentCtx is done, so an in-flight operation gets a
// grace period to finish.
func FixPersistent(logr logrus.FieldLogger, client PersistClient, path gcs.Path, tick <-chan time.Time) Fixer {
	var shouldSave bool
	log := logr.WithField("path", path)
	return func(parentCtx context.Context, q *Queue) error {
		log.Debug("Using persistent state")

		ctx, cancel := context.WithCancel(parentCtx)
		defer cancel()
		go func() { // allow a grace period for reads/writes
			select {
			case <-ctx.Done():
				return
			case <-parentCtx.Done():
				timer := time.NewTimer(5 * time.Second)
				select {
				case <-ctx.Done():
					// Drain the channel if the timer fired concurrently.
					if !timer.Stop() {
						<-timer.C
					}
					return
				case <-timer.C:
					cancel()
					return
				}
			}
		}()

		// report logs a failed operation. Cancellation is expected during
		// shutdown, so it is demoted to debug level; everything else warns.
		report := func(err error, msg string) {
			entry := log.WithError(err)
			if errors.Is(err, context.Canceled) {
				entry.Debug(msg)
				return
			}
			entry.Warning(msg)
		}

		// tryLoad reads the stored state and applies it to the queue.
		tryLoad := func() error {
			reader, attrs, err := client.Open(ctx, path)
			if errors.Is(err, storage.ErrObjectNotExist) {
				log.Info("Previous persistent queue state does not exist.")
				return nil
			}
			if err != nil {
				return fmt.Errorf("open: %w", err)
			}

			defer reader.Close()
			dec := json.NewDecoder(reader)
			var whens map[string]time.Time
			if err := dec.Decode(&whens); err != nil {
				// Wrap with %w so report can recognize a cancellation.
				return fmt.Errorf("decode: %w", err)
			}

			// Drop stored entries whose names are not in the current queue.
			current := q.Current()
			for name := range whens {
				if _, ok := current[name]; !ok {
					delete(whens, name)
				}
			}

			log.WithField("from", attrs.LastModified).Info("Loaded previous state, syncing queue.")
			if err := q.FixAll(whens, false); err != nil {
				return fmt.Errorf("fix all: %w", err)
			}
			return nil
		}

		// Only the first successful save of this invocation is logged at info level.
		logSave := true

		// trySave uploads the queue's current state to path.
		trySave := func() error {
			currently := q.Current()
			buf, err := json.MarshalIndent(currently, "", "  ")
			if err != nil {
				return fmt.Errorf("marshal: %w", err)
			}
			attrs, err := client.Upload(ctx, path, buf, gcs.DefaultACL, gcs.NoCache)
			if err == nil && logSave {
				logSave = false
				log.WithField("updated", attrs.Updated).Info("Wrote persistent state")
			}
			return err
		}

		for {
			select {
			case <-parentCtx.Done():
				log.Debug("Stopped syncing persistent state")
				return parentCtx.Err()
			case <-tick:
			}

			if !shouldSave {
				log.Trace("Loading persistent state...")
				if err := tryLoad(); err != nil {
					report(err, "Failed to load persistent state.")
					continue
				}
				shouldSave = true
				log.Debug("Loaded persistent state.")
				continue
			}

			log.Trace("Saving persistent state...")
			if err := trySave(); err != nil {
				report(err, "Failed to save persistent state.")
				continue
			}
			log.Debug("Saved persistent state.")
		}
	}
}