in grpc-xds/control-plane-go/pkg/xds/snapshot_cache.go [101:126]
func (c *SnapshotCache) CreateWatch(request *cachev3.Request, state streamv3.StreamState, responses chan cachev3.Response) (cancel func()) {
if isListenerRequest(request) {
c.logger.Info("CreateWatch",
"typeUrl", request.TypeUrl,
"resourceNames", request.ResourceNames,
"node.cluster", request.Node.Cluster,
"node.user_agent_name", request.Node.UserAgentName,
"node.id", request.Node.Id)
nodeHash := c.hash.ID(request.GetNode())
addressesFromRequest, err := findServerListenerAddresses(request.ResourceNames)
if err != nil {
c.logger.Error(err, "Problem encountered when looking for server listener addresses in new Listener stream request", "nodeHash", nodeHash)
return func() {}
}
changes := c.grpcServerListenerCache.Add(nodeHash, addressesFromRequest)
existingSnapshot, err := c.delegate.GetSnapshot(nodeHash)
if err != nil || existingSnapshot == nil || changes {
apps := c.appsCache.GetAll()
if err := c.createNewSnapshot(nodeHash, apps); err != nil {
c.logger.Error(err, "Could not set new xDS resource snapshot", "nodeHash", nodeHash, "apps", apps)
return func() {}
}
}
}
return c.delegate.CreateWatch(request, state, responses)
}