in lib/process/process.go [366:452]
// work runs one pass over the work items recorded in state: first every
// active item is handed to the worker's ProcessActiveWork, then every cleanup
// item is handed to the worker's CleanupWork, and finally cleanup items that
// were cleaned without error are moved to state.DroppedWork.
//
// p.Progress(state) is called after each item. If it reports Conflict or
// Aborted, work stops immediately and returns that progress value together
// with the error from Progress. If the error handlers (AddWorkError or
// AddError) request Abort, work returns Aborted with the triggering error.
// Otherwise it returns Completed and a nil error.
func (p *Process) work(ctx context.Context, state *pb.Process) (Progress, error) {
	// Snapshot the work item names up front so the iteration order stays
	// stable even if a merge changes the maps during a Progress() update.
	active := make([]string, 0, len(state.ActiveWork))
	for name := range state.ActiveWork {
		active = append(active, name)
	}
	cleanup := make([]string, 0, len(state.CleanupWork))
	for name := range state.CleanupWork {
		cleanup = append(cleanup, name)
	}
	// Processing in a consistent order makes progress reports easier to compare.
	sort.Strings(active)
	sort.Strings(cleanup)

	// Process active work.
	for _, workName := range active {
		work, ok := state.ActiveWork[workName]
		if !ok {
			// Was removed on merge.
			continue
		}
		p.AddStats(1, "workItems", state)
		work.Status = newStatus(pb.Process_Status_ACTIVE)
		err := p.worker.ProcessActiveWork(ctx, state, workName, work, p)
		switch {
		case err == nil:
			p.setWorkState(pb.Process_Status_COMPLETED, workName, state)
		case p.AddWorkError(err, workName, state) == Abort:
			p.setWorkState(pb.Process_Status_ABORTED, workName, state)
			return Aborted, err
		default:
			// The error was recorded but does not abort the run.
			p.setWorkState(pb.Process_Status_INCOMPLETE, workName, state)
		}
		if progress, err := p.Progress(state); progress == Conflict || progress == Aborted {
			return progress, err
		}
	}

	// Process cleanup work. Names are appended to drop in the (sorted) order
	// of the cleanup list, so drop needs no separate sort.
	var drop []string
	for _, workName := range cleanup {
		if _, ok := state.CleanupWork[workName]; !ok {
			// Was removed on merge.
			continue
		}
		err := p.worker.CleanupWork(ctx, state, workName, p)
		failed := err != nil && !ignoreCleanupError(err)
		if failed {
			err = fmt.Errorf("clean up work on item %q: %v", workName, err)
			if p.AddError(err, nil, state) == Abort {
				p.AddStats(1, "workItemsDirty", state)
				p.AddStats(1, "workItemsAborted", state)
				return Aborted, err
			}
			// Leave the item on the cleanup list so it can be retried later.
			p.AddStats(1, "workItemsDirty", state)
		} else {
			p.AddStats(1, "workItemsCleaned", state)
			if _, ok := state.ActiveWork[workName]; !ok {
				// Only add to the drop list because there were no errors to
				// retry later and merge has not returned the work item to the
				// active list.
				drop = append(drop, workName)
			}
		}
		if progress, err := p.Progress(state); progress == Conflict || progress == Aborted {
			return progress, err
		}
	}

	// Move cleanup work to dropped work if no errors were encountered during
	// cleaning (i.e. it is on the drop list).
	// NOTE(review): assumes state.DroppedWork is non-nil whenever drop is
	// non-empty; writing to a nil map panics — confirm it is initialized
	// before work is called.
	now := ptypes.TimestampNow()
	for _, workName := range drop {
		delete(state.CleanupWork, workName)
		if _, ok := state.ActiveWork[workName]; ok {
			// Was added on merge, do not drop.
			continue
		}
		state.DroppedWork[workName] = now
	}
	return Completed, nil
}