protocol/triple/triple_protocol/protocol_grpc.go (738 lines of code) (raw):

// Copyright 2021-2023 Buf Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package triple_protocol

import (
	"bufio"
	"context"
	"errors"
	"fmt"
	"io"
	"math"
	"net/http"
	"net/textproto"
	"runtime"
	"strconv"
	"strings"
	"time"
	"unicode/utf8"
)

import (
	"dubbo.apache.org/dubbo-go/v3/protocol/triple/triple_protocol/internal/proto/connectext/grpc/status/v1"
)

// Header names, envelope flags, timeout limits and content types used by the
// gRPC flavor of the protocol. The header names are spelled in canonical MIME
// form so they can be used as direct http.Header map keys.
const (
	grpcHeaderCompression       = "Grpc-Encoding"
	grpcHeaderAcceptCompression = "Grpc-Accept-Encoding"
	grpcHeaderTimeout           = "Grpc-Timeout"
	grpcHeaderStatus            = "Grpc-Status"
	grpcHeaderMessage           = "Grpc-Message"
	grpcHeaderDetails           = "Grpc-Status-Details-Bin"

	// grpcFlagEnvelopeTrailer is the envelope flag bit that grpcUnmarshaler
	// requires on a special envelope before parsing it as trailers.
	grpcFlagEnvelopeTrailer = 0b10000000

	grpcTimeoutMaxHours = math.MaxInt64 / int64(time.Hour) // how many hours fit into a time.Duration?
	grpcMaxTimeoutChars = 8                                // from gRPC protocol

	// grpcContentTypeDefault is the bare content type; codec-specific content
	// types are formed as grpcContentTypePrefix followed by the codec name.
	grpcContentTypeDefault = "application/grpc"
	grpcContentTypePrefix  = grpcContentTypeDefault + "+"
)

var (
	// grpcTimeoutUnits lists the Grpc-Timeout unit suffixes from the smallest
	// unit to the largest. grpcEncodeTimeout walks it in this order.
	grpcTimeoutUnits = []struct {
		size time.Duration
		char byte
	}{
		{time.Nanosecond, 'n'},
		{time.Microsecond, 'u'},
		{time.Millisecond, 'm'},
		{time.Second, 'S'},
		{time.Minute, 'M'},
		{time.Hour, 'H'},
	}
	// grpcTimeoutUnitLookup maps a unit suffix byte to its duration. It is
	// populated from grpcTimeoutUnits in init.
	grpcTimeoutUnitLookup = make(map[byte]time.Duration)
	// grpcAllowedMethods is the set of HTTP methods returned by
	// grpcHandler.Methods; it contains only POST.
	grpcAllowedMethods = map[string]struct{}{
		http.MethodPost: {},
	}
	// errTrailersWithoutGRPCStatus is the sentinel wrapped by
	// grpcErrorFromTrailer when the Grpc-Status key is absent or empty.
	errTrailersWithoutGRPCStatus = fmt.Errorf("gRPC protocol error: no %s trailer", grpcHeaderStatus)

	// defaultGrpcUserAgent follows
	// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#user-agents:
	//
	//	While the protocol does not require a user-agent to function it is recommended
	//	that clients provide a structured user-agent string that provides a basic
	//	description of the calling library, version & platform to facilitate issue diagnosis
	//	in heterogeneous environments. The following structure is recommended to library developers:
	//
	//	User-Agent → "grpc-" Language ?("-" Variant) "/" Version ?( " (" *(AdditionalProperty ";") ")" )
	defaultGrpcUserAgent = fmt.Sprintf("grpc-go-triple/%s (%s)", Version, runtime.Version())
)

// init fills grpcTimeoutUnitLookup so that grpcParseTimeout can resolve a unit
// suffix with a single map lookup.
func init() {
	for _, pair := range grpcTimeoutUnits {
		grpcTimeoutUnitLookup[pair.char] = pair.size
	}
}

// protocolGRPC is the stateless factory for gRPC handlers and clients.
type protocolGRPC struct{}

// for server side
// NewHandler implements protocol, so it must return an interface.
//
// The returned handler accepts one content type per registered codec
// (grpcContentTypePrefix + codec name, canonicalized). It additionally accepts
// the bare grpcContentTypeDefault when a codec named codecNameProto is
// registered.
func (g *protocolGRPC) NewHandler(params *protocolHandlerParams) protocolHandler {
	bare, prefix := grpcContentTypeDefault, grpcContentTypePrefix
	contentTypes := make(map[string]struct{})
	for _, name := range params.Codecs.Names() {
		contentTypes[canonicalizeContentType(prefix+name)] = struct{}{}
	}
	// default codec
	if params.Codecs.Get(codecNameProto) != nil {
		contentTypes[bare] = struct{}{}
	}
	return &grpcHandler{
		protocolHandlerParams: *params,
		accept:                contentTypes,
	}
}

