func()

in lib/process/process.go [593:660]


// mergeProcessState folds src, a more recent copy of the process definition,
// into state, the copy that carries the current processing state. It always
// returns Merged.
//
// state keeps its ProcessName, Instance, ProcessStatus and AggregateStats.
// ScheduleFrequency and Settings are taken from src, and SettingsTime is set
// to the time of this merge. The work lists are reconciled as follows:
//
//   - ActiveWork ends up with exactly the keys of src.ActiveWork. Items that
//     state already had keep their Status; items that src no longer lists
//     are moved to CleanupWork.
//   - CleanupWork gains every entry of src.CleanupWork that is not active.
//   - DroppedWork gains entries of src.DroppedWork that are on none of
//     state's lists, and loses any entry that is active or awaiting cleanup.
//
// Side effects on src: the Status of src's active work items is overwritten
// with the Status held by state, and those items (as well as src.Settings)
// are shared with state afterwards rather than copied.
//
// NOTE: entries are written into state.ActiveWork, state.CleanupWork and
// state.DroppedWork, so those maps must be non-nil whenever the merge has
// something to add to them; writing to a nil map panics.
func (p *Process) mergeProcessState(state, src *pb.Process) Progress {
	now := ptypes.TimestampNow()

	// ActiveWork: take params etc from src, but retain the processing status
	// already recorded in state. Work that src no longer lists is retired to
	// CleanupWork, keeping an earlier cleanup timestamp if there is one.
	// Deleting the current key while ranging over a map is safe in Go.
	for k, destv := range state.ActiveWork {
		if srcv, ok := src.ActiveWork[k]; ok {
			srcv.Status = destv.Status
			state.ActiveWork[k] = srcv
			continue
		}
		delete(state.ActiveWork, k)
		if _, ok := state.CleanupWork[k]; !ok {
			state.CleanupWork[k] = now
		}
	}
	// Copy over active work items from src that are not currently in
	// processing state.
	for k, srcv := range src.ActiveWork {
		if _, ok := state.ActiveWork[k]; !ok {
			state.ActiveWork[k] = srcv
		}
	}

	// CleanupWork: add all from src, except items that are active again.
	// Anything src wants cleaned up is no longer considered dropped.
	for k, v := range src.CleanupWork {
		delete(state.DroppedWork, k)
		if _, active := state.ActiveWork[k]; active {
			delete(state.CleanupWork, k)
			continue
		}
		state.CleanupWork[k] = v
	}

	// DroppedWork: will only have changed in some error states, add from src
	// if not on other lists. Timestamp of when dropped is not critical, so an
	// existing entry in state is left as it is.
	for k, v := range src.DroppedWork {
		_, active := state.ActiveWork[k]
		_, clean := state.CleanupWork[k]
		_, drop := state.DroppedWork[k]
		if !active && !clean && !drop {
			// Record the item as dropped; it must not be re-queued for
			// cleanup.
			state.DroppedWork[k] = v
		}
	}
	// An item that is active or awaiting cleanup is not dropped.
	for k := range state.DroppedWork {
		_, active := state.ActiveWork[k]
		_, clean := state.CleanupWork[k]
		if active || clean {
			delete(state.DroppedWork, k)
		}
	}

	// Keep ProcessName, Instance, ProcessStatus, and AggregateStats.
	// Take remaining items from src.
	state.ScheduleFrequency = src.ScheduleFrequency
	state.Settings = src.Settings
	state.SettingsTime = now // reflect this merge

	return Merged
}