func()

in pkg/config/xds/apiclient/grpc_envoy.go [282:335]


// runDelta opens an aggregated discovery stream for req and starts two
// background goroutines: one that passes every received response to
// handleDeltaResponse together with output, re-establishing the stream whenever
// Recv fails, and one that cancels the stream once g.exitCh is closed.
//
// Establishing the stream is retried once per second until it succeeds or
// g.exitCh fires. The call blocks only for the initial connection attempt(s);
// it always returns nil, including when shutdown is signalled before a stream
// could be created.
func (g *AggGrpcApiClient) runDelta(req *discoverypb.DiscoveryRequest, output discoveryResponseHandler) error {
	// rootCtx is the parent of every per-stream context. Cancelling it tears
	// down whichever stream is current, so the exit watcher never has to share
	// a mutable cancel variable with the receive goroutine.
	rootCtx, stop := context.WithCancel(context.TODO())

	var xState xdsState

	// connect resets xState and (re)creates the stream, backing off one second
	// between failed attempts. It returns the stream together with the cancel
	// func of that stream's context, or (nil, nil) when g.exitCh fired while
	// backing off.
	connect := func() (discoverypb.AggregatedDiscoveryService_StreamAggregatedResourcesClient, context.CancelFunc) {
		xState = xdsState{}
		for {
			ctx, cancel := context.WithCancel(rootCtx)
			stream, err := g.sendInitAggRequest(ctx, req, &xState)
			if err == nil {
				return stream, cancel
			}
			cancel() // release the context of the failed attempt
			logger.Error("can not receive delta discovery request, will back off 1 sec later", err)
			select {
			case <-time.After(1 * time.Second):
			case <-g.exitCh:
				logger.Infof("get close single.")
				return nil, nil
			}
		}
	}

	delta, cancel := connect()
	if delta == nil { // no stream was created (exitCh fired during back off)
		stop()
		return nil
	}

	// Wait for exitCh to be closed, then cancel the root context, which also
	// cancels the context of the stream currently in use.
	go func() {
		for range g.exitCh {
		}
		stop()
	}()

	// Receive loop. After launch, delta, cancel and xState are touched only by
	// this goroutine.
	go func() {
		for {
			for { // consume responses until the stream breaks
				resp, err := delta.Recv()
				if err != nil {
					logger.Error("can not receive delta discovery request", err)
					break
				}
				g.handleDeltaResponse(resp, &xState, output)
			}
			cancel() // release the broken stream's context before replacing it

			if rootCtx.Err() != nil { // shutdown was requested: do not reconnect
				return
			}
			delta, cancel = connect()
			if delta == nil { // exitCh fired while reconnecting
				return
			}
		}
	}()

	return nil
}