in pkg/common/http/manager.go [116:194]
// writeResponse writes the response held in c.TargetResp to c.Writer.
//
// Nothing is written when c.LocalReply() reports true. Otherwise the status
// code from c.GetStatusCode() is sent first, followed by a body that depends
// on the concrete type of c.TargetResp:
//   - nil: no body is written (header only).
//   - *client.UnaryResponse: res.Data is written in a single call.
//   - *client.StreamResponse: res.Stream is relayed chunk by chunk until it
//     reports io.EOF or a read error, a write to c.Writer fails, or c.Ctx is
//     done. The stream is closed before returning on every one of these paths.
//   - any other type: an error is logged and no body is written.
//
// Failures are logged rather than returned; the method has no result.
func (hcm *HttpConnectionManager) writeResponse(c *pch.HttpContext) {
	if c.LocalReply() {
		return
	}
	c.Writer.WriteHeader(c.GetStatusCode())
	if c.TargetResp == nil {
		return
	}

	switch res := c.TargetResp.(type) {
	case *client.UnaryResponse:
		if _, err := c.Writer.Write(res.Data); err != nil {
			logger.Errorf("Write response failed: %v", err)
		}

	case *client.StreamResponse:
		// The derived context tells the reader goroutine to stop as soon as
		// this function returns, whatever the reason.
		ctx, cancel := context.WithCancel(c.Ctx)
		defer cancel()
		// Close the stream on every exit path (EOF, read error, write error,
		// cancellation) so it is never leaked. The close error is ignored on
		// purpose: the response is already finished or abandoned at this point.
		defer func() { _ = res.Stream.Close() }()

		// dataC is unbuffered: a chunk is always handed over before the reader
		// can report its terminal status, so no data is pending when errC fires.
		dataC := make(chan []byte)
		// errC carries exactly one terminal status; the buffer of 1 lets the
		// reader exit even if nobody is receiving any more.
		errC := make(chan error, 1)

		// Reader goroutine: pulls chunks from the stream and forwards them.
		go func() {
			// Closing errC wakes the consumer when the reader stops because of
			// cancellation without having produced a status.
			defer close(errC)
			buf := make([]byte, 1024) // 1KB read buffer, reused across reads
			for {
				select {
				case <-ctx.Done():
					return
				default:
				}

				n, err := res.Stream.Read(buf)
				if n > 0 {
					// Copy the chunk: buf is overwritten by the next Read.
					data := make([]byte, n)
					copy(data, buf[:n])
					select {
					case dataC <- data:
					case <-ctx.Done():
						return
					}
				}
				if err != nil {
					if err != io.EOF {
						err = fmt.Errorf("stream read error: %w", err)
					}
					errC <- err
					return
				}
			}
		}()

		// Consumer loop: writes forwarded chunks until the stream terminates.
		for {
			select {
			case <-ctx.Done():
				return
			case data := <-dataC:
				if _, err := c.Writer.Write(data); err != nil {
					logger.Errorf("Write stream response failed: %v", err)
					return
				}
			case err := <-errC:
				// nil (channel closed) and io.EOF both mean a normal end.
				if err != nil && err != io.EOF {
					logger.Errorf("Stream error: %v", err)
				}
				return
			}
		}

	default:
		logger.Errorf("Unknown response type: %T", c.TargetResp)
	}
}