lib/lro/lro.go (280 lines of code) (raw):

// Copyright 2020 Google LLC. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // Package lro provides Long Running Operation (LRO) background processing. package lro import ( "context" "fmt" "time" "github.com/golang/protobuf/ptypes" /* copybara-comment */ "github.com/GoogleCloudPlatform/healthcare-federated-access-services/lib/ga4gh" /* copybara-comment: ga4gh */ "github.com/GoogleCloudPlatform/healthcare-federated-access-services/lib/storage" /* copybara-comment: storage */ processlib "github.com/GoogleCloudPlatform/healthcare-federated-access-services/lib/process" /* copybara-comment: process */ pb "github.com/GoogleCloudPlatform/healthcare-federated-access-services/proto/process/v1" /* copybara-comment: go_proto */ ) const ( // Active indicates the item is in the queue to be processed, or is being processed. Active = "active" // Inactive indicates the item has been ABORTED or COMPLETED Inactive = "inactive" keyRealm = "realm" opRealmRemoval = "remove-realm" scheduleFactor = 5 // schedule election ~5x longer than max expected progress frequency to avoid race conditions ) var ( // If this worker performed work this recently, then it is considered a candidate to wake more quickly detectMasterDuration = 10 * time.Second retainCompletionStatusDuration = 7 * 24 * time.Hour ) // LRO is an interface for long running operations. type LRO interface { AddRealmRemoval(id, realm string, identity *ga4gh.Identity, tx storage.Tx) (*pb.Process_Work, error) Remove(id string, tx storage.Tx) error Run(ctx context.Context) } // Service is a long running operation service. type Service struct { name string store storage.Store last time.Time wakeFrequency time.Duration process *processlib.Process wait func(ctx context.Context, duration time.Duration) bool } // New creates a new LRO processing routine that holds multiple LROs that share // the same setup parameters. func New(name string, wakeFrequency, maxProgress time.Duration, store storage.Store, tx storage.Tx) (*Service, error) { lro := &Service{ name: name, store: store, wakeFrequency: wakeFrequency, last: time.Now(), // this will wake on wakeFrequency to try to win the election } max := int64(maxProgress.Seconds()) schedule := max * scheduleFactor defaultParams := &pb.Process_Params{ IntParams: map[string]int64{ "maxProgressDuration": max, "scheduleFrequency": schedule, "wakeFrequency": int64(wakeFrequency.Seconds()), }, } scheduleDuration := time.Duration(schedule) * time.Second lro.process = processlib.NewProcess(name, lro, store, scheduleDuration, defaultParams) // Use advanced controls to check soon after startup and use a shorter scheduleDuration given frequent progress updates. lro.process.UpdateFlowControl(30*time.Second, scheduleDuration, maxProgress) if _, err := lro.process.RegisterWork("lro", &pb.Process_Params{IntParams: map[string]int64{}}, tx); err != nil { return nil, err } return lro, nil } // StateToString offers a human-readable label for a State enum. func StateToString(state pb.Process_Status_State) string { switch state { case pb.Process_Status_NEW: return "queued" case pb.Process_Status_ACTIVE: return "active" case pb.Process_Status_ABORTED: return "aborted" case pb.Process_Status_INCOMPLETE: return "incomplete" case pb.Process_Status_COMPLETED: return "completed" } return "unspecified" } // AddRealmRemoval adds a LRO work item for the stated goal to the state for workers to process. func (s *Service) AddRealmRemoval(id, realm string, identity *ga4gh.Identity, tx storage.Tx) (*pb.Process_Work, error) { work := createWork(opRealmRemoval, id, keyRealm, realm, fmt.Sprintf("remove realm %q", realm), identity) if err := s.store.WriteTx(storage.LongRunningOperationDatatype, storage.DefaultRealm, Active, id, storage.LatestRev, work, nil, tx); err != nil { return nil, err } return work, nil } // Remove deletes one LRO work item from the active queue or inactive list. It does not provide any cleanup if the // state is partial. Depending on execution, this deletion could later be rewritten by an active processing agent, // so deleting is a best effort. func (s *Service) Remove(id string, tx storage.Tx) error { // Silent on Not Found errors. if err := s.store.DeleteTx(storage.LongRunningOperationDatatype, storage.DefaultRealm, Active, id, storage.LatestRev, tx); err != nil && !storage.ErrNotFound(err) { return err } if err := s.store.DeleteTx(storage.LongRunningOperationDatatype, storage.DefaultRealm, Inactive, id, storage.LatestRev, tx); err != nil && !storage.ErrNotFound(err) { return err } return nil } // WaitCondition registers a callback that is called and checks conditions before every wait cycle. func (s *Service) WaitCondition(fn func(ctx context.Context, duration time.Duration) bool) { s.wait = fn } // Run schedules a background process. Typically this will be on its own go routine. func (s *Service) Run(ctx context.Context) { s.process.Run(ctx) } // ProcessActiveWork has a worker perform the work needed to process an active work item. func (s *Service) ProcessActiveWork(ctx context.Context, state *pb.Process, workName string, work *pb.Process_Work, process *processlib.Process) error { var tx storage.Tx ctx, cancel := context.WithCancel(ctx) defer cancel() // For moving active work to the inactive pile. move := make(map[string]*pb.Process_Work) var abort error for abort == nil { results, err := s.store.MultiReadTx(storage.LongRunningOperationDatatype, storage.DefaultRealm, Active, storage.MatchAllIDs, nil, 0, 25, &pb.Process_Work{}, tx) if err != nil { process.AddWorkError(err, workName, state) return err } if len(results.Entries) == 0 { break } for _, entry := range results.Entries { item, ok := entry.Item.(*pb.Process_Work) if !ok { err := fmt.Errorf("cast to process work") if process.AddWorkError(err, workName, state) != processlib.Continue { abort = err break } continue } markStarted(item) strParams := item.GetParams().GetStringParams() op := strParams["operation"] // Do not return early from here so we keep moving last forward, even when errors occur. var err error action := processlib.Continue st := item.GetStatus() switch op { // Add all supported operations to this switch. case opRealmRemoval: action, err = s.removeRealm(ctx, entry.ItemID, item, state, process) case "": st.State = pb.Process_Status_ABORTED err = fmt.Errorf("missing operation") default: st.State = pb.Process_Status_ABORTED err = fmt.Errorf("unknown operation %q", op) } if st.State == pb.Process_Status_ABORTED { move[entry.ItemID] = item } if err != nil { if process.AddWorkError(err, workName, state) != processlib.Continue { markIncomplete(item) abort = err break } } if action != processlib.Continue { markIncomplete(item) st.State = pb.Process_Status_INCOMPLETE abort = err break } markCompleted(item) move[entry.ItemID] = item process.Progress(state) } for id, item := range move { if err := s.store.WriteTx(storage.LongRunningOperationDatatype, storage.DefaultRealm, Inactive, id, storage.LatestRev, item, nil, tx); err != nil { process.AddWorkError(err, workName, state) continue } if err := s.store.DeleteTx(storage.LongRunningOperationDatatype, storage.DefaultRealm, Active, id, storage.LatestRev, tx); err != nil { process.AddWorkError(err, workName, state) } } // Always update last to show we hold the master key and should be the one to wake up early. s.last = time.Now() if abort != nil { break } } return abort } func markStarted(item *pb.Process_Work) { now := ptypes.TimestampNow() st := item.GetStatus() st.StartTime = now st.ProgressTime = now st.State = pb.Process_Status_ACTIVE lroStats(float64(1), "runs", item) } func markIncomplete(item *pb.Process_Work) { st := item.GetStatus() if st.State != pb.Process_Status_ABORTED { st.State = pb.Process_Status_INCOMPLETE } lroStats(float64(1), "state."+StateToString(st.State), item) markEnded(item) } func markCompleted(item *pb.Process_Work) { st := item.GetStatus() st.State = pb.Process_Status_COMPLETED lroStats(float64(1), "state."+StateToString(st.State), item) markEnded(item) } func markEnded(item *pb.Process_Work) { now := ptypes.TimestampNow() st := item.GetStatus() st.FinishTime = now st.ProgressTime = now if st.StartTime != nil { duration := now.AsTime().Sub(st.StartTime.AsTime()).Seconds() lroStats(duration, "duration", item) } } func lroStats(count float64, name string, item *pb.Process_Work) { st := item.GetStatus() stat := st.GetStats() if stat == nil { st.Stats = make(map[string]float64) stat = st.Stats } val, ok := stat[name] if !ok { val = 0 } stat[name] = val + count } func (s *Service) removeRealm(ctx context.Context, id string, item *pb.Process_Work, state *pb.Process, process *processlib.Process) (processlib.ErrorAction, error) { params := item.GetParams().GetStringParams() realm := params[keyRealm] if len(realm) == 0 { err := fmt.Errorf("empty realm name") process.AddError(err, item.GetStatus(), state) return processlib.Abort, err } maxEntries := 500 for i := 0; true; i++ { count, err := s.store.Wipe(ctx, realm, i, maxEntries) if err != nil && process.AddError(err, item.GetStatus(), state) != processlib.Continue { return processlib.Abort, err } // Stats for the background process that manages all LRO work. process.AddStats(float64(count), "removeRealm.itemsRemoved", state) // Stats specifically for this this LRO work object that is stored in a separate LRO object. lroStats(float64(count), "removeRealm.itemsRemoved", item) if count < maxEntries { break } process.Progress(state) } return processlib.Continue, nil } // CleanupWork has a worker perform the work needed to clean up a work item that was active previously. func (s *Service) CleanupWork(ctx context.Context, state *pb.Process, workName string, process *processlib.Process) error { return nil } // Wait indicates that the worker should wait for the next active cycle to begin. func (s *Service) Wait(ctx context.Context, schedule time.Duration) bool { if time.Now().Sub(s.last) < detectMasterDuration { // We should wait only a short while schedule = s.wakeFrequency } if s.wait != nil { if !s.wait(ctx, schedule) { return false } } else { time.Sleep(schedule) } return true } func createWork(op, id, key, value, label string, identity *ga4gh.Identity) *pb.Process_Work { work := &pb.Process_Work{ // Modified is for the settings change timestamp. Modified: ptypes.TimestampNow(), Params: &pb.Process_Params{ StringParams: map[string]string{ "id": id, "operation": op, "label": label, key: value, }, }, Status: &pb.Process_Status{ TotalErrors: 0, State: pb.Process_Status_NEW, }, } if identity != nil { work.Params.StringParams["subject"] = identity.Subject work.Params.StringParams["issuer"] = identity.Issuer work.Params.StringParams["email"] = identity.Email } return work }