protocol/triple/triple_protocol/server.go (162 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package triple_protocol import ( "context" "crypto/tls" "net/http" "sync" ) import ( "github.com/dubbogo/grpc-go" "golang.org/x/net/http2" "golang.org/x/net/http2/h2c" ) import ( "github.com/dubbogo/gost/log/logger" ) type Server struct { mu sync.Mutex mux *http.ServeMux handlers map[string]*Handler httpSrv *http.Server } func (s *Server) RegisterUnaryHandler( procedure string, reqInitFunc func() any, unary func(context.Context, *Request) (*Response, error), options ...HandlerOption, ) error { hdl, ok := s.handlers[procedure] if !ok { hdl = NewUnaryHandler(procedure, reqInitFunc, unary, options...) s.handlers[procedure] = hdl s.mux.Handle(procedure, hdl) } else { config := newHandlerConfig(procedure, options) implementation := generateUnaryHandlerFunc(procedure, reqInitFunc, unary, config.Interceptor) hdl.processImplementation(getIdentifier(config.Group, config.Version), implementation) } return nil } func (s *Server) RegisterClientStreamHandler( procedure string, stream func(context.Context, *ClientStream) (*Response, error), options ...HandlerOption, ) error { hdl, ok := s.handlers[procedure] if !ok { hdl = NewClientStreamHandler(procedure, stream, options...) s.handlers[procedure] = hdl s.mux.Handle(procedure, hdl) } else { config := newHandlerConfig(procedure, options) implementation := generateClientStreamHandlerFunc(procedure, stream, config.Interceptor) hdl.processImplementation(getIdentifier(config.Group, config.Version), implementation) } return nil } func (s *Server) RegisterServerStreamHandler( procedure string, reqInitFunc func() any, stream func(context.Context, *Request, *ServerStream) error, options ...HandlerOption, ) error { hdl, ok := s.handlers[procedure] if !ok { hdl = NewServerStreamHandler(procedure, reqInitFunc, stream, options...) s.handlers[procedure] = hdl s.mux.Handle(procedure, hdl) } else { config := newHandlerConfig(procedure, options) implementation := generateServerStreamHandlerFunc(procedure, reqInitFunc, stream, config.Interceptor) hdl.processImplementation(getIdentifier(config.Group, config.Version), implementation) } return nil } func (s *Server) RegisterBidiStreamHandler( procedure string, stream func(context.Context, *BidiStream) error, options ...HandlerOption, ) error { hdl, ok := s.handlers[procedure] if !ok { hdl = NewBidiStreamHandler(procedure, stream, options...) s.handlers[procedure] = hdl s.mux.Handle(procedure, hdl) } else { config := newHandlerConfig(procedure, options) implementation := generateBidiStreamHandlerFunc(procedure, stream, config.Interceptor) hdl.processImplementation(getIdentifier(config.Group, config.Version), implementation) } return nil } func (s *Server) RegisterCompatUnaryHandler( procedure string, method string, srv any, unary MethodHandler, options ...HandlerOption, ) error { hdl, ok := s.handlers[procedure] if !ok { hdl = NewCompatUnaryHandler(procedure, method, srv, unary, options...) s.handlers[procedure] = hdl s.mux.Handle(procedure, hdl) } else { config := newHandlerConfig(procedure, options) implementation := generateCompatUnaryHandlerFunc(procedure, method, srv, unary, config.Interceptor) hdl.processImplementation(getIdentifier(config.Group, config.Version), implementation) } return nil } func (s *Server) RegisterCompatStreamHandler( procedure string, srv any, typ StreamType, streamFunc func(srv any, stream grpc.ServerStream) error, options ...HandlerOption, ) error { hdl, ok := s.handlers[procedure] if !ok { hdl = NewCompatStreamHandler(procedure, srv, typ, streamFunc, options...) s.handlers[procedure] = hdl s.mux.Handle(procedure, hdl) } else { config := newHandlerConfig(procedure, options) implementation := generateCompatStreamHandlerFunc(procedure, srv, streamFunc, config.Interceptor) hdl.processImplementation(getIdentifier(config.Group, config.Version), implementation) } return nil } func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { handler, pattern := s.mux.Handler(r) if pattern == "" { logger.Warnf("404: didn't register this method - %s\n", r.URL.Path) } handler.ServeHTTP(w, r) } func (s *Server) Run() error { s.httpSrv.Handler = h2c.NewHandler(s, &http2.Server{}) var err error if s.httpSrv.TLSConfig != nil { // TODO: Maybe we should be able to find a better way to start TLS. err = s.httpSrv.ListenAndServeTLS("", "") } else { err = s.httpSrv.ListenAndServe() } return err } func (s *Server) SetTLSConfig(c *tls.Config) { s.httpSrv.TLSConfig = c } func (s *Server) Stop() error { return s.httpSrv.Close() } func (s *Server) GracefulStop(ctx context.Context) error { return s.httpSrv.Shutdown(ctx) } func NewServer(addr string) *Server { return &Server{ mux: http.NewServeMux(), handlers: make(map[string]*Handler), httpSrv: &http.Server{Addr: addr}, } }