in pkg/k8s/custom_source.go [87:134]
// Start registers the given handler/queue pair as a consumer of this channel.
//
// It validates that Source and the stop channel are set, applies the default
// destination buffer size when none was configured, launches syncLoop exactly
// once (guarded by cs.once), and then creates a dedicated buffered channel for
// this consumer. A goroutine drains that channel and dispatches every event to
// the matching handler method (Create, Delete or Update) according to its
// EventType.
//
// An event is forwarded only if every predicate in prct accepts it; with no
// predicates, every event is forwarded.
//
// ctx is not consulted here. The dispatch goroutine runs until the per-consumer
// channel is closed (presumably by syncLoop on stop; verify against syncLoop).
//
// Start returns an error if Source is nil or if the stop channel has not been
// injected; otherwise it returns nil.
func (cs *NotificationChannel) Start(
	ctx context.Context,
	handler handler.EventHandler,
	queue workqueue.RateLimitingInterface,
	prct ...predicate.Predicate) error {
	// Source should have been specified by the user.
	if cs.Source == nil {
		return fmt.Errorf("must specify NotificationChannel.Source")
	}
	// stop should have been injected before Start was called.
	if cs.stop == nil {
		return fmt.Errorf("must call InjectStop on Channel before calling Start")
	}
	// Use the default value if the destination buffer size was not specified.
	if cs.destBufferSize == 0 {
		cs.destBufferSize = defaultBufferSize
	}

	cs.once.Do(func() {
		// Distribute GenericEvents to all EventHandler / Queue pairs watching
		// this source.
		go cs.syncLoop()
	})

	// allowed reports whether every registered predicate accepts the event,
	// where check invokes the predicate method matching the event kind.
	allowed := func(check func(predicate.Predicate) bool) bool {
		for _, p := range prct {
			if !check(p) {
				return false
			}
		}
		return true
	}

	dst := make(chan GenericEvent, cs.destBufferSize)
	go func() {
		for evt := range dst {
			switch evt.EventType {
			case CREATE:
				e := event.CreateEvent{Object: evt.Object}
				if allowed(func(p predicate.Predicate) bool { return p.Create(e) }) {
					handler.Create(e, queue)
				}
			case DELETE:
				// The deleted object is read from OldObject, not Object.
				e := event.DeleteEvent{Object: evt.OldObject}
				if allowed(func(p predicate.Predicate) bool { return p.Delete(e) }) {
					handler.Delete(e, queue)
				}
			case UPDATE:
				e := event.UpdateEvent{ObjectOld: evt.OldObject, ObjectNew: evt.Object}
				if allowed(func(p predicate.Predicate) bool { return p.Update(e) }) {
					handler.Update(e, queue)
				}
			default:
				// Events with an unknown EventType are dropped.
				// NOTE(review): this used to build an error and discard it;
				// report it through the project's logger once one is in scope.
			}
		}
	}()

	// Register the new destination under the lock that guards cs.dest.
	cs.destLock.Lock()
	defer cs.destLock.Unlock()
	cs.dest = append(cs.dest, dst)
	return nil
}