mux.go (212 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package gmux
import (
"bytes"
"context"
"crypto/tls"
"errors"
"fmt"
"io"
"log"
"net"
"net/http"
"strings"
"sync"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
)
const (
http2FrameHeaderLength = 9
grpcContentType = "application/grpc"
)
// mux supports multiplexing plain-old HTTP/2 and gRPC traffic
// on a single listener.
type mux struct {
http2Server *http2.Server
grpcListener *chanListener
}
// ConfigureServer configures srv to identify gRPC connections and send them
// to the returned net.Listener, suitable for passing to grpc.Server.Serve,
// while all other HTTP requests will be handled by srv.
//
// ConfigureServer works with or without TLS enabled.
//
// When TLS is enabled, ConfigureServer relies on ALPN. ConfigureServer
// internally calls http2.ConfigureServer(srv, conf) to configure HTTP/2 support,
// and defines an alternative srv.TLSNextProto "h2" handler. When using TLS, the
// gRPC listener returns secure connections; the gRPC server must not also be
// configured to wrap the connection with TLS.
//
// When TLS is not enabled, ConfigureServer relies on h2c prior knowledge,
// wrapping srv.Handler. It is therefore necessary to set srv.Handler before
// calling ConfigureServer.
//
// The returned listener will be closed when srv.Shutdown is called. The
// returned listener's Addr() method does not correspond to the configured
// HTTP server's listener(s) in any way, and cannot be relied upon for forming
// a connection URL.
func ConfigureServer(srv *http.Server, conf *http2.Server) (grpcListener net.Listener, _ error) {
if err := http2.ConfigureServer(srv, conf); err != nil {
return nil, err
}
if conf == nil {
conf = new(http2.Server)
}
glis := newChanListener()
mux := &mux{http2Server: conf, grpcListener: glis}
srv.Handler = mux.withGRPCInsecure(srv.Handler, srv.ErrorLog)
srv.TLSNextProto[http2.NextProtoTLS] = func(srv *http.Server, conn *tls.Conn, h http.Handler) {
err := mux.handleH2(srv, conn, h)
if err != nil && srv.ErrorLog != nil {
srv.ErrorLog.Printf("handleH2 (%s) returned an error: %s", conn.RemoteAddr(), err)
}
}
srv.RegisterOnShutdown(func() { glis.Close() })
return glis, nil
}
// withGRPCInsecure wraps next such that h2c (HTTP/2 Cleartext) gRPC requests
// are hijacked and sent to the gRPC listener, and all other HTTP requests are
// handled by next.
//
// See https://httpwg.org/specs/rfc7540.html#rfc.section.3.4
func (m *mux) withGRPCInsecure(next http.Handler, errLog *log.Logger) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.TLS == nil && r.Method == "PRI" && len(r.Header) == 0 && r.URL.Path == "*" && r.Proto == "HTTP/2.0" {
hijacker, ok := w.(http.Hijacker)
if ok {
conn, rw, err := hijacker.Hijack()
if err != nil {
panic(fmt.Sprintf("Hijack failed: %v", err))
}
defer conn.Close()
// We just identify that we're dealing with a
// prior-knowledge connection, and pass it straight
// through to the gRPC server.
preface := "PRI * HTTP/2.0\r\n\r\n"
r := io.MultiReader(strings.NewReader(preface), rw, conn)
pc, closed := newProxyConn(conn, r, conn)
err = m.handleGRPC(nil, pc, closed, nil)
if err != nil && errLog != nil {
errLog.Printf("h2c handleGRPC (%s) returned an error: %s", conn.RemoteAddr(), err)
}
return
}
}
next.ServeHTTP(w, r)
})
}
func (m *mux) handleH2(srv *http.Server, conn net.Conn, handler http.Handler) error {
var clientReadBuf bytes.Buffer
connHandler, err := m.getConnHandler(conn, &clientReadBuf)
if err != nil {
return err
}
// Write frames from the server to a pipe so that we can de-duplicate the first
// SETTINGS ACK. The client's first SETTINGS is handled twice: once in getConnHandler,
// and once in the final connHandler. As getConnHandler will ACK the SETTINGS,
// we need to filter out the second one ade by the final connHandler.
rpipe, wpipe := io.Pipe()
go func() {
if err := copyFramesUntilSettingsAck(conn, rpipe); err != nil {
rpipe.CloseWithError(err)
return
}
_, err := io.Copy(conn, rpipe)
rpipe.CloseWithError(err)
}()
proxyConn, closed := newProxyConn(conn, io.MultiReader(&clientReadBuf, conn), wpipe)
err = connHandler(srv, proxyConn, closed, handler)
wpipe.CloseWithError(err)
return err
}
// copyFramesUntilSettingsAck copies http/2 frames from r to w
// up until (but excluding) the first settings ACK frame.
func copyFramesUntilSettingsAck(w io.Writer, r io.Reader) error {
var frameBuf bytes.Buffer
framer := http2.NewFramer(w, io.TeeReader(r, &frameBuf))
framer.SetReuseFrames()
var haveFirstSettingsACK bool
for !haveFirstSettingsACK {
f, err := framer.ReadFrame()
if err != nil {
return err
}
switch f := f.(type) {
case *http2.SettingsFrame:
if !haveFirstSettingsACK && f.IsAck() {
haveFirstSettingsACK = true
frameBuf.Truncate(frameBuf.Len() - int(f.Length) - http2FrameHeaderLength)
break
}
}
}
_, err := io.Copy(w, &frameBuf)
return err
}
// getConnHandler handles a new client connection, writing a SETTINGS
// request to the client, followed by reading the HTTP/2 client preface,
// and then finally looking for a Content-Type header to determine which
// connection handler to return.
//
// All data read from the client will be written to buf, which will be
// replayed to the backend HTTP/2 server.
func (m *mux) getConnHandler(conn net.Conn, buf *bytes.Buffer) (connHandlerFunc, error) {
rbuf := io.TeeReader(conn, buf)
framer := http2.NewFramer(conn, rbuf)
framer.SetReuseFrames()
// Client expects SETTINGS first, so send empty initial settings.
// The real server will send a new one with the real settings.
//
// When replaying frames to the real server, we'll need to suppress
// the ACK for this frame, which the server won't know about.
if err := framer.WriteSettings(); err != nil {
return nil, err
}
// Read client preface. We don't bother verifying it here, as it will
// be verified later by the real http2.Server.
var preface [len(http2.ClientPreface)]byte
if _, err := io.ReadFull(rbuf, preface[:]); err != nil {
return nil, err
}
contentType, err := m.getContentType(framer, buf)
if err != nil {
return nil, err
}
connHandler := m.handleHTTP
if contentType == grpcContentType {
connHandler = m.handleGRPC
}
return connHandler, nil
}
var decoderPool = sync.Pool{
New: func() interface{} {
out := &decoder{}
out.d = hpack.NewDecoder(4096, func(hf hpack.HeaderField) {
if hf.Name == "content-type" {
out.contentType = hf.Value
}
})
return out
},
}
type decoder struct {
d *hpack.Decoder
contentType string
}
func (m *mux) getContentType(framer *http2.Framer, framesBuf *bytes.Buffer) (contentType string, _ error) {
// Code based on https://github.com/soheilhy/cmux
//
// Copyright 2016 The CMux Authors. All rights reserved.
dec := decoderPool.Get().(*decoder)
// Read frames until we have the content-type header, or we know there isn't one.
var haveFirstSettings bool
var haveFirstSettingsACK bool
var haveEndHeaders bool
for (dec.contentType == "" && !haveEndHeaders) || !haveFirstSettings || !haveFirstSettingsACK {
f, err := framer.ReadFrame()
if err != nil {
return "", err
}
switch f := f.(type) {
case *http2.SettingsFrame:
switch {
case !haveFirstSettingsACK && f.IsAck():
haveFirstSettingsACK = true
// We accept the ACK, and omit it from the frames
// written to the real server.
framesBuf.Truncate(framesBuf.Len() - int(f.Length) - http2FrameHeaderLength)
case !haveFirstSettings && !f.IsAck():
haveFirstSettings = true
// We ACK the client's first SETTINGS to unblock it,
// and ignore the first ACK from the real server.
if err := framer.WriteSettingsAck(); err != nil {
return "", err
}
}
case *http2.ContinuationFrame:
if _, err := dec.d.Write(f.HeaderBlockFragment()); err != nil {
return "", err
}
haveEndHeaders = f.FrameHeader.Flags&http2.FlagHeadersEndHeaders != 0
case *http2.HeadersFrame:
if _, err := dec.d.Write(f.HeaderBlockFragment()); err != nil {
return "", err
}
haveEndHeaders = f.FrameHeader.Flags&http2.FlagHeadersEndHeaders != 0
}
}
contentType = dec.contentType
if dec.d.Close() == nil {
dec.contentType = ""
decoderPool.Put(dec)
}
return contentType, nil
}
type connHandlerFunc func(srv *http.Server, conn net.Conn, closed <-chan struct{}, handler http.Handler) error
func (m *mux) handleHTTP(srv *http.Server, conn net.Conn, closed <-chan struct{}, handler http.Handler) error {
// This code is adapted from x/net/http2 to not assume tls.Conn.
// The TLSNextProto interface predates contexts, so the net/http package passes
// down its per-connection base context via an exported but unadvertised method
// on the Handler. This is for internal net/http<=>http2 use only.
var ctx context.Context
type baseContexter interface {
BaseContext() context.Context
}
if bc, ok := handler.(baseContexter); ok {
ctx = bc.BaseContext()
}
m.http2Server.ServeConn(conn, &http2.ServeConnOpts{
Context: ctx,
Handler: handler,
BaseConfig: srv,
})
return nil
}
func (m *mux) handleGRPC(_ *http.Server, conn net.Conn, closed <-chan struct{}, _ http.Handler) error {
select {
case <-m.grpcListener.closed:
return errors.New("grpc listener closed")
case m.grpcListener.conns <- conn:
case <-closed:
// Connection closed before it could be handled.
return nil
}
<-closed
return nil
}