func()

in lib/process/process.go [366:452]


func (p *Process) work(ctx context.Context, state *pb.Process) (Progress, error) {
	// Create stable lists that will be followed even if a merge occurs during
	// any Progress() updates.
	var active []string
	var cleanup []string
	var drop []string
	for work := range state.ActiveWork {
		active = append(active, work)
	}
	for work := range state.CleanupWork {
		cleanup = append(cleanup, work)
	}
	// Process in a consistent order makes progress reports easier to compare.
	sort.Strings(active)
	sort.Strings(cleanup)
	sort.Strings(drop)

	// Process active work.
	for _, workName := range active {
		work, ok := state.ActiveWork[workName]
		if !ok {
			// Was removed on merge.
			continue
		}
		p.AddStats(1, "workItems", state)
		work.Status = newStatus(pb.Process_Status_ACTIVE)
		err := p.worker.ProcessActiveWork(ctx, state, workName, work, p)
		if err == nil {
			p.setWorkState(pb.Process_Status_COMPLETED, workName, state)
		} else if p.AddWorkError(err, workName, state) == Abort {
			p.setWorkState(pb.Process_Status_ABORTED, workName, state)
			return Aborted, err
		} else {
			p.setWorkState(pb.Process_Status_INCOMPLETE, workName, state)
		}
		progress, err := p.Progress(state)
		if progress == Conflict || progress == Aborted {
			return progress, err
		}
	}

	// Process cleanup work.
	for _, workName := range cleanup {
		if _, ok := state.CleanupWork[workName]; !ok {
			// Was removed on merge.
			continue
		}
		errors := 0
		run := Continue
		err := p.worker.CleanupWork(ctx, state, workName, p)
		if err != nil && !ignoreCleanupError(err) {
			errors++
			err = fmt.Errorf("clean up work on item %q: %v", workName, err)
			run = p.AddError(err, nil, state)
		}
		if run == Abort {
			p.AddStats(1, "workItemsDirty", state)
			p.AddStats(1, "workItemsAborted", state)
			return Aborted, err
		}
		if errors == 0 {
			p.AddStats(1, "workItemsCleaned", state)
			if _, ok := state.ActiveWork[workName]; !ok {
				// Only add to the drop list because there were no errors to retry later and merge has not returned the work item to the active list.
				drop = append(drop, workName)
			}
		} else {
			p.AddStats(1, "workItemsDirty", state)
		}
		progress, err := p.Progress(state)
		if progress == Conflict || progress == Aborted {
			return progress, err
		}
	}

	// Move cleanup work to dropped work if no errors encountered during cleaning (i.e. it is on the drop list).
	now := ptypes.TimestampNow()
	for _, workName := range drop {
		delete(state.CleanupWork, workName)
		if _, ok := state.ActiveWork[workName]; ok {
			// Was added on merge, do not drop.
			continue
		}
		state.DroppedWork[workName] = now
	}
	return Completed, nil
}