in protocol/triple/triple_protocol/protocol_grpc.go [158:234]
// NewConn builds the handler-side gRPC connection for a single request.
//
// It negotiates compression from the request headers, runs
// checkServerStreamsCanFlush against the response writer, pre-populates the
// response headers, and wires the envelope writer and reader with the codec
// selected from the request's content type. If either check produced an
// error, the freshly built connection is closed with that error and
// (nil, false) is returned; otherwise the connection is returned with true.
func (g *grpcHandler) NewConn(
	responseWriter http.ResponseWriter,
	request *http.Request,
) (handlerConnCloser, bool) {
	// Metadata must be parsed before the interceptor stack runs, so a failure
	// is only recorded here; it is delivered to the client once the
	// connection exists (see the Close call at the bottom).
	reqCompression, respCompression, setupErr := negotiateCompression(
		g.CompressionPools,
		getHeaderCanonical(request.Header, grpcHeaderCompression),
		getHeaderCanonical(request.Header, grpcHeaderAcceptCompression),
	)
	if setupErr == nil {
		setupErr = checkServerStreamsCanFlush(g.Spec, responseWriter)
	}

	// Stage the response headers up front:
	//   (1) the first write to the stream implicitly sends the headers, so
	//       every header gRPC requires has to be in place by then;
	//   (2) interceptors should be able to observe these headers.
	//
	// The keys are already in canonical form, so the map is assigned directly
	// instead of paying for the normalization done by Header.Set.
	respHeader := responseWriter.Header()
	contentType := getHeaderCanonical(request.Header, headerContentType)
	respHeader[headerContentType] = []string{contentType}
	respHeader[grpcHeaderAcceptCompression] = []string{g.CompressionPools.CommaSeparatedNames()}
	if respCompression != compressionIdentity {
		respHeader[grpcHeaderCompression] = []string{respCompression}
	}

	// content-type -> codec name -> codec
	primaryCodec := g.Codecs.Get(grpcCodecFromContentType(contentType)) // handler.go guarantees this is not nil
	fallbackCodec := g.Codecs.Get(g.ExpectedCodecName)

	errorCodec := g.Codecs.Protobuf() // used when encoding errors
	writer := envelopeWriter{
		writer:           responseWriter,
		compressionPool:  g.CompressionPools.Get(respCompression),
		codec:            primaryCodec,
		backupCodec:      fallbackCodec,
		compressMinBytes: g.CompressMinBytes,
		bufferPool:       g.BufferPool,
		sendMaxBytes:     g.SendMaxBytes,
	}
	reader := envelopeReader{
		reader:          request.Body,
		codec:           primaryCodec,
		backupCodec:     fallbackCodec,
		compressionPool: g.CompressionPools.Get(reqCompression),
		bufferPool:      g.BufferPool,
		readMaxBytes:    g.ReadMaxBytes,
	}

	conn := wrapHandlerConnWithCodedErrors(&grpcHandlerConn{
		spec: g.Spec,
		peer: Peer{
			Addr:     request.RemoteAddr,
			Protocol: ProtocolGRPC,
		},
		bufferPool:      g.BufferPool,
		protobuf:        errorCodec,
		marshaler:       grpcMarshaler{envelopeWriter: writer},
		responseWriter:  responseWriter,
		responseHeader:  make(http.Header),
		responseTrailer: make(http.Header),
		request:         request,
		unmarshaler:     grpcUnmarshaler{envelopeReader: reader},
	})

	if setupErr == nil {
		return conn, true
	}
	// Setup failed, so no stream can be established. Closing the connection
	// with the recorded error is how it reaches the client; Close's own
	// result is deliberately ignored because the request is rejected anyway.
	_ = conn.Close(setupErr)
	return nil, false
}