// for client side
// NewClient implements protocol, so it must return an interface.
func (g *protocolGRPC) NewClient(params *protocolClientParams) (protocolClient, error) {
	peer := newPeerFromURL(params.URL, ProtocolGRPC)
	return &grpcClient{
		protocolClientParams: *params,
		peer:                 peer,
	}, nil
}

// grpcHandler is the server-side gRPC protocol handler. It carries a copy of
// the handler parameters plus the set of content types it accepts.
type grpcHandler struct {
	protocolHandlerParams

	accept map[string]struct{}
}

// Methods returns the HTTP methods this handler serves (the shared
// grpcAllowedMethods set).
func (g *grpcHandler) Methods() map[string]struct{} {
	return grpcAllowedMethods
}

// ContentTypes returns the set of content types computed by NewHandler.
func (g *grpcHandler) ContentTypes() map[string]struct{} {
	return g.accept
}

// SetTimeout derives the request context from the Grpc-Timeout header.
//
// It returns:
//   - a CodeInvalidArgument error when the header is present but malformed;
//   - the unmodified request context and a nil CancelFunc when
//     grpcParseTimeout reports errNoTimeout;
//   - otherwise a context with the parsed timeout and its CancelFunc.
func (*grpcHandler) SetTimeout(request *http.Request) (context.Context, context.CancelFunc, error) {
	timeout, err := grpcParseTimeout(getHeaderCanonical(request.Header, grpcHeaderTimeout))
	// Distinguish a malformed header from errNoTimeout, which grpcParseTimeout
	// returns for an empty header or an hour count too large for time.Duration.
	if err != nil && !errors.Is(err, errNoTimeout) {
		// Errors here indicate that the client sent an invalid timeout header, so
		// the error text is safe to send back.
		return nil, nil, NewError(CodeInvalidArgument, err)
	} else if err != nil {
		// err wraps errNoTimeout, nothing to do.
		return request.Context(), nil, nil //nolint:nilerr
	}
	ctx, cancel := context.WithTimeout(request.Context(), timeout)
	return ctx, cancel, nil
}

// CanHandlePayload reports whether contentType is one of the accepted content
// types. The request argument is not consulted.
func (g *grpcHandler) CanHandlePayload(request *http.Request, contentType string) bool {
	_, ok := g.accept[contentType]
	return ok
}

// NewConn is the key function which is responsible for marshal and unmarshal.
//
// It negotiates compression, prepares the gRPC response headers on the
// response writer, picks the codec from the request Content-Type, and wires a
// grpcHandlerConn around the request body and response writer. If negotiation
// or the flush check failed, the connection is closed with that error and
// (nil, false) is returned.
func (g *grpcHandler) NewConn(
	responseWriter http.ResponseWriter,
	request *http.Request,
) (handlerConnCloser, bool) {
	// We need to parse metadata before entering the interceptor stack; we'll
	// send the error to the client later on.
	requestCompression, responseCompression, failed := negotiateCompression(
		g.CompressionPools,
		getHeaderCanonical(request.Header, grpcHeaderCompression),
		getHeaderCanonical(request.Header, grpcHeaderAcceptCompression),
	)
	if failed == nil {
		failed = checkServerStreamsCanFlush(g.Spec, responseWriter)
	}

	// Prepare the response headers.
	// Write any remaining headers here:
	// (1) any writes to the stream will implicitly send the headers, so we
	// should get all of gRPC's required response headers ready.
	// (2) interceptors should be able to see these headers.
	//
	// Since we know that these header keys are already in canonical form, we can
	// skip the normalization in Header.Set.
	header := responseWriter.Header()
	header[headerContentType] = []string{getHeaderCanonical(request.Header, headerContentType)}
	header[grpcHeaderAcceptCompression] = []string{g.CompressionPools.CommaSeparatedNames()}
	if responseCompression != compressionIdentity {
		header[grpcHeaderCompression] = []string{responseCompression}
	}

	// content-type -> codecName -> codec
	codecName := grpcCodecFromContentType(getHeaderCanonical(request.Header, headerContentType))
	codec := g.Codecs.Get(codecName) // handler.go guarantees this is not nil
	// The codec registered under ExpectedCodecName is handed to the envelope
	// reader and writer as their backupCodec.
	backupCodec := g.Codecs.Get(g.ExpectedCodecName)
	protocolName := ProtocolGRPC
	conn := wrapHandlerConnWithCodedErrors(&grpcHandlerConn{
		spec: g.Spec,
		peer: Peer{
			Addr:     request.RemoteAddr,
			Protocol: protocolName,
		},
		bufferPool: g.BufferPool,
		protobuf:   g.Codecs.Protobuf(), // for errors
		marshaler: grpcMarshaler{
			envelopeWriter: envelopeWriter{
				writer:           responseWriter,
				compressionPool:  g.CompressionPools.Get(responseCompression),
				codec:            codec,
				backupCodec:      backupCodec,
				compressMinBytes: g.CompressMinBytes,
				bufferPool:       g.BufferPool,
				sendMaxBytes:     g.SendMaxBytes,
			},
		},
		responseWriter:  responseWriter,
		responseHeader:  make(http.Header),
		responseTrailer: make(http.Header),
		request:         request,
		unmarshaler: grpcUnmarshaler{
			envelopeReader: envelopeReader{
				reader:          request.Body,
				codec:           codec,
				backupCodec:     backupCodec,
				compressionPool: g.CompressionPools.Get(requestCompression),
				bufferPool:      g.BufferPool,
				readMaxBytes:    g.ReadMaxBytes,
			},
		},
	})
	if failed != nil {
		// Negotiation failed, so we can't establish a stream.
		_ = conn.Close(failed)
		return nil, false
	}
	return conn, true
}

