in pkg/config/xds/apiclient/grpc.go [144:202]
func (g *GrpcExtensionApiClient) runDelta(output chan<- *DeltaResources) error {
var delta extensionpb.ExtensionConfigDiscoveryService_DeltaExtensionConfigsClient
var cancel context.CancelFunc
var xState xdsState
backoff := func() {
for {
//back off
var err error
var ctx context.Context // context to sync exitCh
ctx, cancel = context.WithCancel(context.TODO())
delta, err = g.sendInitDeltaRequest(ctx, &xState)
if err != nil {
logger.Error("can not receive delta discovery request, will back off 1 sec later", err)
select {
case <-time.After(1 * time.Second):
case <-g.exitCh:
logger.Infof("get close single.")
return
}
continue //backoff
}
return //success
}
}
backoff()
if delta == nil { // delta instance not created because exitCh
return nil
}
go func() {
//waiting exitCh close
for range g.exitCh {
}
cancel()
}()
//get message
go func() {
for { // delta response backoff.
for { //loop consume recv data form xds server(sendInitDeltaRequest)
resp, err := delta.Recv()
if err != nil { //todo backoff retry
logger.Error("can not receive delta discovery request", err)
break
}
g.handleDeltaResponse(resp, &xState, output)
err = g.subscribeOnGoingChang(delta, &xState)
if err != nil {
logger.Error("can not recv delta discovery request", err)
break
}
}
backoff()
}
}()
return nil
}