in lib/process/process.go [593:660]
func (p *Process) mergeProcessState(state, src *pb.Process) Progress {
var rm []string
now := ptypes.TimestampNow()
// ActiveWork: take params etc from src, but retain some processing state.
// Remove from ActiveWork if work item is not in src.
for k, destv := range state.ActiveWork {
if srcp, ok := src.ActiveWork[k]; ok {
srcp.Status = destv.Status
state.ActiveWork[k] = srcp
} else {
rm = append(rm, k)
}
}
for _, k := range rm {
delete(state.ActiveWork, k)
if _, ok := state.CleanupWork[k]; !ok {
state.CleanupWork[k] = now
}
}
// Copy over active work items from src that are not currently in processing state.
for k, srcv := range src.ActiveWork {
if _, ok := state.ActiveWork[k]; !ok {
state.ActiveWork[k] = srcv
}
}
// CleanupWork: add all from src.
for k, v := range src.CleanupWork {
state.CleanupWork[k] = v
if _, ok := state.DroppedWork[k]; ok {
delete(state.DroppedWork, k)
}
if _, ok := state.ActiveWork[k]; ok {
delete(state.CleanupWork, k)
}
}
// DroppedWork: will only have changed in some error states, add from src
// if not on other lists. Timestamp of when dropped is not critical.
for k, v := range src.DroppedWork {
_, active := state.ActiveWork[k]
_, clean := state.CleanupWork[k]
_, drop := state.DroppedWork[k]
if !active && !clean && !drop {
state.CleanupWork[k] = v
}
}
rm = []string{}
for k := range state.DroppedWork {
_, active := state.ActiveWork[k]
_, clean := state.CleanupWork[k]
if active || clean {
rm = append(rm, k)
}
}
for _, work := range rm {
delete(state.DroppedWork, work)
}
// Keep ProcessName, Instance, ProcessStatus, and AggregateStats.
// Take remaining items from src.
state.ScheduleFrequency = src.ScheduleFrequency
state.Settings = src.Settings
state.SettingsTime = now // reflect this merge
return Merged
}