// grpcClient is the client-side gRPC protocol implementation. It carries a
// copy of the client parameters and the peer derived from the target URL.
type grpcClient struct {
	protocolClientParams

	peer Peer
}

// Peer returns the peer computed in NewClient.
func (g *grpcClient) Peer() Peer {
	return g.peer
}

// WriteRequestHeader sets the request headers for a gRPC call: a default
// User-Agent when none is present, Content-Type for the configured codec,
// identity Accept-Encoding, the per-message compression headers, and
// "Te: trailers".
func (g *grpcClient) WriteRequestHeader(_ StreamType, header http.Header) {
	// We know these header keys are in canonical form, so we can bypass all the
	// checks in Header.Set.
	if getHeaderCanonical(header, headerUserAgent) == "" {
		header[headerUserAgent] = []string{defaultGrpcUserAgent}
	}
	header[headerContentType] = []string{grpcContentTypeFromCodecName(g.Codec.Name())}
	// gRPC handles compression on a per-message basis, so we don't want to
	// compress the whole stream. By default, http.Client will ask the server
	// to gzip the stream if we don't set Accept-Encoding.
	header["Accept-Encoding"] = []string{compressionIdentity}
	if g.CompressionName != "" && g.CompressionName != compressionIdentity {
		header[grpcHeaderCompression] = []string{g.CompressionName}
	}
	if acceptCompression := g.CompressionPools.CommaSeparatedNames(); acceptCompression != "" {
		header[grpcHeaderAcceptCompression] = []string{acceptCompression}
	}
	// The gRPC-HTTP2 specification requires this - it flushes out proxies that
	// don't support HTTP trailers.
	header["Te"] = []string{"trailers"}
}

// NewConn creates the client-side stream for one call. When ctx carries a
// deadline, the remaining time is encoded into the Grpc-Timeout header before
// the duplex HTTP call is created.
func (g *grpcClient) NewConn(
	ctx context.Context,
	spec Spec,
	header http.Header,
) StreamingClientConn {
	if deadline, ok := ctx.Deadline(); ok {
		if encodedDeadline, err := grpcEncodeTimeout(time.Until(deadline)); err == nil {
			// Tests verify that the error in encodeTimeout is unreachable, so we
			// don't need to handle the error case.
			header[grpcHeaderTimeout] = []string{encodedDeadline}
		}
	}
	duplexCall := newDuplexHTTPCall(
		ctx,
		g.HTTPClient,
		g.URL,
		spec,
		header,
	)
	conn := &grpcClientConn{
		spec:             spec,
		peer:             g.Peer(),
		duplexCall:       duplexCall,
		compressionPools: g.CompressionPools,
		bufferPool:       g.BufferPool,
		protobuf:         g.Protobuf,
		marshaler: grpcMarshaler{
			envelopeWriter: envelopeWriter{
				writer:           duplexCall,
				compressionPool:  g.CompressionPools.Get(g.CompressionName),
				codec:            g.Codec,
				compressMinBytes: g.CompressMinBytes,
				bufferPool:       g.BufferPool,
				sendMaxBytes:     g.SendMaxBytes,
			},
		},
		unmarshaler: grpcUnmarshaler{
			// No compressionPool here: validateResponse sets it once the
			// response's Grpc-Encoding header is known.
			envelopeReader: envelopeReader{
				reader:       duplexCall,
				codec:        g.Codec,
				bufferPool:   g.BufferPool,
				readMaxBytes: g.ReadMaxBytes,
			},
		},
		responseHeader:  make(http.Header),
		responseTrailer: make(http.Header),
	}
	duplexCall.SetValidateResponse(conn.validateResponse)
	conn.readTrailers = func(_ *grpcUnmarshaler, call *duplexHTTPCall) http.Header {
		// To access HTTP trailers, we need to read the body to EOF.
		_ = discard(call)
		return call.ResponseTrailer()
	}
	return wrapClientConnWithCodedErrors(conn)
}

