func()

in pkg/k8s/custom_source.go [87:134]


// Start registers a handler/queue pair with this channel source and begins
// delivering events to it.
//
// Each call creates a dedicated buffered destination channel (sized by
// cs.destBufferSize, falling back to defaultBufferSize when zero) and a
// goroutine that drains it, translating every GenericEvent into the matching
// Create/Delete/Update call on handler. An event is forwarded only if every
// predicate in prct accepts it; events with an unrecognized EventType are
// dropped. The draining goroutine exits when the destination channel is
// closed.
//
// The first call also launches cs.syncLoop in the background (guarded by
// cs.once so it runs a single time across all Start calls).
//
// Start returns an error if cs.Source is nil or if cs.stop has not been set
// prior to the call.
func (cs *NotificationChannel) Start(
	ctx context.Context,
	handler handler.EventHandler,
	queue workqueue.RateLimitingInterface,
	prct ...predicate.Predicate) error {
	// Source should have been specified by the user.
	if cs.Source == nil {
		return fmt.Errorf("must specify NotificationChannel.Source")
	}

	// stop should have been injected before Start was called
	if cs.stop == nil {
		return fmt.Errorf("must call InjectStop on Channel before calling Start")
	}

	// use default value if DestBufferSize not specified
	if cs.destBufferSize == 0 {
		cs.destBufferSize = defaultBufferSize
	}

	// allow reports whether every supplied predicate accepts the event; with
	// no predicates it is always true.
	allow := func(check func(predicate.Predicate) bool) bool {
		for _, p := range prct {
			if !check(p) {
				return false
			}
		}
		return true
	}

	dst := make(chan GenericEvent, cs.destBufferSize)
	go func() {
		for evt := range dst {
			switch evt.EventType {
			case CREATE:
				e := event.CreateEvent{Object: evt.Object}
				if allow(func(p predicate.Predicate) bool { return p.Create(e) }) {
					handler.Create(e, queue)
				}
			case DELETE:
				// The deleted object travels in OldObject for DELETE events.
				e := event.DeleteEvent{Object: evt.OldObject}
				if allow(func(p predicate.Predicate) bool { return p.Delete(e) }) {
					handler.Delete(e, queue)
				}
			case UPDATE:
				e := event.UpdateEvent{ObjectOld: evt.OldObject, ObjectNew: evt.Object}
				if allow(func(p predicate.Predicate) bool { return p.Update(e) }) {
					handler.Update(e, queue)
				}
			default:
				// Unknown event types are intentionally ignored: there is no
				// handler callback to route them to.
			}
		}
	}()

	// Register the destination BEFORE the distribution loop can start, so that
	// an event arriving during the very first Start call is not fanned out to
	// an empty destination list.
	// NOTE(review): assumes syncLoop reads cs.dest under destLock — confirm.
	cs.destLock.Lock()
	cs.dest = append(cs.dest, dst)
	cs.destLock.Unlock()

	cs.once.Do(func() {
		// Distribute GenericEvents to all EventHandler / Queue pairs Watching this source
		go cs.syncLoop()
	})

	return nil
}