in pkg/authority/rule/connection/storage.go [155:213]
func (s *Storage) HandleRequest(c *Connection, req *ObserveRequest) {
if req.Type == "" {
logger.Sugar().Errorf("Empty request type from %v", c.Endpoint.ID)
return
}
if !TypeSupported(req.Type) {
logger.Sugar().Errorf("Unsupported request type %s from %s", req.Type, c.Endpoint.ID)
return
}
c.mutex.Lock()
defer c.mutex.Unlock()
if req.Nonce != "" {
cr := c.ClientRules[req.Type]
if cr == nil {
logger.Sugar().Errorf("Unexpected request type %s with nonce %s from %s", req.Type, req.Nonce, c.Endpoint.ID)
return
}
if cr.PushingStatus == Pushing {
if cr.LastPushNonce != req.Nonce {
logger.Sugar().Errorf("Unexpected request nonce %s from %s", req.Nonce, c.Endpoint.ID)
return
}
cr.ClientVersion = cr.LastPushedVersion
cr.PushingStatus = Pushed
logger.Sugar().Infof("Client %s pushed %s rule %s success", c.Endpoint.Ips, req.Type, cr.ClientVersion.Revision)
}
return
}
if _, ok := c.TypeListened[req.Type]; !ok {
logger.Sugar().Infof("Client %s listen %s rule", c.Endpoint.Ips, req.Type)
c.TypeListened[req.Type] = true
c.ClientRules[req.Type] = &ClientStatus{
PushingStatus: Pushed,
NonceInc: 0,
ClientVersion: &VersionedRule{
Revision: -1,
Type: req.Type,
},
LastPushedTime: 0,
LastPushedVersion: nil,
LastPushNonce: "",
}
latestRule := s.LatestRules[req.Type]
if latestRule != nil {
c.RawRuleQueue.Add(latestRule)
}
}
}