in pkg/hds/server/server.go [94:197]
func (s *server) process(stream Stream, reqOrRespCh chan *envoy_service_health.HealthCheckRequestOrEndpointHealthResponse) error {
streamID := atomic.AddInt64(&s.streamCount, 1)
lastVersion := ""
var watchCancellation func()
defer func() {
if watchCancellation != nil {
watchCancellation()
}
if s.callbacks != nil {
s.callbacks.OnStreamClosed(streamID)
}
}()
send := func(resp envoy_cache.Response) error {
if resp == nil {
return errors.New("missing response")
}
out, err := resp.GetDiscoveryResponse()
if err != nil {
return err
}
if len(out.Resources) == 0 {
return nil
}
hcs := &envoy_service_health.HealthCheckSpecifier{}
if err := util_proto.UnmarshalAnyTo(out.Resources[0], hcs); err != nil {
return err
}
lastVersion, err = resp.GetVersion()
if err != nil {
return err
}
return stream.Send(hcs)
}
if s.callbacks != nil {
if err := s.callbacks.OnStreamOpen(stream.Context(), streamID); err != nil {
return err
}
}
responseChan := make(chan envoy_cache.Response, 1)
node := &envoy_core.Node{}
for {
select {
case <-s.ctx.Done():
return nil
case resp, more := <-responseChan:
if !more {
return status.Error(codes.Unavailable, "healthChecks watch failed")
}
if err := send(resp); err != nil {
return err
}
if watchCancellation != nil {
watchCancellation()
}
watchCancellation = s.cache.CreateWatch(&envoy_cache.Request{
Node: node,
TypeUrl: hds_cache.HealthCheckSpecifierType,
ResourceNames: []string{"hcs"},
VersionInfo: lastVersion,
}, envoy_stream.NewStreamState(false, nil), responseChan)
case reqOrResp, more := <-reqOrRespCh:
if !more {
return nil
}
if reqOrResp == nil {
return status.Errorf(codes.Unavailable, "empty request")
}
if req := reqOrResp.GetHealthCheckRequest(); req != nil {
if req.Node != nil {
node = req.Node
} else {
req.Node = node
}
if s.callbacks != nil {
if err := s.callbacks.OnHealthCheckRequest(streamID, req); err != nil {
return err
}
}
}
if resp := reqOrResp.GetEndpointHealthResponse(); resp != nil {
if s.callbacks != nil {
if err := s.callbacks.OnEndpointHealthResponse(streamID, resp); err != nil {
return err
}
}
}
if watchCancellation != nil {
watchCancellation()
}
watchCancellation = s.cache.CreateWatch(&envoy_cache.Request{
Node: node,
TypeUrl: hds_cache.HealthCheckSpecifierType,
ResourceNames: []string{"hcs"},
VersionInfo: lastVersion,
}, envoy_stream.NewStreamState(false, nil), responseChan)
}
}
}