in lib/lro/lro.go [144:227]
// ProcessActiveWork repeatedly reads long running operation records from the
// Active collection, executes the operation named by each record's
// "operation" string parameter, and moves records that completed or were
// aborted over to the Inactive collection. It keeps reading until a read
// returns no entries or processing is aborted.
//
// Parameters:
//   - ctx: wrapped in a cancelable context that is canceled when this method
//     returns; passed on to the operation handlers.
//   - state, workName, process: used to report work errors and progress.
//   - work: not used by this method.
//
// Returns the read error if a batch cannot be read, the error that caused
// processing to abort, or nil otherwise. Errors while moving records to the
// Inactive collection are reported via process.AddWorkError but are not
// returned.
func (s *Service) ProcessActiveWork(ctx context.Context, state *pb.Process, workName string, work *pb.Process_Work, process *processlib.Process) error {
	var tx storage.Tx
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// For moving active work to the inactive pile, keyed by item ID. Entries
	// are removed once moved so that later batches do not move them again.
	move := make(map[string]*pb.Process_Work)
	var abort error
	for abort == nil {
		// The 0 and 25 arguments are presumably offset and page size — TODO confirm against the storage API.
		results, err := s.store.MultiReadTx(storage.LongRunningOperationDatatype, storage.DefaultRealm, Active, storage.MatchAllIDs, nil, 0, 25, &pb.Process_Work{}, tx)
		if err != nil {
			process.AddWorkError(err, workName, state)
			return err
		}
		if len(results.Entries) == 0 {
			break
		}
		for _, entry := range results.Entries {
			item, ok := entry.Item.(*pb.Process_Work)
			if !ok {
				err := fmt.Errorf("cast to process work")
				if process.AddWorkError(err, workName, state) != processlib.Continue {
					abort = err
					break
				}
				continue
			}
			markStarted(item)
			strParams := item.GetParams().GetStringParams()
			op := strParams["operation"]
			// Do not return early from here so we keep moving last forward, even when errors occur.
			var err error
			action := processlib.Continue
			st := item.GetStatus()
			switch op {
			// Add all supported operations to this switch.
			case opRealmRemoval:
				action, err = s.removeRealm(ctx, entry.ItemID, item, state, process)
			case "":
				st.State = pb.Process_Status_ABORTED
				err = fmt.Errorf("missing operation")
			default:
				st.State = pb.Process_Status_ABORTED
				err = fmt.Errorf("unknown operation %q", op)
			}
			// Aborted items leave the active pile even if processing stops below.
			if st.State == pb.Process_Status_ABORTED {
				move[entry.ItemID] = item
			}
			if err != nil {
				if process.AddWorkError(err, workName, state) != processlib.Continue {
					markIncomplete(item)
					abort = err
					break
				}
			}
			if action != processlib.Continue {
				markIncomplete(item)
				st.State = pb.Process_Status_INCOMPLETE
				// NOTE(review): err may be nil here, in which case abort stays nil
				// and the outer loop reads the active pile again — confirm that
				// this is the intended behavior for the operation handlers.
				abort = err
				break
			}
			markCompleted(item)
			move[entry.ItemID] = item
			process.Progress(state)
		}
		for id, item := range move {
			if err := s.store.WriteTx(storage.LongRunningOperationDatatype, storage.DefaultRealm, Inactive, id, storage.LatestRev, item, nil, tx); err != nil {
				process.AddWorkError(err, workName, state)
				continue
			}
			if err := s.store.DeleteTx(storage.LongRunningOperationDatatype, storage.DefaultRealm, Active, id, storage.LatestRev, tx); err != nil {
				process.AddWorkError(err, workName, state)
				continue
			}
			// Fully moved: forget the item so it is not written and deleted again
			// on every following batch. Items whose move failed stay in the map
			// and are retried with the next batch. Removing map entries during
			// a range loop is safe in Go.
			delete(move, id)
		}
		// Always update last to show we hold the master key and should be the one to wake up early.
		s.last = time.Now()
		// The loop condition stops the iteration once abort has been set.
	}
	return abort
}