in pkg/controller/controller.go [227:267]
// processNext takes one item from queue via queue.Get and processes it on a
// new goroutine, so the caller is not blocked while handle runs.
//
// If queue.Get reports shutdown, processNext returns without starting any work.
// Otherwise the item must be a string key that cache.SplitMetaNamespaceKey can
// split into a namespace and a name; handle is then invoked with the resulting
// id and a context derived from ctx that expires after one minute.
//
// Outcomes for the item:
//   - not a string, or a key that cannot be split: the error is reported via
//     runtime.HandleError and the item is forgotten (it is not re-queued).
//   - handle returns nil: the item is forgotten and, unless activityName is
//     liveness.Undefined, the health check's last-activity time is updated.
//   - handle returns an error: the item is re-queued with AddRateLimited and
//     the error is reported via runtime.HandleError.
//
// queue.Done is called for the item in every case.
func (c *controller) processNext(ctx context.Context, queue workqueue.RateLimitingInterface,
	activityName liveness.ActivityName, handle func(ctx context.Context, id types.Id) error) {
	obj, shutdown := queue.Get()
	if shutdown {
		return
	}

	go func() {
		// Every item obtained from Get must be marked Done, whatever happens below.
		defer queue.Done(obj)

		key, ok := obj.(string)
		if !ok {
			// Retrying cannot change the item's type, so drop it for good.
			queue.Forget(obj)
			runtime.HandleError(fmt.Errorf("Expected string in queue but got %T", obj))
			return
		}

		namespace, name, err := cache.SplitMetaNamespaceKey(key)
		if err != nil {
			// A malformed key is as unrecoverable as a non-string item: forget
			// it so that no rate-limiter state for the key is left behind.
			queue.Forget(obj)
			runtime.HandleError(err)
			return
		}

		// The timeout is only needed once the item is known to be processable.
		handleCtx, cancel := context.WithTimeout(ctx, time.Minute)
		defer cancel()

		if err := handle(handleCtx, types.NewId(namespace, name)); err != nil {
			// Re-queue before Done (deferred) so the retry is scheduled with back-off.
			queue.AddRateLimited(obj)
			runtime.HandleError(err)
			return
		}

		if activityName != liveness.Undefined {
			c.healthCheck.UpdateLastActivity(activityName, time.Now())
		}
		// Success: reset the item's rate-limiting history.
		queue.Forget(obj)
	}()
}