in util/queue/persist.go [44:160]
func FixPersistent(logr logrus.FieldLogger, client PersistClient, path gcs.Path, tick <-chan time.Time) Fixer {
var shouldSave bool
log := logr.WithField("path", path)
return func(parentCtx context.Context, q *Queue) error {
log.Debug("Using persistent state")
ctx, cancel := context.WithCancel(parentCtx)
defer cancel()
go func() { // allow a grace period for reads/writes
select {
case <-ctx.Done():
return
case <-parentCtx.Done():
timer := time.NewTimer(5 * time.Second)
select {
case <-ctx.Done():
if !timer.Stop() {
<-timer.C
}
return
case <-timer.C:
cancel()
return
}
}
}()
tryLoad := func() error {
reader, attrs, err := client.Open(ctx, path)
if errors.Is(err, storage.ErrObjectNotExist) {
log.Info("Previous persistent queue state does not exist.")
return nil
}
if err != nil {
return fmt.Errorf("open: %w", err)
}
defer reader.Close()
dec := json.NewDecoder(reader)
var whens map[string]time.Time
if err := dec.Decode(&whens); err != nil {
return fmt.Errorf("decode: %v", err)
}
current := q.Current()
for name := range whens {
if _, ok := current[name]; ok {
continue
}
delete(whens, name)
}
log.WithField("from", attrs.LastModified).Info("Loaded previous state, syncing queue.")
if err := q.FixAll(whens, false); err != nil {
return fmt.Errorf("fix all: %v", err)
}
return nil
}
logSave := true
trySave := func() error {
currently := q.Current()
buf, err := json.MarshalIndent(currently, "", " ")
if err != nil {
return fmt.Errorf("marshal: %v", err)
}
attrs, err := client.Upload(ctx, path, buf, gcs.DefaultACL, gcs.NoCache)
if err == nil && logSave {
logSave = false
log.WithField("updated", attrs.Updated).Info("Wrote persistent state")
}
return err
}
for {
select {
case <-parentCtx.Done():
log.Debug("Stopped syncing persistent state")
return parentCtx.Err()
case <-tick:
}
if shouldSave {
log.Trace("Saving persistent state...")
if err := trySave(); err != nil {
log := log.WithError(err)
var chirp func(...interface{})
if errors.Is(err, context.Canceled) {
chirp = log.Debug
} else {
chirp = log.Warning
}
chirp("Failed to save persistent state.")
continue
}
log.Debug("Saved persistent state.")
} else {
log.Trace("Loading persistent state...")
if err := tryLoad(); err != nil {
log := log.WithError(err)
var chirp func(...interface{})
if errors.Is(err, context.Canceled) {
chirp = log.Debug
} else {
chirp = log.Warning
}
chirp("Failed to load persistent state.")
continue
}
shouldSave = true
log.Debug("Loaded persistent state.")
}
}
}
}