in pkg/config/xds/apiclient/grpc_envoy.go [282:335]
func (g *AggGrpcApiClient) runDelta(req *discoverypb.DiscoveryRequest, output discoveryResponseHandler) error {
var delta discoverypb.AggregatedDiscoveryService_StreamAggregatedResourcesClient
var cancel context.CancelFunc
var xState xdsState
//read resource list
backoff := func() {
xState = xdsState{}
for {
//back off
var err error
var ctx context.Context // context to sync exitCh
ctx, cancel = context.WithCancel(context.TODO())
delta, err = g.sendInitAggRequest(ctx, req, &xState)
if err != nil {
logger.Error("can not receive delta discovery request, will back off 1 sec later", err)
select {
case <-time.After(1 * time.Second):
case <-g.exitCh:
logger.Infof("get close single.")
return
}
continue //backoff
}
return //success
}
}
backoff()
if delta == nil { // delta instance not created because exitCh
return nil
}
go func() {
//waiting exitCh close
for range g.exitCh {
}
cancel()
}()
//get message
go func() {
for { // delta response backoff.
for { //loop consume receive data form xds server(sendInitDeltaRequest)
resp, err := delta.Recv()
if err != nil {
logger.Error("can not receive delta discovery request", err)
break
}
g.handleDeltaResponse(resp, &xState, output)
}
backoff()
}
}()
return nil
}