func()

in pkg/common/http/manager.go [116:194]


// writeResponse copies the upstream response held in c.TargetResp to c.Writer.
//
// Nothing is written when c.LocalReply() reports true. Otherwise the status
// code from c.GetStatusCode() is written first, followed by the body:
//
//   - *client.UnaryResponse: res.Data is written in a single call.
//   - *client.StreamResponse: res.Stream is read in 1KB chunks by a helper
//     goroutine and each chunk is forwarded to c.Writer until the stream ends,
//     a read or write fails, or c.Ctx is cancelled.
//
// Any other non-nil response type is logged and ignored. Errors are logged,
// not returned.
func (hcm *HttpConnectionManager) writeResponse(c *pch.HttpContext) {
	if c.LocalReply() {
		return
	}

	c.Writer.WriteHeader(c.GetStatusCode())
	if c.TargetResp == nil {
		return
	}

	switch res := c.TargetResp.(type) {
	case *client.UnaryResponse:
		if _, err := c.Writer.Write(res.Data); err != nil {
			logger.Errorf("Write response failed: %v", err)
		}

	case *client.StreamResponse:
		// Derived context so the reader goroutine is told to stop on every
		// exit path of this function, not only when the request is cancelled.
		ctx, cancel := context.WithCancel(c.Ctx)
		defer cancel()

		// Close the upstream stream on every exit path (EOF, read error, write
		// error, cancellation). Closing is also what unblocks a reader that is
		// parked inside Stream.Read.
		// NOTE(review): assumes this function owns res.Stream; if a caller also
		// closes it, Close must tolerate being called twice - confirm.
		defer func() { _ = res.Stream.Close() }()

		dataC := make(chan []byte)
		// Buffered so the reader can always deposit its single terminal error
		// and exit, even when nobody is receiving any more.
		errC := make(chan error, 1)

		// Reader goroutine: pulls chunks from the stream and hands them over.
		go func() {
			// dataC is closed after any send on errC, so once the consumer
			// observes the close, a pending read error is guaranteed visible.
			defer close(dataC)

			buf := make([]byte, 1024) // 1KB buffer
			for {
				if ctx.Err() != nil {
					return
				}

				n, err := res.Stream.Read(buf)
				if n > 0 {
					// Copy out of buf: it is overwritten by the next Read.
					data := make([]byte, n)
					copy(data, buf[:n])
					select {
					case dataC <- data:
					case <-ctx.Done():
						return
					}
				}
				if err != nil {
					// io.EOF is the normal end of stream and is not reported.
					if err != io.EOF {
						errC <- fmt.Errorf("stream read error: %w", err)
					}
					return
				}
			}
		}()

		for {
			select {
			case <-ctx.Done():
				return
			case data, ok := <-dataC:
				if !ok {
					// Reader finished. Check for a terminal read error only
					// now, so the log is never lost to a select race between
					// the closed data channel and the error channel.
					select {
					case err := <-errC:
						logger.Errorf("Stream error: %v", err)
					default:
					}
					return
				}
				if _, err := c.Writer.Write(data); err != nil {
					logger.Errorf("Write response failed: %v", err)
					return
				}
			}
		}

	default:
		logger.Errorf("Unknown response type: %T", c.TargetResp)
	}
}