in pkg/dds/client/dds_client.go [84:136]
func (s *ddsSyncClient) Receive() error {
for _, typ := range s.resourceTypes {
s.log.V(1).Info("sending DeltaDiscoveryRequest", "type", typ)
if err := s.ddsStream.DeltaDiscoveryRequest(typ); err != nil {
return errors.Wrap(err, "discovering failed")
}
}
for {
received, err := s.ddsStream.Receive()
if err != nil {
if err == io.EOF {
return nil
}
return errors.Wrap(err, "failed to receive a discovery response")
}
s.log.V(1).Info("DeltaDiscoveryResponse received", "response", received)
if s.callbacks == nil {
s.log.Info("no callback set, sending ACK", "type", string(received.Type))
if err := s.ddsStream.ACK(received.Type); err != nil {
if err == io.EOF {
return nil
}
return errors.Wrap(err, "failed to ACK a discovery response")
}
continue
}
err = s.callbacks.OnResourcesReceived(received)
if !received.IsInitialRequest {
// Execute backoff only on subsequent request.
// When client first connects, the server sends empty DeltaDiscoveryResponse for every resource type.
time.Sleep(s.responseBackoff)
}
if err != nil {
s.log.Info("error during callback received, sending NACK", "err", err)
if err := s.ddsStream.NACK(received.Type, err); err != nil {
if err == io.EOF {
return nil
}
return errors.Wrap(err, "failed to NACK a discovery response")
}
} else {
s.log.V(1).Info("sending ACK", "type", received.Type)
if err := s.ddsStream.ACK(received.Type); err != nil {
if err == io.EOF {
return nil
}
return errors.Wrap(err, "failed to ACK a discovery response")
}
}
}
}