lib/process/process.go

// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package process is for background processes and is listed at the ../processes endpoint.
package process

import (
	"context"
	"fmt"
	"math/rand"
	"sort"
	"strings"
	"sync"
	"time"

	glog "github.com/golang/glog" /* copybara-comment */
	tspb "github.com/golang/protobuf/ptypes/timestamp" /* copybara-comment */
	"github.com/golang/protobuf/proto" /* copybara-comment */
	"github.com/golang/protobuf/ptypes" /* copybara-comment */
	"github.com/pborman/uuid" /* copybara-comment */

	"github.com/GoogleCloudPlatform/healthcare-federated-access-services/lib/storage" /* copybara-comment: storage */
	pb "github.com/GoogleCloudPlatform/healthcare-federated-access-services/proto/process/v1" /* copybara-comment: go_proto */
)

var (
	instanceID = uuid.New()
)

// ErrorAction indicates how an AddError or AddWorkError should be handled.
type ErrorAction string

// Progress indicates how an update was handled.
type Progress string

const (
	maxWorkErrors  = 10
	maxTotalErrors = 25
	minJitter      = 5
	maxJitter      = 30

	// Continue indicates the error was within max error tolerance.
	Continue ErrorAction = "Continue"
	// Abort indicates this error exceeds max error tolerance.
	Abort ErrorAction = "Abort"

	// Completed indicates that execution has terminated normally due to completion of work.
	Completed Progress = "Completed"
	// Updated indicates that the state was updated in storage.
	Updated Progress = "Updated"
	// Merged indicates that the state was merged, then updated in storage.
	Merged Progress = "Merged"
	// Aborted indicates that errors caused execution to stop prematurely (incomplete).
	Aborted Progress = "Aborted"
	// Conflict indicates that the state ownership was taken over by another instance.
	// Unlike Aborted, the Conflict level indicates that any further writes of state
	// to storage should not be attempted.
	Conflict Progress = "Conflict"
	// None indicates that there was no storage update at this time.
	None Progress = "None"
)

// Worker represents a process that performs work on the set of work items provided.
type Worker interface {
	// ProcessActiveWork has a worker perform the work needed to process an active work item.
	ProcessActiveWork(ctx context.Context, state *pb.Process, workName string, work *pb.Process_Work, process *Process) error
	// CleanupWork has a worker perform the work needed to clean up a work item that was active previously.
	CleanupWork(ctx context.Context, state *pb.Process, workName string, process *Process) error
	// Wait indicates that the worker should wait for the next active cycle to begin. Return false to exit the worker.
	Wait(ctx context.Context, duration time.Duration) bool
}
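// exampleWorker is a minimal, illustrative sketch of a Worker implementation.
// It is not used by this package; the type name and the stat name below are
// assumptions for demonstration only.
type exampleWorker struct{}

// ProcessActiveWork pretends to process one item and records a stat for it.
func (w *exampleWorker) ProcessActiveWork(ctx context.Context, state *pb.Process, workName string, work *pb.Process_Work, process *Process) error {
	process.AddWorkStats(1, "itemsProcessed", workName, state)
	return nil
}

// CleanupWork has nothing to tear down in this sketch.
func (w *exampleWorker) CleanupWork(ctx context.Context, state *pb.Process, workName string, process *Process) error {
	return nil
}

// Wait sleeps until the next cycle begins or the context is cancelled.
func (w *exampleWorker) Wait(ctx context.Context, duration time.Duration) bool {
	select {
	case <-ctx.Done():
		return false
	case <-time.After(duration):
		return true
	}
}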
// Process is a background process that performs work at a scheduled frequency.
type Process struct {
	name                 string
	worker               Worker
	store                storage.Store
	mutex                sync.Mutex
	initialWaitDuration  time.Duration
	minScheduleFrequency time.Duration
	scheduleFrequency    time.Duration
	progressFrequency    time.Duration
	defaultSettings      *pb.Process_Params
	running              bool
}

// NewProcess creates a new process to perform work of a given name. It will trigger every "scheduleFrequency"
// and workers will report back status updates to the storage layer every "progressFrequency".
//   - If the process is not found in the storage layer, it will initialize with "defaultSettings".
//   - scheduleFrequency may be adjusted to meet schedule frequency constraints.
func NewProcess(name string, worker Worker, store storage.Store, scheduleFrequency time.Duration, defaultSettings *pb.Process_Params) *Process {
	rand.Seed(time.Now().UTC().UnixNano())
	p := &Process{
		name:                 name,
		worker:               worker,
		store:                store,
		mutex:                sync.Mutex{},
		initialWaitDuration:  time.Minute,
		minScheduleFrequency: 15 * time.Minute,
		defaultSettings:      defaultSettings,
		running:              false,
	}
	sf, pf := p.frequency(scheduleFrequency)
	p.scheduleFrequency = sf
	p.progressFrequency = pf
	return p
}

// ScheduleFrequency returns the schedule frequency.
func (p *Process) ScheduleFrequency() time.Duration {
	return p.scheduleFrequency
}

// DefaultSettings returns the default settings.
func (p *Process) DefaultSettings() *pb.Process_Params {
	return p.defaultSettings
}

// RegisterWork adds a work item to the state for workers to process.
func (p *Process) RegisterWork(workName string, workParams *pb.Process_Params, tx storage.Tx) (_ *pb.Process_Work, ferr error) {
	if len(workName) == 0 {
		return nil, fmt.Errorf("process work item registration: cannot register an empty work item")
	}
	tx = p.store.LockTx(p.name, 0, tx)
	if tx == nil {
		return nil, fmt.Errorf("lock process registration failed: lock unavailable")
	}
	defer func() {
		err := tx.Finish()
		if ferr == nil {
			ferr = err
		}
	}()
	state, err := p.readState(tx)
	if err != nil {
		return nil, err
	}
	now := ptypes.TimestampNow()
	if workParams == nil {
		workParams = &pb.Process_Params{}
	}
	work := &pb.Process_Work{
		Modified: now,
		Params:   workParams,
		Status:   newStatus(pb.Process_Status_NEW),
	}
	old, ok := state.ActiveWork[workName]
	if ok && proto.Equal(old.Params, work.Params) {
		glog.Infof("process %q instance %q verified work item %q was already registered with the same parameters", p.name, instanceID, workName)
		return work, nil
	}
	state.ActiveWork[workName] = work
	delete(state.CleanupWork, workName)
	delete(state.DroppedWork, workName)
	state.SettingsTime = ptypes.TimestampNow()
	if err := p.writeState(state, tx); err != nil {
		return nil, err
	}
	glog.Infof("process %q instance %q registered work item %q settings: %+v", p.name, instanceID, workName, work.Params)
	return work, nil
}

// UnregisterWork (eventually) removes a work item from the active state, and allows cleanup work to be performed.
func (p *Process) UnregisterWork(workName string, tx storage.Tx) (ferr error) {
	tx = p.store.LockTx(p.name, 0, tx)
	if tx == nil {
		return fmt.Errorf("lock process registration failed: lock unavailable")
	}
	defer func() {
		err := tx.Finish()
		if ferr == nil {
			ferr = err
		}
	}()
	state, err := p.readState(tx)
	if err != nil {
		return err
	}
	_, dropped := state.DroppedWork[workName]
	_, cleanup := state.CleanupWork[workName]
	if dropped || cleanup {
		return nil
	}
	// Schedule for cleanup.
	delete(state.ActiveWork, workName)
	state.CleanupWork[workName] = ptypes.TimestampNow()
	state.SettingsTime = ptypes.TimestampNow()
	if err := p.writeState(state, tx); err != nil {
		return err
	}
	glog.Infof("process %q instance %q scheduling: work item %q scheduled for clean up", p.name, instanceID, workName)
	return nil
}

// UpdateSettings alters resource management settings.
func (p *Process) UpdateSettings(scheduleFrequency time.Duration, settings *pb.Process_Params, tx storage.Tx) (ferr error) {
	p.defaultSettings = settings
	tx = p.store.LockTx(p.name, 0, tx)
	if tx == nil {
		return fmt.Errorf("lock process to update settings failed: lock unavailable")
	}
	defer func() {
		err := tx.Finish()
		if ferr == nil {
			ferr = err
		}
	}()
	state, err := p.readState(tx)
	if err != nil {
		return err
	}
	state.Settings = settings
	state.SettingsTime = ptypes.TimestampNow()
	if scheduleFrequency > 0 {
		scheduleFrequency, progressFrequency := p.frequency(scheduleFrequency)
		state.ScheduleFrequency = ptypes.DurationProto(scheduleFrequency)
		p.mutex.Lock()
		p.scheduleFrequency = scheduleFrequency
		p.progressFrequency = progressFrequency
		p.mutex.Unlock()
	}
	if err := p.writeState(state, tx); err != nil {
		return err
	}
	return nil
}

// UpdateFlowControl alters settings for how the flow of processing is managed. These are
// advanced settings and should be carefully managed when used outside of tests. They
// should be based on the size of the processing work between updates and the expected
// total time for each run, with sufficient tolerance for errors and retries to minimize
// collisions when 2+ workers grab control of the state.
func (p *Process) UpdateFlowControl(initialWaitDuration, minScheduleFrequency, progressFrequency time.Duration) error {
	if p.running {
		return fmt.Errorf("UpdateFlowControl failed: background process is already running")
	}
	p.initialWaitDuration = initialWaitDuration
	p.minScheduleFrequency = minScheduleFrequency
	p.scheduleFrequency = minScheduleFrequency
	p.progressFrequency = progressFrequency
	return nil
}

// Run schedules a background process. Typically this will be on its own goroutine.
func (p *Process) Run(ctx context.Context) {
	p.running = true
	freq := p.initialWaitDuration
	for {
		if !p.worker.Wait(ctx, p.sleepTime(freq)) {
			break
		}
		state, newfreq, err := p.start()
		if newfreq > 0 && freq != newfreq {
			freq = newfreq
			glog.Infof("process %q instance %q schedule frequency set to %q", p.name, instanceID, freq)
		}
		if state == nil || err != nil {
			continue
		}
		completion := pb.Process_Status_COMPLETED
		result, err := p.work(ctx, state)
		if err != nil && len(state.ProcessStatus.Errors) > 0 {
			glog.Infof("process %q instance %q errors during execution: %d total errors, exit error: %v, first processing error: %v", p.name, instanceID, state.ProcessStatus.TotalErrors, err, state.ProcessStatus.Errors[0])
			completion = pb.Process_Status_INCOMPLETE
		}
		// finish() will do final state bookkeeping before writing it to storage.
		// If we are in the Conflict state, we should not attempt to write at all.
		if result != Conflict {
			p.finish(state, completion)
		}
		glog.Infof("process %q instance %q completion: status=%q, %v", p.name, instanceID, result, statsToString(state.ProcessStatus.Stats))
	}
	glog.Infof("process %q instance %q instructed to exit", p.name, instanceID)
}
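// runExample is a minimal usage sketch (illustrative only, not used by this
// package): create a process with a one-hour schedule, register a single work
// item, and run the process on a background goroutine. The process name, work
// item name, and empty params are assumptions for demonstration.
func runExample(ctx context.Context, store storage.Store) error {
	p := NewProcess("exampleProcess", &exampleWorker{}, store, time.Hour, &pb.Process_Params{})
	// A nil tx lets RegisterWork acquire its own lock; a nil params pointer
	// registers the item with empty parameters.
	if _, err := p.RegisterWork("exampleItem", nil, nil); err != nil {
		return err
	}
	go p.Run(ctx)
	return nil
}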
// Progress is called by workers every 1 or more units of work and may update the underlying state.
// It returns the Progress level of the update, or None if no update occurred.
// Important note: take caution as maps may have been merged with data from the storage layer. If so, Merged progress will be returned.
func (p *Process) Progress(state *pb.Process) (Progress, error) {
	now := time.Now()
	progressTime, err := ptypes.Timestamp(state.ProcessStatus.ProgressTime)
	if err != nil {
		state.ProcessStatus.ProgressTime = state.ProcessStatus.StartTime
		progressTime = time.Unix(0, 0)
	}
	p.mutex.Lock()
	cutoff := progressTime.Add(p.progressFrequency)
	p.mutex.Unlock()
	if now.Sub(cutoff) > 0 {
		return p.update(state)
	}
	return None, nil
}

// AddError will add error state to a given status block. Set "workStatus" to nil if
// it is not specific.
func (p *Process) AddError(err error, workStatus *pb.Process_Status, state *pb.Process) ErrorAction {
	now := ptypes.TimestampNow()
	action := Continue
	if workStatus != nil {
		workStatus.TotalErrors++
		workStatus.LastErrorTime = now
		if len(workStatus.Errors) < maxWorkErrors {
			workStatus.Errors = append(workStatus.Errors, &pb.Process_Error{Time: now, Text: err.Error()})
		} else {
			action = Abort
		}
	}
	state.ProcessStatus.TotalErrors++
	state.ProcessStatus.LastErrorTime = now
	if len(state.ProcessStatus.Errors) < maxTotalErrors {
		state.ProcessStatus.Errors = append(state.ProcessStatus.Errors, &pb.Process_Error{Time: now, Text: err.Error()})
	} else {
		action = Abort
	}
	return action
}

// AddWorkError will add error state to a given work item status block as well as the process status block.
func (p *Process) AddWorkError(err error, workName string, state *pb.Process) ErrorAction {
	work, ok := state.ActiveWork[workName]
	if ok {
		return p.AddError(err, work.Status, state)
	}
	return p.AddError(err, nil, state)
}

// AddStats will increment metrics of a given name within the process status.
func (p *Process) AddStats(count float64, name string, state *pb.Process) {
	val, ok := state.ProcessStatus.Stats[name]
	if !ok {
		val = 0
	}
	state.ProcessStatus.Stats[name] = val + count
}

// AddWorkStats will increment metrics of a given name within the work item and process status.
func (p *Process) AddWorkStats(count float64, stat, workName string, state *pb.Process) {
	work, ok := state.ActiveWork[workName]
	if ok {
		work.Status.Stats[stat] = work.Status.Stats[stat] + count
	}
	p.AddStats(count, "work."+stat, state)
}
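// processItemsExample is an illustrative sketch (not used by this package) of
// how a Worker's ProcessActiveWork might combine the helpers above: report
// failures through AddWorkError, stop early on Abort, and call Progress
// periodically so long runs keep ownership of the state. The "handle"
// callback and the stat name are assumptions for demonstration.
func processItemsExample(p *Process, state *pb.Process, workName string, items []string, handle func(item string) error) error {
	for _, item := range items {
		if err := handle(item); err != nil {
			// Record the error; AddWorkError returns Abort once the error
			// tolerance is exceeded, at which point the run should stop.
			if p.AddWorkError(err, workName, state) == Abort {
				return err
			}
			continue
		}
		p.AddWorkStats(1, "itemsHandled", workName, state)
		// Periodically persist progress. A Conflict result means another
		// instance took ownership, so stop writing state entirely.
		if progress, err := p.Progress(state); progress == Conflict {
			return err
		}
	}
	return nil
}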
func (p *Process) work(ctx context.Context, state *pb.Process) (Progress, error) {
	// Create stable lists that will be followed even if a merge occurs during
	// any Progress() updates.
	var active []string
	var cleanup []string
	var drop []string
	for work := range state.ActiveWork {
		active = append(active, work)
	}
	for work := range state.CleanupWork {
		cleanup = append(cleanup, work)
	}
	// Processing in a consistent order makes progress reports easier to compare.
	sort.Strings(active)
	sort.Strings(cleanup)
	sort.Strings(drop)

	// Process active work.
	for _, workName := range active {
		work, ok := state.ActiveWork[workName]
		if !ok {
			// Was removed on merge.
			continue
		}
		p.AddStats(1, "workItems", state)
		work.Status = newStatus(pb.Process_Status_ACTIVE)
		err := p.worker.ProcessActiveWork(ctx, state, workName, work, p)
		if err == nil {
			p.setWorkState(pb.Process_Status_COMPLETED, workName, state)
		} else if p.AddWorkError(err, workName, state) == Abort {
			p.setWorkState(pb.Process_Status_ABORTED, workName, state)
			return Aborted, err
		} else {
			p.setWorkState(pb.Process_Status_INCOMPLETE, workName, state)
		}
		progress, err := p.Progress(state)
		if progress == Conflict || progress == Aborted {
			return progress, err
		}
	}

	// Process cleanup work.
	for _, workName := range cleanup {
		if _, ok := state.CleanupWork[workName]; !ok {
			// Was removed on merge.
			continue
		}
		errors := 0
		run := Continue
		err := p.worker.CleanupWork(ctx, state, workName, p)
		if err != nil && !ignoreCleanupError(err) {
			errors++
			err = fmt.Errorf("clean up work on item %q: %v", workName, err)
			run = p.AddError(err, nil, state)
		}
		if run == Abort {
			p.AddStats(1, "workItemsDirty", state)
			p.AddStats(1, "workItemsAborted", state)
			return Aborted, err
		}
		if errors == 0 {
			p.AddStats(1, "workItemsCleaned", state)
			if _, ok := state.ActiveWork[workName]; !ok {
				// Only add to the drop list because there were no errors to retry later and merge has not returned the work item to the active list.
				drop = append(drop, workName)
			}
		} else {
			p.AddStats(1, "workItemsDirty", state)
		}
		progress, err := p.Progress(state)
		if progress == Conflict || progress == Aborted {
			return progress, err
		}
	}

	// Move cleanup work to dropped work if no errors were encountered during cleaning (i.e. it is on the drop list).
	now := ptypes.TimestampNow()
	for _, workName := range drop {
		delete(state.CleanupWork, workName)
		if _, ok := state.ActiveWork[workName]; ok {
			// Was added on merge, do not drop.
			continue
		}
		state.DroppedWork[workName] = now
	}
	return Completed, nil
}

func (p *Process) setWorkState(statusState pb.Process_Status_State, workName string, state *pb.Process) {
	work, ok := state.ActiveWork[workName]
	if !ok {
		return
	}
	work.Status.State = statusState
	if statusState == pb.Process_Status_COMPLETED {
		work.Status.FinishTime = ptypes.TimestampNow()
	}
}

func (p *Process) readState(tx storage.Tx) (*pb.Process, error) {
	state := &pb.Process{}
	err := p.store.ReadTx(storage.ProcessDataType, storage.DefaultRealm, storage.DefaultUser, p.name, storage.LatestRev, state, tx)
	p.setup(state)
	if err == nil || !storage.ErrNotFound(err) {
		return state, err
	}
	// A missing state record is not an error: return the freshly set up state.
	return state, nil
}

func (p *Process) writeState(state *pb.Process, tx storage.Tx) error {
	if err := p.store.WriteTx(storage.ProcessDataType, storage.DefaultRealm, storage.DefaultUser, p.name, storage.LatestRev, state, nil, tx); err != nil {
		err = fmt.Errorf("process %q instance %q write state failed: %v", p.name, instanceID, err)
		glog.Errorf(err.Error())
		return err
	}
	return nil
}

func (p *Process) start() (_ *pb.Process, _ time.Duration, ferr error) {
	tx := p.store.LockTx(p.name, 0, nil)
	if tx == nil {
		return nil, 0, nil
	}
	defer func() {
		err := tx.Finish()
		if ferr == nil {
			ferr = err
		}
	}()
	state, err := p.readState(tx)
	if err != nil {
		return nil, 0, err
	}
	// Always call setup() to add any structures that may not already be defined within the object.
	p.setup(state)
	freq, err := ptypes.Duration(state.ScheduleFrequency)
	if err != nil {
		freq = 0
	}
	// Determine if a run is needed given the cutoff time (i.e. time of next scheduled run)
	// and other timestamps that determine the current run state.
	cutoff := p.cutoff(state)
	if timeCompare(state.ProcessStatus.ProgressTime, cutoff) >= 0 {
		if state.ProcessStatus.FinishTime == nil || timeCompare(state.ProcessStatus.StartTime, state.SettingsTime) > 0 {
			// Do not process for one of the following reasons:
			// 1. Another worker has been active recently and is likely still active.
			// 2. The previous worker started more recently than when the settings were changed.
			return nil, freq, nil
		}
	}
	// This worker will process this scheduled slot. Prepare to run.
	state.ProcessName = p.name
	state.Instance = instanceID
	// Set up a new process status object to track this worker run.
	state.ProcessStatus = newStatus(pb.Process_Status_ACTIVE)
	// Save the current state to inform other workers that this worker owns processing for this scheduled run.
	if err := p.writeState(state, tx); err != nil {
		return nil, freq, err
	}
	glog.Infof("background process %q instance %q active...", p.name, instanceID)
	// Returning will release the lock, and allow other workers to check the current state.
	return state, freq, nil
}

func (p *Process) update(state *pb.Process) (_ Progress, ferr error) {
	state.ProcessStatus.ProgressTime = state.ProcessStatus.FinishTime
	if state.ProcessStatus.ProgressTime == nil {
		state.ProcessStatus.ProgressTime = ptypes.TimestampNow()
	}
	tx := p.store.LockTx(p.name, 0, nil)
	if tx == nil {
		err := fmt.Errorf("process %q instance %q lock unavailable", p.name, instanceID)
		glog.Infof(err.Error())
		p.AddError(err, nil, state)
		return None, err
	}
	defer func() {
		err := tx.Finish()
		if ferr == nil {
			ferr = err
		}
	}()
	storeState := &pb.Process{}
	if err := p.store.ReadTx(storage.ProcessDataType, storage.DefaultRealm, storage.DefaultUser, p.name, storage.LatestRev, storeState, tx); err != nil {
		err = fmt.Errorf("reading process state %q: %v", p.name, err)
		glog.Infof(err.Error())
		p.AddError(err, nil, state)
		return None, err
	}
	// Check to see if this process instance still owns the state.
	if storeState.Instance != instanceID {
		// Another process has taken over. Abandon this run.
		err := fmt.Errorf("process %q instance %q lost state ownership: state now owned by instance %q", p.name, instanceID, storeState.Instance)
		p.AddError(err, nil, state)
		return Conflict, err
	}
	progress := Updated
	if timeCompare(storeState.SettingsTime, state.ProcessStatus.StartTime) > 0 {
		progress = p.mergeProcessState(state, storeState)
	}
	if err := p.writeState(state, tx); err != nil {
		p.AddError(err, nil, state)
		return progress, err
	}
	return progress, nil
}

func (p *Process) finish(state *pb.Process, completion pb.Process_Status_State) {
	state.ProcessStatus.FinishTime = ptypes.TimestampNow()
	state.ProcessStatus.State = completion
	if _, ok := state.ProcessStatus.Stats["workItems"]; !ok {
		p.AddStats(0, "workItems", state)
	}
	aggregateStats(state)
	p.update(state)
}

func (p *Process) mergeProcessState(state, src *pb.Process) Progress {
	var rm []string
	now := ptypes.TimestampNow()
	// ActiveWork: take params etc. from src, but retain some processing state.
	// Remove from ActiveWork if the work item is not in src.
	for k, destv := range state.ActiveWork {
		if srcp, ok := src.ActiveWork[k]; ok {
			srcp.Status = destv.Status
			state.ActiveWork[k] = srcp
		} else {
			rm = append(rm, k)
		}
	}
	for _, k := range rm {
		delete(state.ActiveWork, k)
		if _, ok := state.CleanupWork[k]; !ok {
			state.CleanupWork[k] = now
		}
	}
	// Copy over active work items from src that are not currently in a processing state.
	for k, srcv := range src.ActiveWork {
		if _, ok := state.ActiveWork[k]; !ok {
			state.ActiveWork[k] = srcv
		}
	}
	// CleanupWork: add all from src.
	for k, v := range src.CleanupWork {
		state.CleanupWork[k] = v
		if _, ok := state.DroppedWork[k]; ok {
			delete(state.DroppedWork, k)
		}
		if _, ok := state.ActiveWork[k]; ok {
			delete(state.CleanupWork, k)
		}
	}
	// DroppedWork: will only have changed in some error states; add from src
	// if not on other lists. The timestamp of when it was dropped is not critical.
	for k, v := range src.DroppedWork {
		_, active := state.ActiveWork[k]
		_, clean := state.CleanupWork[k]
		_, drop := state.DroppedWork[k]
		if !active && !clean && !drop {
			state.DroppedWork[k] = v
		}
	}
	rm = []string{}
	for k := range state.DroppedWork {
		_, active := state.ActiveWork[k]
		_, clean := state.CleanupWork[k]
		if active || clean {
			rm = append(rm, k)
		}
	}
	for _, work := range rm {
		delete(state.DroppedWork, work)
	}
	// Keep ProcessName, Instance, ProcessStatus, and AggregateStats.
	// Take the remaining items from src.
	state.ScheduleFrequency = src.ScheduleFrequency
	state.Settings = src.Settings
	state.SettingsTime = now // reflect this merge
	return Merged
}

func newStatus(statusState pb.Process_Status_State) *pb.Process_Status {
	now := ptypes.TimestampNow()
	return &pb.Process_Status{
		StartTime:    now,
		ProgressTime: now,
		Stats:        map[string]float64{},
		Errors:       []*pb.Process_Error{},
		State:        statusState,
	}
}

// timeCompare returns the difference between timestamps a and b in seconds
// (positive when a is later than b). Invalid timestamps are treated as the Unix epoch.
func timeCompare(a, b *tspb.Timestamp) float64 {
	at, err := ptypes.Timestamp(a)
	if err != nil {
		at = time.Unix(0, 0)
	}
	bt, err := ptypes.Timestamp(b)
	if err != nil {
		bt = time.Unix(0, 0)
	}
	return at.Sub(bt).Seconds()
}

// cutoff returns the start time of the current schedule slot by truncating
// the current time down to a multiple of the schedule frequency.
func (p *Process) cutoff(state *pb.Process) *tspb.Timestamp {
	cutoff := int64(0)
	now := time.Now().Unix()
	d, err := ptypes.Duration(state.ScheduleFrequency)
	if err != nil {
		d = time.Hour
	}
	freq := int64(d.Seconds())
	if freq < 1 {
		freq = 1
	}
	c := int64(now/freq) * freq
	if cutoff == 0 || c < cutoff {
		cutoff = c
	}
	ts, err := ptypes.TimestampProto(time.Unix(cutoff, 0))
	if err != nil {
		return nil
	}
	return ts
}

func (p *Process) setup(state *pb.Process) {
	if state.ActiveWork == nil {
		state.ActiveWork = make(map[string]*pb.Process_Work)
	}
	if state.CleanupWork == nil {
		state.CleanupWork = make(map[string]*tspb.Timestamp)
	}
	if state.DroppedWork == nil {
		state.DroppedWork = make(map[string]*tspb.Timestamp)
	}
	if state.ProcessStatus == nil {
		state.ProcessStatus = &pb.Process_Status{
			Stats: make(map[string]float64),
		}
	}
	if state.AggregateStats == nil {
		state.AggregateStats = make(map[string]float64)
	}
	state.ProcessName = p.name
	freq, err := ptypes.Duration(state.ScheduleFrequency)
	if err != nil || freq == 0 {
		p.mutex.Lock()
		state.ScheduleFrequency = ptypes.DurationProto(p.scheduleFrequency)
		p.mutex.Unlock()
	}
	if state.Settings == nil {
		state.Settings = p.defaultSettings
		state.SettingsTime = ptypes.TimestampNow()
	}
}

func (p *Process) sleepTime(freq time.Duration) time.Duration {
	secs := freq.Seconds()
	if secs < 1 {
		return freq
	}
	p.mutex.Lock()
	defer p.mutex.Unlock()
	// Calculate the duration until the next start cycle should begin, then add some jitter.
	now := time.Now().Unix()
	next := (int64(now/int64(secs)) + 1) * int64(secs)
	secs = float64(next - now)
	// Add a small amount of random jitter to avoid some lock contention.
	jitterSeconds := minJitter + rand.Float64()*(maxJitter-minJitter)
	ns := (secs + jitterSeconds) * 1e9
	return time.Duration(ns)
}
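// As a worked example of the scheduling math above (illustrative numbers, not
// part of the package): with a schedule frequency of 1h and a current time of
// 10:25:00, cutoff() truncates to the 10:00:00 slot boundary, while
// sleepTime() targets the next boundary at 11:00:00 and then adds 5-30s of
// random jitter, so a worker wakes between roughly 11:00:05 and 11:00:30.
// The jitter staggers multiple instances so they do not all contend for the
// state lock at the same instant.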
func aggregateStats(state *pb.Process) {
	src := state.ProcessStatus.Stats
	dest := state.AggregateStats
	src["errors"] = float64(state.ProcessStatus.TotalErrors)
	src["duration"] = timeCompare(state.ProcessStatus.FinishTime, state.ProcessStatus.StartTime)
	src["runs"] = 1
	src["state."+strings.ToLower(state.ProcessStatus.State.String())] = 1
	for k, v := range src {
		prev, ok := dest[k]
		if !ok {
			prev = 0
		}
		dest[k] = prev + v
	}
}

func (p *Process) frequency(scheduleFrequency time.Duration) (time.Duration, time.Duration) {
	// Adjust scheduleFrequency and progressFrequency such that:
	// 1. Workers do not fire too often, causing timing errors.
	// 2. Progress occurs frequently enough that lock ownership remains in place even with occasional update() errors.
	// For example, with minScheduleFrequency = 15m, a requested scheduleFrequency of 1h yields
	// progressFrequency = min(1h/10, 15m/3) = 5m.
	if scheduleFrequency < p.minScheduleFrequency {
		scheduleFrequency = p.minScheduleFrequency
	}
	maxProgressFrequency := p.minScheduleFrequency / 3
	progressFrequency := scheduleFrequency / 10
	if progressFrequency > maxProgressFrequency {
		progressFrequency = maxProgressFrequency
	}
	return scheduleFrequency, progressFrequency
}

func statsToString(stats map[string]float64) string {
	out := ""
	for k, v := range stats {
		if len(out) > 0 {
			out += ", "
		}
		out += fmt.Sprintf("%s=%g", k, v)
		if k == "duration" {
			out += "s" // tag the units as seconds
		}
	}
	return out
}

// TODO: use new status errors and detect this better.
func ignoreCleanupError(err error) bool {
	text := err.Error()
	return strings.Contains(text, "Error 403") || strings.Contains(text, "Error 404")
}
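// flowControlForTestsExample is an illustrative sketch (not used by this
// package) of tightening flow control before Run() starts, as described on
// UpdateFlowControl. The durations here are assumptions chosen only to make
// a test cycle fast; production values should reflect real work sizes.
func flowControlForTestsExample(p *Process) error {
	// initialWait, minScheduleFrequency, progressFrequency respectively.
	return p.UpdateFlowControl(10*time.Millisecond, 100*time.Millisecond, 25*time.Millisecond)
}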