in pkg/mds/pusher/pusher.go [138:230]
func (p *pusher) Start(stop <-chan struct{}) error {
log.Info("pusher start")
ctx, cancel := context.WithCancel(context.Background())
// receive ResourceChanged Events
for resourceType := range p.resourceTypes {
log.Info("start receive ResourceChanged Event", "ResourceType", resourceType)
go p.receiveResourceChangedEvents(stop, resourceType)
}
fullResyncTicker := p.newFullResyncTicker()
defer fullResyncTicker.Stop()
for {
select {
case <-stop:
log.Info("pusher stopped")
cancel()
return nil
case ce := <-p.eventsChannel:
log.Info("event received", "ResourceType", ce.resourceType)
resourceList, err := registry.Global().NewList(ce.resourceType)
if err != nil {
log.Error(err, "failed to get resourceList")
continue
}
err = p.resourceManager.List(ctx, resourceList)
if err != nil {
log.Error(err, "list resource failed", "ResourceType", ce.resourceType)
continue
}
if reflect.DeepEqual(p.resourceLastPushed[ce.resourceType], resourceList) {
log.Info("resource not changed, nothing to push")
continue
}
p.resourceRevisions[ce.resourceType]++
p.resourceLastPushed[ce.resourceType] = resourceList
log.Info("invoke callbacks", "ResourceType", ce.resourceType, "revision", p.resourceRevisions[ce.resourceType])
// for a ResourceChangedEvent, invoke all callbacks.
p.resourceChangedCallbacks.InvokeCallbacks(ce.resourceType, PushedItems{
resourceList: resourceList,
revision: p.resourceRevisions[ce.resourceType],
})
case req := <-p.requestChannel:
resourceType := req.resourceType
id := req.id
log.Info("received a push request", "ResourceType", resourceType, "id", id)
cb, ok := p.resourceChangedCallbacks.GetCallBack(resourceType, id)
if !ok {
log.Info("not found callback", "ResourceType", resourceType, "id", id)
continue
}
revision := p.resourceRevisions[resourceType]
lastedPushed := p.resourceLastPushed[resourceType]
if lastedPushed == nil {
log.Info("last pushed is nil", "ResourceType", resourceType, "id", id)
continue
}
resourceList := lastedPushed
if req.requestFilter != nil {
resourceList = req.requestFilter(req.request, lastedPushed)
}
cb.Invoke(PushedItems{
resourceList: resourceList,
revision: revision,
})
case <-fullResyncTicker.C:
log.Info("full resync ticker arrived, starting resync for all types", "ResourceTypes", p.resourceTypes)
for resourceType := range p.resourceTypes {
revision := p.resourceRevisions[resourceType]
lastedPushed := p.resourceLastPushed[resourceType]
if lastedPushed == nil {
continue
}
// for a ResourceChangedEvent, invoke all callbacks.
p.resourceChangedCallbacks.InvokeCallbacks(resourceType, PushedItems{
resourceList: lastedPushed,
revision: revision,
})
}
}
}
}