in pkg/adsc/adsc.go [482:601]
// handleRecv is the receive loop for the ADS stream: it reads responses from
// a.stream until Recv returns an error.
//
// For every response it:
//   - invokes the optional cfg.ResponseHandler;
//   - if the response is a mesh config with at least one resource, decodes the
//     first resource into a.Mesh (and, when LocalCacheDir is set, dumps it as
//     JSON to LocalCacheDir+"_mesh.json"), then moves on to the next response
//     without acking or publishing it on XDSUpdates;
//   - otherwise records the version, decodes the resources according to the
//     type URL, dispatches them to handleLDS/handleCDS/handleEDS/handleRDS (or
//     handleMCP for any other type), records the first-sync time, stores the
//     response in a.Received, acks it, and offers it on XDSUpdates.
//
// When Recv fails, the loop marks RecvWg done, offers the error on errChan
// and then either schedules a.reconnect (when cfg.BackoffPolicy is set) or
// shuts the client down and notifies Updates/XDSUpdates before closing errChan.
func (a *ADSC) handleRecv() {
	for {
		msg, err := a.stream.Recv()
		if err != nil {
			a.RecvWg.Done()
			adscLog.Infof("Connection closed for node %v with err: %v", a.nodeID, err)
			// Non-blocking send: the error is dropped if errChan cannot accept it right now.
			select {
			case a.errChan <- err:
			default:
			}
			// if 'reconnect' enabled - schedule a new Run
			if a.cfg.BackoffPolicy != nil {
				time.AfterFunc(a.cfg.BackoffPolicy.NextBackOff(), a.reconnect)
			} else {
				a.Close()
				a.WaitClear()
				// These sends are blocking, unlike the errChan send above.
				a.Updates <- ""
				a.XDSUpdates <- nil
				close(a.errChan)
			}
			return
		}

		// Group-value-kind - used for high level api generator.
		// Only type URLs that split into exactly three "/"-separated parts are
		// treated as a group/version/kind further down.
		gvk := strings.SplitN(msg.TypeUrl, "/", 3)

		adscLog.Info("Received ", a.url, " type ", msg.TypeUrl,
			" cnt=", len(msg.Resources), " nonce=", msg.Nonce)
		if a.cfg.ResponseHandler != nil {
			a.cfg.ResponseHandler.HandleResponse(a, msg)
		}

		if msg.TypeUrl == collections.IstioMeshV1Alpha1MeshConfig.Resource().GroupVersionKind().String() &&
			len(msg.Resources) > 0 {
			rsc := msg.Resources[0]
			m := &v1alpha1.MeshConfig{}
			if err := proto.Unmarshal(rsc.Value, m); err != nil {
				// Keep whatever mesh config was stored before instead of
				// replacing it (and the on-disk cache) with a partially
				// decoded message.
				adscLog.Warnf("Failed to unmarshal mesh config: %v", err)
				continue
			}
			a.Mesh = m
			if a.LocalCacheDir != "" {
				// The cache dump is best-effort: failures are logged and the
				// loop carries on with the next response.
				strResponse, err := protomarshal.ToJSONWithIndent(m, " ")
				if err != nil {
					adscLog.Warnf("Failed to marshal mesh config to JSON: %v", err)
					continue
				}
				if err := os.WriteFile(a.LocalCacheDir+"_mesh.json", []byte(strResponse), 0o644); err != nil {
					adscLog.Warnf("Failed to write mesh config cache: %v", err)
				}
			}
			continue
		}

		// Process the resources.
		a.VersionInfo[msg.TypeUrl] = msg.VersionInfo
		switch msg.TypeUrl {
		case v3.ListenerType:
			listeners := make([]*listener.Listener, 0, len(msg.Resources))
			for _, rsc := range msg.Resources {
				ll := &listener.Listener{}
				// Decoding is best-effort: a failure is reported but the
				// resource is still handed to the handler, as before.
				if err := proto.Unmarshal(rsc.Value, ll); err != nil {
					adscLog.Warnf("Failed to unmarshal %s resource: %v", msg.TypeUrl, err)
				}
				listeners = append(listeners, ll)
			}
			a.handleLDS(listeners)
		case v3.ClusterType:
			clusters := make([]*cluster.Cluster, 0, len(msg.Resources))
			for _, rsc := range msg.Resources {
				cl := &cluster.Cluster{}
				if err := proto.Unmarshal(rsc.Value, cl); err != nil {
					adscLog.Warnf("Failed to unmarshal %s resource: %v", msg.TypeUrl, err)
				}
				clusters = append(clusters, cl)
			}
			a.handleCDS(clusters)
		case v3.EndpointType:
			eds := make([]*endpoint.ClusterLoadAssignment, 0, len(msg.Resources))
			for _, rsc := range msg.Resources {
				el := &endpoint.ClusterLoadAssignment{}
				if err := proto.Unmarshal(rsc.Value, el); err != nil {
					adscLog.Warnf("Failed to unmarshal %s resource: %v", msg.TypeUrl, err)
				}
				eds = append(eds, el)
			}
			a.handleEDS(eds)
		case v3.RouteType:
			routes := make([]*route.RouteConfiguration, 0, len(msg.Resources))
			for _, rsc := range msg.Resources {
				rl := &route.RouteConfiguration{}
				if err := proto.Unmarshal(rsc.Value, rl); err != nil {
					adscLog.Warnf("Failed to unmarshal %s resource: %v", msg.TypeUrl, err)
				}
				routes = append(routes, rl)
			}
			a.handleRDS(routes)
		default:
			a.handleMCP(gvk, msg.Resources)
		}

		// If we got no resource - still save to the store with empty name/namespace, to notify sync
		// This scheme also allows us to chunk large responses !

		// TODO: add hook to inject nacks

		a.mutex.Lock()
		if len(gvk) == 3 {
			gt := config.GroupVersionKind{Group: gvk[0], Version: gvk[1], Kind: gvk[2]}
			// Only the first response for a given kind sets its sync time.
			if _, exist := a.sync[gt.String()]; !exist {
				a.sync[gt.String()] = time.Now()
			}
		}
		a.Received[msg.TypeUrl] = msg
		a.ack(msg)
		a.mutex.Unlock()

		// Non-blocking send: the update is dropped if XDSUpdates cannot accept it right now.
		select {
		case a.XDSUpdates <- msg:
		default:
		}
	}
}