func()

in pkg/config/xds/apiclient/grpc.go [144:202]


// runDelta opens a delta extension-config stream and starts a background
// goroutine that consumes responses from it, forwarding them through
// handleDeltaResponse (which receives output) and re-subscribing via
// subscribeOnGoingChang after every response.
//
// Opening the stream is retried once per second until it succeeds or g.exitCh
// fires. If g.exitCh fires before the first stream is established, runDelta
// returns without starting any goroutine. When the stream later fails, the
// background goroutine reconnects with the same retry policy and terminates
// once g.exitCh fires during a retry.
//
// The returned error is always nil; failures are logged and retried.
func (g *GrpcExtensionApiClient) runDelta(output chan<- *DeltaResources) error {
	var (
		delta        extensionpb.ExtensionConfigDiscoveryService_DeltaExtensionConfigsClient
		cancelStream context.CancelFunc // cancels the context of the current stream
		xState       xdsState
	)

	// rootCtx is the parent of every stream context, so cancelling it on exit
	// tears down whichever stream is current. This avoids sharing a mutable
	// cancel func between goroutines.
	rootCtx, cancelRoot := context.WithCancel(context.TODO())

	// connect blocks until a delta stream is established (returns true) or
	// g.exitCh fires while waiting between attempts (returns false). It is
	// only ever called from one goroutine at a time: first from runDelta
	// itself, afterwards exclusively from the receive goroutine.
	connect := func() bool {
		for {
			ctx, cancel := context.WithCancel(rootCtx)
			stream, err := g.sendInitDeltaRequest(ctx, &xState)
			if err == nil {
				delta, cancelStream = stream, cancel
				return true
			}
			// Release the context of the failed attempt right away instead of
			// leaking one per retry.
			cancel()
			logger.Error("can not receive delta discovery request, will back off 1 sec later", err)
			select {
			case <-time.After(1 * time.Second):
			case <-g.exitCh:
				logger.Infof("get close single.")
				return false
			}
		}
	}

	if !connect() { // no stream was created because exitCh fired
		cancelRoot()
		return nil
	}

	go func() {
		// The range loop only ends when exitCh is closed; after that, cancel
		// the root context so the active stream (and any later one) is aborted.
		for range g.exitCh {
		}
		cancelRoot()
	}()

	// Receive loop: consume the stream, reconnect when it breaks.
	go func() {
		// This goroutine only returns after exit was signalled; make sure
		// nothing derived from rootCtx outlives it.
		defer cancelRoot()
		for {
			for { // consume data from the stream opened by sendInitDeltaRequest
				resp, err := delta.Recv()
				if err != nil {
					logger.Error("can not receive delta discovery request", err)
					break
				}
				g.handleDeltaResponse(resp, &xState, output)

				if err = g.subscribeOnGoingChang(delta, &xState); err != nil {
					logger.Error("can not recv delta discovery request", err)
					break
				}
			}
			// The stream is unusable now; release its context before opening
			// a replacement. This matters in particular when the subscribe
			// call failed, because Recv has not ended the stream in that case.
			cancelStream()
			if !connect() {
				// exitCh fired while reconnecting: stop instead of spinning on
				// a dead stream forever.
				return
			}
		}
	}()

	return nil
}