// grpcClientConn works for both gRPC and gRPC-Web.
type grpcClientConn struct { spec Spec peer Peer duplexCall *duplexHTTPCall compressionPools readOnlyCompressionPools bufferPool *bufferPool protobuf Codec // for errors marshaler grpcMarshaler unmarshaler grpcUnmarshaler responseHeader http.Header responseTrailer http.Header readTrailers func(*grpcUnmarshaler, *duplexHTTPCall) http.Header } func (cc *grpcClientConn) Spec() Spec { return cc.spec } func (cc *grpcClientConn) Peer() Peer { return cc.peer } func (cc *grpcClientConn) Send(msg any) error { if err := cc.marshaler.Marshal(msg); err != nil { return err } return nil // must be a literal nil: nil *Error is a non-nil error } func (cc *grpcClientConn) RequestHeader() http.Header { return cc.duplexCall.Header() } func (cc *grpcClientConn) CloseRequest() error { return cc.duplexCall.CloseWrite() } func (cc *grpcClientConn) Receive(msg any) error { cc.duplexCall.BlockUntilResponseReady() err := cc.unmarshaler.Unmarshal(msg) if err == nil { return nil } if getHeaderCanonical(cc.responseHeader, grpcHeaderStatus) != "" { // We got what gRPC calls a trailers-only response, which puts the trailing // metadata (including errors) into HTTP headers. validateResponse has // already extracted the error. return err } // See if the server sent an explicit error in the HTTP or gRPC-Web trailers. mergeHeaders( cc.responseTrailer, cc.readTrailers(&cc.unmarshaler, cc.duplexCall), ) serverErr := grpcErrorFromTrailer(cc.bufferPool, cc.protobuf, cc.responseTrailer) if serverErr != nil && (errors.Is(err, io.EOF) || !errors.Is(serverErr, errTrailersWithoutGRPCStatus)) { // We've either: // - Cleanly read until the end of the response body and *not* received // gRPC status trailers, which is a protocol error, or // - Received an explicit error from the server. // // This is expected from a protocol perspective, but receiving trailers // means that we're _not_ getting a message. For users to realize that // the stream has ended, Receive must return an error. 
serverErr.meta = cc.responseHeader.Clone() mergeHeaders(serverErr.meta, cc.responseTrailer) cc.duplexCall.SetError(serverErr) return serverErr } // This was probably an error converting the bytes to a message or an error // reading from the network. We're going to return it to the // user, but we also want to setResponseError so Send errors out. cc.duplexCall.SetError(err) return err } func (cc *grpcClientConn) ResponseHeader() http.Header { cc.duplexCall.BlockUntilResponseReady() return cc.responseHeader } func (cc *grpcClientConn) ResponseTrailer() http.Header { cc.duplexCall.BlockUntilResponseReady() return cc.responseTrailer } func (cc *grpcClientConn) CloseResponse() error { err := cc.duplexCall.CloseRead() if err != nil { return err } if cc.duplexCall.response != nil && cc.duplexCall.response.Trailer != nil { cc.responseTrailer = cc.duplexCall.response.Trailer.Clone() } return nil } func (cc *grpcClientConn) validateResponse(response *http.Response) *Error { if err := grpcValidateResponse( response, cc.responseHeader, cc.responseTrailer, cc.compressionPools, cc.bufferPool, cc.protobuf, ); err != nil { return err } compression := getHeaderCanonical(response.Header, grpcHeaderCompression) cc.unmarshaler.envelopeReader.compressionPool = cc.compressionPools.Get(compression) return nil } // connection based on compression, codec type grpcHandlerConn struct { spec Spec peer Peer web bool bufferPool *bufferPool protobuf Codec // for errors marshaler grpcMarshaler responseWriter http.ResponseWriter responseHeader http.Header responseTrailer http.Header wroteToBody bool request *http.Request unmarshaler grpcUnmarshaler } func (hc *grpcHandlerConn) Spec() Spec { return hc.spec } func (hc *grpcHandlerConn) Peer() Peer { return hc.peer } // Receive delegated receive and unmarshal processes to unmarshaler func (hc *grpcHandlerConn) Receive(msg any) error { if err := hc.unmarshaler.Unmarshal(msg); err != nil { return err // already coded } return nil // must be a literal 
nil: nil *Error is a non-nil error } func (hc *grpcHandlerConn) RequestHeader() http.Header { return hc.request.Header } func (hc *grpcHandlerConn) ExportableHeader() http.Header { // todo(DMwangnima): check out whether res should be cached res := make(http.Header) hdr := hc.request.Header for key, vals := range hdr { key = strings.ToLower(key) if isReservedHeader(key) && !isWhitelistedHeader(key) { continue } cloneVals := make([]string, len(vals)) for i, val := range vals { cloneVals[i] = val } res[key] = cloneVals } return res } // isReservedHeader checks whether hdr belongs to HTTP2 headers // reserved by gRPC protocol. Any other headers are classified as the // user-specified metadata. func isReservedHeader(hdr string) bool { if hdr != "" && hdr[0] == ':' { return true } switch hdr { case "content-type", "user-agent", "grpc-message-type", "grpc-encoding", "grpc-message", "grpc-status", "grpc-timeout", "grpc-status-details-bin", // Intentionally exclude grpc-previous-rpc-attempts and // grpc-retry-pushback-ms, which are "reserved", but their API // intentionally works via metadata. "te": return true default: return false } } // isWhitelistedHeader checks whether hdr should be propagated into metadata // visible to users, even though it is classified as "reserved", above. 
func isWhitelistedHeader(hdr string) bool {
	switch hdr {
	case ":authority", "user-agent":
		return true
	default:
		return false
	}
}

// Send marshals msg and writes it to the response. Before the first write it
// merges the user-visible response headers into the response writer. The
// response writer is flushed when Send returns.
func (hc *grpcHandlerConn) Send(msg any) error {
	defer flushResponseWriter(hc.responseWriter)
	if !hc.wroteToBody {
		mergeHeaders(hc.responseWriter.Header(), hc.responseHeader)
		hc.wroteToBody = true
	}
	if err := hc.marshaler.Marshal(msg); err != nil {
		return err
	}
	return nil // must be a literal nil: nil *Error is a non-nil error
}

// ResponseHeader returns the user-visible response headers. They are copied to
// the response writer by the first Send, or by Close if nothing was sent.
func (hc *grpcHandlerConn) ResponseHeader() http.Header {
	return hc.responseHeader
}

// ResponseTrailer returns the user-visible response trailers. Close copies
// them into a separate map before adding status information.
func (hc *grpcHandlerConn) ResponseTrailer() http.Header {
	return hc.responseTrailer
}

// Close finishes the response. It encodes err (nil meaning OK) as gRPC status
// trailers, merges them with the user's trailers, and sends the result by one
// of three routes: as HTTP headers (gRPC-Web, nothing written yet), as a
// trailer envelope in the body (gRPC-Web after a write), or as HTTP trailers
// (standard gRPC). The request body is closed last; its close error is
// returned only when no other error occurred.
func (hc *grpcHandlerConn) Close(err error) (retErr error) {
	defer func() {
		// We don't want to copy unread portions of the body to /dev/null here: if
		// the client hasn't closed the request body, we'll block until the server
		// timeout kicks in. This could happen because the client is malicious, but
		// a well-intentioned client may just not expect the server to be returning
		// an error for a streaming RPC. Better to accept that we can't always reuse
		// TCP connections.
		closeErr := hc.request.Body.Close()
		if retErr == nil {
			retErr = closeErr
		}
	}()
	defer flushResponseWriter(hc.responseWriter)
	// If we haven't written the headers yet, do so.
	if !hc.wroteToBody {
		mergeHeaders(hc.responseWriter.Header(), hc.responseHeader)
	}
	// gRPC always sends the error's code, message, details, and metadata as
	// trailing metadata. The Triple protocol doesn't do this, so we don't want
	// to mutate the trailers map that the user sees.
	mergedTrailers := make(
		http.Header,
		len(hc.responseTrailer)+2, // always make space for status & message
	)
	mergeHeaders(mergedTrailers, hc.responseTrailer)
	grpcErrorToTrailer(hc.bufferPool, mergedTrailers, hc.protobuf, err)
	if hc.web && !hc.wroteToBody {
		// We're using gRPC-Web and we haven't yet written to the body. Since we're
		// not sending any response messages, the gRPC specification calls this a
		// "trailers-only" response. Under those circumstances, the gRPC-Web spec
		// says that implementations _may_ send trailing metadata as HTTP headers
		// instead. Envoy is the canonical implementation of the gRPC-Web protocol,
		// so we emulate Envoy's behavior and put the trailing metadata in the HTTP
		// headers.
		mergeHeaders(hc.responseWriter.Header(), mergedTrailers)
		return nil
	}
	if hc.web {
		// We're using gRPC-Web and we've already sent the headers, so we write
		// trailing metadata to the HTTP body.
		if err := hc.marshaler.MarshalWebTrailers(mergedTrailers); err != nil {
			return err
		}
		return nil // must be a literal nil: nil *Error is a non-nil error
	}
	// We're using standard gRPC. Even if we haven't written to the body and
	// we're sending a "trailers-only" response, we must send trailing metadata
	// as HTTP trailers. (If we had frame-level control of the HTTP/2 layer, we
	// could send trailers-only responses as a single HEADER frame and no DATA
	// frames, but net/http doesn't expose APIs that low-level.)
	if !hc.wroteToBody {
		// This block works around a bug in x/net/http2. Until Go 1.20, trailers
		// written using http.TrailerPrefix were only sent if either (1) there's
		// data in the body, or (2) the innermost http.ResponseWriter is flushed.
		// To ensure that we always send a valid gRPC response, even if the user
		// has wrapped the response writer in net/http middleware that doesn't
		// implement http.Flusher, we must pre-declare our HTTP trailers. We can
		// remove this when Go 1.21 ships and we drop support for Go 1.19.
		for key := range mergedTrailers {
			addHeaderCanonical(hc.responseWriter.Header(), headerTrailer, key)
		}
		hc.responseWriter.WriteHeader(http.StatusOK)
		for key, values := range mergedTrailers {
			for _, value := range values {
				// These are potentially user-supplied, so we can't assume they're in
				// canonical form. Don't use addHeaderCanonical.
				hc.responseWriter.Header().Add(key, value)
			}
		}
		return nil
	}
	// In net/http's ResponseWriter API, we send HTTP trailers by writing to the
	// headers map with a special prefix. This prefixing is an implementation
	// detail, so we should hide it and _not_ mutate the user-visible headers.
	//
	// Note that this is _very_ finicky and difficult to test with net/http,
	// since correctness depends on low-level framing details. Breaking this
	// logic breaks Envoy's gRPC-Web translation.
	for key, values := range mergedTrailers {
		for _, value := range values {
			// These are potentially user-supplied, so we can't assume they're in
			// canonical form. Don't use addHeaderCanonical.
			hc.responseWriter.Header().Add(http.TrailerPrefix+key, value)
		}
	}
	return nil
}

// grpcMarshaler is an envelopeWriter that can additionally write a trailer
// envelope (see MarshalWebTrailers).
type grpcMarshaler struct {
	envelopeWriter
}

// MarshalWebTrailers lower-cases the keys of trailer in place, serializes the
// map in HTTP header wire format, and writes it as a single envelope flagged
// with grpcFlagEnvelopeTrailer. Note that the caller's map is mutated.
func (m *grpcMarshaler) MarshalWebTrailers(trailer http.Header) *Error {
	raw := m.envelopeWriter.bufferPool.Get()
	defer m.envelopeWriter.bufferPool.Put(raw)
	for key, values := range trailer {
		// Per the Go specification, keys inserted during iteration may be produced
		// later in the iteration or may be skipped. For safety, avoid mutating the
		// map if the key is already lower-cased.
		lower := strings.ToLower(key)
		if key == lower {
			continue
		}
		delete(trailer, key)
		trailer[lower] = values
	}
	if err := trailer.Write(raw); err != nil {
		return errorf(CodeInternal, "format trailers: %w", err)
	}
	return m.Write(&envelope{
		Data:  raw,
		Flags: grpcFlagEnvelopeTrailer,
	})
}

// grpcUnmarshaler wraps an envelopeReader and remembers the trailers parsed
// from a trailer envelope, if one was read.
type grpcUnmarshaler struct {
	envelopeReader envelopeReader
	webTrailer     http.Header
}

// Unmarshal reads the next envelope into message. When the reader reports
// errSpecialEnvelope, the envelope must carry grpcFlagEnvelopeTrailer; its
// payload is then parsed as a header block and stored for WebTrailer, and
// errSpecialEnvelope is returned to tell the caller that no message was read.
func (u *grpcUnmarshaler) Unmarshal(message any) *Error {
	// delegate read packet and unmarshal processes to envelopeReader
	err := u.envelopeReader.Unmarshal(message)
	if err == nil {
		return nil
	}
	if !errors.Is(err, errSpecialEnvelope) {
		return err
	}
	// for special envelope
	env := u.envelopeReader.last
	// The trailer flag is the only special flag understood here; any other
	// special envelope is a protocol error.
	if !env.IsSet(grpcFlagEnvelopeTrailer) {
		return errorf(CodeInternal, "protocol error: invalid envelope flags %d", env.Flags)
	}

	// Per the gRPC-Web specification, trailers should be encoded as an HTTP/1
	// headers block _without_ the terminating newline. To make the headers
	// parseable by net/textproto, we need to add the newline.
	if err := env.Data.WriteByte('\n'); err != nil {
		return errorf(CodeInternal, "unmarshal web trailers: %w", err)
	}
	bufferedReader := bufio.NewReader(env.Data)
	mimeReader := textproto.NewReader(bufferedReader)
	mimeHeader, mimeErr := mimeReader.ReadMIMEHeader()
	if mimeErr != nil {
		return errorf(
			CodeInternal,
			"gRPC-Web protocol error: trailers invalid: %w",
			mimeErr,
		)
	}
	u.webTrailer = http.Header(mimeHeader)
	return errSpecialEnvelope
}

// WebTrailer returns the trailers parsed by the most recent trailer envelope,
// or nil if none has been read.
func (u *grpcUnmarshaler) WebTrailer() http.Header {
	return u.webTrailer
}

// grpcValidateResponse checks an HTTP response for protocol-level problems and
// fills the caller's header and trailer maps.
//
// It returns an error when the HTTP status is not 200, when the response uses
// a compression that availableCompressors does not contain, or when the
// response headers already carry a non-OK gRPC status. In the last case the
// response headers are recorded as trailers (minus Content-Type, which is
// recorded as a header) and attached to the error as metadata. On success the
// response headers are merged into header and nil is returned.
func grpcValidateResponse(
	response *http.Response,
	header, trailer http.Header,
	availableCompressors readOnlyCompressionPools,
	bufferPool *bufferPool,
	protobuf Codec,
) *Error {
	if response.StatusCode != http.StatusOK {
		return errorf(grpcHTTPToCode(response.StatusCode), "HTTP status %v", response.Status)
	}
	if compression := getHeaderCanonical(response.Header, grpcHeaderCompression); compression != "" &&
		compression != compressionIdentity &&
		!availableCompressors.Contains(compression) {
		// Per https://github.com/grpc/grpc/blob/master/doc/compression.md, we
		// should return CodeInternal and specify acceptable compression(s) (in
		// addition to setting the Grpc-Accept-Encoding header).
		return errorf(
			CodeInternal,
			"unknown encoding %q: accepted encodings are %v",
			compression,
			availableCompressors.CommaSeparatedNames(),
		)
	}
	// When there's no body, gRPC and gRPC-Web servers may send error information
	// in the HTTP headers.
	if err := grpcErrorFromTrailer(
		bufferPool,
		protobuf,
		response.Header,
	); err != nil && !errors.Is(err, errTrailersWithoutGRPCStatus) {
		// Per the specification, only the HTTP status code and Content-Type should
		// be treated as headers. The rest should be treated as trailing metadata.
		if contentType := getHeaderCanonical(response.Header, headerContentType); contentType != "" {
			setHeaderCanonical(header, headerContentType, contentType)
		}
		mergeHeaders(trailer, response.Header)
		delHeaderCanonical(trailer, headerContentType)
		// Also set the error metadata
		err.meta = header.Clone()
		mergeHeaders(err.meta, trailer)
		return err
	}
	// The response is valid, so we should expose the headers.
	mergeHeaders(header, response.Header)
	return nil
}

// grpcHTTPToCode maps an HTTP status code to the error code reported when a
// response does not have status 200. Unlisted statuses map to CodeUnknown.
func grpcHTTPToCode(httpCode int) Code {
	// https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md
	// Note that this is not just the inverse of the gRPC-to-HTTP mapping.
	switch httpCode {
	case 400:
		return CodeInternal
	case 401:
		return CodeUnauthenticated
	case 403:
		return CodePermissionDenied
	case 404:
		return CodeUnimplemented
	case 429:
		return CodeUnavailable
	case 502, 503, 504:
		return CodeUnavailable
	default:
		return CodeUnknown
	}
}

// The gRPC wire protocol specifies that errors should be serialized using the
// binary Protobuf format, even if the messages in the request/response stream
// use a different codec. Consequently, this function needs a Protobuf codec to
// unmarshal error information in the headers.
func grpcErrorFromTrailer(bufferPool *bufferPool, protobuf Codec, trailer http.Header) *Error { codeHeader := getHeaderCanonical(trailer, grpcHeaderStatus) if codeHeader == "" { return NewError(CodeInternal, errTrailersWithoutGRPCStatus) } if codeHeader == "0" { return nil } code, err := strconv.ParseUint(codeHeader, 10 /* base */, 32 /* bitsize */) if err != nil { return errorf(CodeInternal, "gRPC protocol error: invalid error code %q", codeHeader) } message := grpcPercentDecode(bufferPool, getHeaderCanonical(trailer, grpcHeaderMessage)) retErr := NewWireError(Code(code), errors.New(message)) detailsBinaryEncoded := getHeaderCanonical(trailer, grpcHeaderDetails) if len(detailsBinaryEncoded) > 0 { detailsBinary, err := DecodeBinaryHeader(detailsBinaryEncoded) if err != nil { return errorf(CodeInternal, "server returned invalid grpc-status-details-bin trailer: %w", err) } var status statusv1.Status if err := protobuf.Unmarshal(detailsBinary, &status); err != nil { return errorf(CodeInternal, "server returned invalid protobuf for error details: %w", err) } for _, d := range status.Details { retErr.details = append(retErr.details, &ErrorDetail{pb: d}) } // Prefer the Protobuf-encoded data to the headers (grpc-go does this too). 
retErr.code = Code(status.Code) retErr.err = errors.New(status.Message) } return retErr } func grpcParseTimeout(timeout string) (time.Duration, error) { if timeout == "" { return 0, errNoTimeout } unit, ok := grpcTimeoutUnitLookup[timeout[len(timeout)-1]] if !ok { return 0, fmt.Errorf("gRPC protocol error: timeout %q has invalid unit", timeout) } num, err := strconv.ParseInt(timeout[:len(timeout)-1], 10 /* base */, 64 /* bitsize */) if err != nil || num < 0 { return 0, fmt.Errorf("gRPC protocol error: invalid timeout %q", timeout) } if num > 99999999 { // timeout must be ASCII string of at most 8 digits return 0, fmt.Errorf("gRPC protocol error: timeout %q is too long", timeout) } if unit == time.Hour && num > grpcTimeoutMaxHours { // Timeout is effectively unbounded, so ignore it. The grpc-go // implementation does the same thing. return 0, errNoTimeout } return time.Duration(num) * unit, nil } func grpcEncodeTimeout(timeout time.Duration) (string, error) { if timeout <= 0 { return "0n", nil } for _, pair := range grpcTimeoutUnits { digits := strconv.FormatInt(int64(timeout/pair.size), 10 /* base */) if len(digits) < grpcMaxTimeoutChars { return digits + string(pair.char), nil } } // The max time.Duration is smaller than the maximum expressible gRPC // timeout, so we can't reach this case. 
return "", errNoTimeout } func grpcCodecFromContentType(contentType string) string { if contentType == grpcContentTypeDefault { // implicitly protobuf return codecNameProto } return strings.TrimPrefix(contentType, grpcContentTypePrefix) } func grpcContentTypeFromCodecName(name string) string { return grpcContentTypePrefix + name } func grpcErrorToTrailer(bufferPool *bufferPool, trailer http.Header, protobuf Codec, err error) { if err == nil { setHeaderCanonical(trailer, grpcHeaderStatus, "0") // zero is the gRPC OK status setHeaderCanonical(trailer, grpcHeaderMessage, "") return } status := grpcStatusFromError(err) code := strconv.Itoa(int(status.Code)) bin, binErr := protobuf.Marshal(status) if binErr != nil { setHeaderCanonical( trailer, grpcHeaderStatus, strconv.FormatInt(int64(CodeInternal), 10 /* base */), ) setHeaderCanonical( trailer, grpcHeaderMessage, grpcPercentEncode( bufferPool, fmt.Sprintf("marshal protobuf status: %v", binErr), ), ) return } if tripleErr, ok := asError(err); ok { mergeHeaders(trailer, tripleErr.meta) } setHeaderCanonical(trailer, grpcHeaderStatus, code) setHeaderCanonical(trailer, grpcHeaderMessage, grpcPercentEncode(bufferPool, status.Message)) setHeaderCanonical(trailer, grpcHeaderDetails, EncodeBinaryHeader(bin)) } func grpcStatusFromError(err error) *statusv1.Status { status := &statusv1.Status{ Code: int32(CodeUnknown), Message: err.Error(), } if tripleErr, ok := asError(err); ok { status.Code = int32(tripleErr.Code()) status.Message = tripleErr.Message() status.Details = tripleErr.detailsAsAny() } return status } // grpcPercentEncode follows RFC 3986 Section 2.1 and the gRPC HTTP/2 spec. // It's a variant of URL-encoding with fewer reserved characters. It's intended // to take UTF-8 encoded text and escape non-ASCII bytes so that they're valid // HTTP/1 headers, while still maximizing readability of the data on the wire. // // The grpc-message trailer (used for human-readable error messages) should be // percent-encoded. 
// // References: // // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#responses // https://datatracker.ietf.org/doc/html/rfc3986#section-2.1 func grpcPercentEncode(bufferPool *bufferPool, msg string) string { for i := 0; i < len(msg); i++ { // Characters that need to be escaped are defined in gRPC's HTTP/2 spec. // They're different from the generic set defined in RFC 3986. if c := msg[i]; c < ' ' || c > '~' || c == '%' { return grpcPercentEncodeSlow(bufferPool, msg, i) } } return msg } // msg needs some percent-escaping. Bytes before offset don't require // percent-encoding, so they can be copied to the output as-is. func grpcPercentEncodeSlow(bufferPool *bufferPool, msg string, offset int) string { out := bufferPool.Get() defer bufferPool.Put(out) out.WriteString(msg[:offset]) for i := offset; i < len(msg); i++ { c := msg[i] if c < ' ' || c > '~' || c == '%' { out.WriteString(fmt.Sprintf("%%%02X", c)) continue } out.WriteByte(c) } return out.String() } func grpcPercentDecode(bufferPool *bufferPool, encoded string) string { for i := 0; i < len(encoded); i++ { if c := encoded[i]; c == '%' && i+2 < len(encoded) { return grpcPercentDecodeSlow(bufferPool, encoded, i) } } return encoded } // Similar to percentEncodeSlow: encoded is percent-encoded, and needs to be // decoded byte-by-byte starting at offset. func grpcPercentDecodeSlow(bufferPool *bufferPool, encoded string, offset int) string { out := bufferPool.Get() defer bufferPool.Put(out) out.WriteString(encoded[:offset]) for i := offset; i < len(encoded); i++ { c := encoded[i] if c != '%' || i+2 >= len(encoded) { out.WriteByte(c) continue } parsed, err := strconv.ParseUint(encoded[i+1:i+3], 16 /* hex */, 8 /* bitsize */) if err != nil { out.WriteRune(utf8.RuneError) } else { out.WriteByte(byte(parsed)) } i += 2 } return out.String() }