func()

in pkg/mds/pusher/pusher.go [138:230]


func (p *pusher) Start(stop <-chan struct{}) error {
	log.Info("pusher start")

	ctx, cancel := context.WithCancel(context.Background())

	// receive ResourceChanged Events
	for resourceType := range p.resourceTypes {
		log.Info("start receive ResourceChanged Event", "ResourceType", resourceType)
		go p.receiveResourceChangedEvents(stop, resourceType)
	}

	fullResyncTicker := p.newFullResyncTicker()
	defer fullResyncTicker.Stop()

	for {
		select {
		case <-stop:
			log.Info("pusher stopped")

			cancel()
			return nil
		case ce := <-p.eventsChannel:
			log.Info("event received", "ResourceType", ce.resourceType)
			resourceList, err := registry.Global().NewList(ce.resourceType)
			if err != nil {
				log.Error(err, "failed to get resourceList")
				continue
			}
			err = p.resourceManager.List(ctx, resourceList)
			if err != nil {
				log.Error(err, "list resource failed", "ResourceType", ce.resourceType)
				continue
			}
			if reflect.DeepEqual(p.resourceLastPushed[ce.resourceType], resourceList) {
				log.Info("resource not changed, nothing to push")
				continue
			}

			p.resourceRevisions[ce.resourceType]++
			p.resourceLastPushed[ce.resourceType] = resourceList

			log.Info("invoke callbacks", "ResourceType", ce.resourceType, "revision", p.resourceRevisions[ce.resourceType])
			// for a ResourceChangedEvent, invoke all callbacks.
			p.resourceChangedCallbacks.InvokeCallbacks(ce.resourceType, PushedItems{
				resourceList: resourceList,
				revision:     p.resourceRevisions[ce.resourceType],
			})
		case req := <-p.requestChannel:
			resourceType := req.resourceType
			id := req.id
			log.Info("received a push request", "ResourceType", resourceType, "id", id)

			cb, ok := p.resourceChangedCallbacks.GetCallBack(resourceType, id)
			if !ok {
				log.Info("not found callback", "ResourceType", resourceType, "id", id)
				continue
			}

			revision := p.resourceRevisions[resourceType]
			lastedPushed := p.resourceLastPushed[resourceType]
			if lastedPushed == nil {
				log.Info("last pushed is nil", "ResourceType", resourceType, "id", id)
				continue
			}

			resourceList := lastedPushed
			if req.requestFilter != nil {
				resourceList = req.requestFilter(req.request, lastedPushed)
			}

			cb.Invoke(PushedItems{
				resourceList: resourceList,
				revision:     revision,
			})
		case <-fullResyncTicker.C:
			log.Info("full resync ticker arrived, starting resync for all types", "ResourceTypes", p.resourceTypes)

			for resourceType := range p.resourceTypes {
				revision := p.resourceRevisions[resourceType]
				lastedPushed := p.resourceLastPushed[resourceType]
				if lastedPushed == nil {
					continue
				}

				// for a ResourceChangedEvent, invoke all callbacks.
				p.resourceChangedCallbacks.InvokeCallbacks(resourceType, PushedItems{
					resourceList: lastedPushed,
					revision:     revision,
				})
			}
		}
	}
}