func()

in pkg/hds/server/server.go [94:197]


// process drives a single health-discovery stream until it terminates.
//
// It allocates a stream ID by atomically incrementing s.streamCount, notifies
// s.callbacks (when set) about the stream being opened and closed, and then
// loops over three event sources:
//
//   - s.ctx being done ends the stream with a nil error;
//   - a response arriving on the cache watch channel is converted into a
//     HealthCheckSpecifier and sent on the stream;
//   - a message arriving on reqOrRespCh is forwarded to the callbacks.
//
// After every handled cache response and every handled incoming message the
// current cache watch (if any) is cancelled and a new one is created with the
// most recently recorded version and node.
//
// It returns nil when s.ctx is done or reqOrRespCh is closed, a
// codes.Unavailable status error when the watch channel is closed or a nil
// message is received, and otherwise the first error reported by a callback,
// by response decoding, or by stream.Send.
func (s *server) process(stream Stream, reqOrRespCh chan *envoy_service_health.HealthCheckRequestOrEndpointHealthResponse) error {
	streamID := atomic.AddInt64(&s.streamCount, 1)

	var (
		// version is the version of the last response pushed to the stream.
		version string
		// cancelWatch cancels the currently open cache watch; nil until the
		// first watch has been created.
		cancelWatch func()
	)

	// Registered before OnStreamOpen, so OnStreamClosed fires even when
	// opening the stream is rejected by the callbacks.
	defer func() {
		if cancelWatch != nil {
			cancelWatch()
		}
		if s.callbacks != nil {
			s.callbacks.OnStreamClosed(streamID)
		}
	}()

	if s.callbacks != nil {
		if err := s.callbacks.OnStreamOpen(stream.Context(), streamID); err != nil {
			return err
		}
	}

	watchCh := make(chan envoy_cache.Response, 1)
	// node is the last node seen in a health check request; requests that
	// omit the node get this one filled in.
	node := &envoy_core.Node{}

	// rewatch replaces the current cache watch with a fresh one that reflects
	// the latest node and version.
	rewatch := func() {
		if cancelWatch != nil {
			cancelWatch()
		}
		request := &envoy_cache.Request{
			Node:          node,
			TypeUrl:       hds_cache.HealthCheckSpecifierType,
			ResourceNames: []string{"hcs"},
			VersionInfo:   version,
		}
		cancelWatch = s.cache.CreateWatch(request, envoy_stream.NewStreamState(false, nil), watchCh)
	}

	// deliver decodes the first resource of a cache response as a
	// HealthCheckSpecifier, records the response version and sends the
	// specifier on the stream.
	deliver := func(resp envoy_cache.Response) error {
		if resp == nil {
			return errors.New("missing response")
		}
		discovery, err := resp.GetDiscoveryResponse()
		if err != nil {
			return err
		}
		// NOTE(review): an empty response is skipped without recording its
		// version, so the next watch is created with the previous version —
		// confirm this is intended.
		if len(discovery.Resources) == 0 {
			return nil
		}

		specifier := &envoy_service_health.HealthCheckSpecifier{}
		if err := util_proto.UnmarshalAnyTo(discovery.Resources[0], specifier); err != nil {
			return err
		}
		newVersion, err := resp.GetVersion()
		if err != nil {
			return err
		}
		version = newVersion
		return stream.Send(specifier)
	}

	// notify forwards an incoming message to the callbacks, tracking the node
	// carried by health check requests along the way.
	notify := func(msg *envoy_service_health.HealthCheckRequestOrEndpointHealthResponse) error {
		if req := msg.GetHealthCheckRequest(); req != nil {
			if req.Node == nil {
				req.Node = node
			} else {
				node = req.Node
			}
			if s.callbacks != nil {
				if err := s.callbacks.OnHealthCheckRequest(streamID, req); err != nil {
					return err
				}
			}
		}
		if health := msg.GetEndpointHealthResponse(); health != nil && s.callbacks != nil {
			if err := s.callbacks.OnEndpointHealthResponse(streamID, health); err != nil {
				return err
			}
		}
		return nil
	}

	for {
		select {
		case <-s.ctx.Done():
			return nil

		case resp, open := <-watchCh:
			if !open {
				return status.Error(codes.Unavailable, "healthChecks watch failed")
			}
			if err := deliver(resp); err != nil {
				return err
			}
			rewatch()

		case msg, open := <-reqOrRespCh:
			switch {
			case !open:
				return nil
			case msg == nil:
				return status.Errorf(codes.Unavailable, "empty request")
			}
			if err := notify(msg); err != nil {
				return err
			}
			rewatch()
		}
	}
}