func()

in lib/lro/lro.go [144:227]


func (s *Service) ProcessActiveWork(ctx context.Context, state *pb.Process, workName string, work *pb.Process_Work, process *processlib.Process) error {
	var tx storage.Tx
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// For moving active work to the inactive pile.
	move := make(map[string]*pb.Process_Work)

	var abort error
	for abort == nil {
		results, err := s.store.MultiReadTx(storage.LongRunningOperationDatatype, storage.DefaultRealm, Active, storage.MatchAllIDs, nil, 0, 25, &pb.Process_Work{}, tx)
		if err != nil {
			process.AddWorkError(err, workName, state)
			return err
		}
		if len(results.Entries) == 0 {
			break
		}
		for _, entry := range results.Entries {
			item, ok := entry.Item.(*pb.Process_Work)
			if !ok {
				err := fmt.Errorf("cast to process work")
				if process.AddWorkError(err, workName, state) != processlib.Continue {
					abort = err
					break
				}
				continue
			}
			markStarted(item)
			strParams := item.GetParams().GetStringParams()
			op := strParams["operation"]
			// Do not return early from here so we keep moving last forward, even when errors occur.
			var err error
			action := processlib.Continue
			st := item.GetStatus()
			switch op {
			// Add all supported operations to this switch.
			case opRealmRemoval:
				action, err = s.removeRealm(ctx, entry.ItemID, item, state, process)
			case "":
				st.State = pb.Process_Status_ABORTED
				err = fmt.Errorf("missing operation")
			default:
				st.State = pb.Process_Status_ABORTED
				err = fmt.Errorf("unknown operation %q", op)
			}
			if st.State == pb.Process_Status_ABORTED {
				move[entry.ItemID] = item
			}
			if err != nil {
				if process.AddWorkError(err, workName, state) != processlib.Continue {
					markIncomplete(item)
					abort = err
					break
				}
			}
			if action != processlib.Continue {
				markIncomplete(item)
				st.State = pb.Process_Status_INCOMPLETE
				abort = err
				break
			}
			markCompleted(item)
			move[entry.ItemID] = item
			process.Progress(state)
		}
		for id, item := range move {
			if err := s.store.WriteTx(storage.LongRunningOperationDatatype, storage.DefaultRealm, Inactive, id, storage.LatestRev, item, nil, tx); err != nil {
				process.AddWorkError(err, workName, state)
				continue
			}
			if err := s.store.DeleteTx(storage.LongRunningOperationDatatype, storage.DefaultRealm, Active, id, storage.LatestRev, tx); err != nil {
				process.AddWorkError(err, workName, state)
			}
		}
		// Always update last to show we hold the master key and should be the one to wake up early.
		s.last = time.Now()
		if abort != nil {
			break
		}
	}

	return abort
}