router/core/http_server.go (110 lines of code) (raw):

package core

import (
	"context"
	"crypto/tls"
	"errors"
	"net/http"
	"sync"
	"time"

	"github.com/go-chi/chi/v5"
	"go.uber.org/zap"

	"github.com/wundergraph/cosmo/router/pkg/health"
)

// server owns the underlying http.Server and the handler that requests are
// currently dispatched to. The handler can be replaced at runtime through
// SwapGraphServer without restarting the listener.
type server struct {
	// mu guards handler (read on every request) and writes to graphServer.
	mu          sync.RWMutex
	httpServer  *http.Server
	tlsConfig   *TlsConfig
	logger      *zap.Logger
	handler     http.Handler
	healthcheck health.Checker
	baseURL     string
	graphServer *graphServer
}

// httpServerOptions bundles everything newServer needs to build a server.
type httpServerOptions struct {
	// addr is used as the listen address of the http.Server.
	addr   string
	logger *zap.Logger
	// tlsConfig is kept on the server and decides whether listenAndServe uses TLS.
	tlsConfig *TlsConfig
	// tlsServerConfig is handed to the http.Server as its TLSConfig.
	tlsServerConfig *tls.Config
	healthcheck     health.Checker
	baseURL         string
	maxHeaderBytes  int
	// livenessCheckPath and healthCheckPath both serve the liveness handler,
	// readinessCheckPath serves the readiness handler.
	livenessCheckPath  string
	readinessCheckPath string
	healthCheckPath    string
}

// newServer builds a server from opts. Until a graph server is installed via
// SwapGraphServer, the only routes it answers are the health, liveness and
// readiness probes.
func newServer(opts *httpServerOptions) *server {
	srv := &server{
		httpServer: &http.Server{
			Addr:        opts.addr,
			ReadTimeout: 60 * time.Second,
			// A write timeout of zero keeps the connection open until the client
			// closes it, which SSE (Server-Sent-Events) subscriptions rely on.
			WriteTimeout:   0,
			ErrorLog:       zap.NewStdLog(opts.logger),
			TLSConfig:      opts.tlsServerConfig,
			MaxHeaderBytes: opts.maxHeaderBytes,
		},
		tlsConfig:   opts.tlsConfig,
		logger:      opts.logger,
		healthcheck: opts.healthcheck,
		baseURL:     opts.baseURL,
	}

	// Default handler: probes only.
	probes := chi.NewMux()
	probes.Get(opts.healthCheckPath, opts.healthcheck.Liveness())
	probes.Get(opts.livenessCheckPath, opts.healthcheck.Liveness())
	probes.Get(opts.readinessCheckPath, opts.healthcheck.Readiness())
	srv.handler = probes

	// current returns the handler that is installed right now. Many requests
	// may read it concurrently; only a swap takes the write lock. The lock is
	// released before the request is served, so a request that already picked
	// up the old handler keeps running on it after a swap. That is tolerable
	// because the old graph server waits for in-flight requests on shutdown.
	current := func() http.Handler {
		srv.mu.RLock()
		defer srv.mu.RUnlock()
		return srv.handler
	}

	srv.httpServer.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		current().ServeHTTP(w, r)
	})

	return srv
}

// HealthChecks returns the health checker the server was created with.
func (s *server) HealthChecks() health.Checker {
	return s.healthcheck
}

// HttpServer returns the underlying http.Server.
func (s *server) HttpServer() *http.Server {
	return s.httpServer
}

// SwapGraphServer replaces the current graph server with svr and gracefully
// shuts down the previous one.
//
// The handler is switched first, so no new request reaches the old graph
// server. Requests that are already in flight may still be running on it; its
// Shutdown is called with ctx so they can finish or time out before it is torn
// down forcefully. Websocket connections are closed after shutdown through
// context cancellation. In the future, we might want to send a complete
// message to the client and wait until in-flight messages are delivered
// before closing the connection.
//
// NOT SAFE FOR CONCURRENT USE.
func (s *server) SwapGraphServer(ctx context.Context, svr *graphServer) {
	// On the very first swap (router start-up) there is no previous graph
	// server and therefore nothing to shut down.
	hasPrevious := s.graphServer != nil && s.handler != nil

	// Install the new handler right away. The old server is shut down in this
	// same goroutine, so no other config change can interleave.
	s.mu.Lock()
	s.handler = svr.mux
	s.mu.Unlock()

	if hasPrevious {
		err := s.graphServer.Shutdown(ctx)
		if err != nil {
			s.logger.Error("Failed to shutdown old graph", zap.Error(err))
		}
	}

	// Finally record the new graph server as the current one.
	s.mu.Lock()
	s.graphServer = svr
	s.mu.Unlock()
}

// listenAndServe starts the server and blocks until the server is shutdown.
func (s *server) listenAndServe() error { if s.tlsConfig != nil && s.tlsConfig.Enabled { // Leave the cert and key empty to use the default ones if err := s.httpServer.ListenAndServeTLS("", ""); err != nil && !errors.Is(err, http.ErrServerClosed) { return err } } else { if err := s.httpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { return err } } return nil } func (s *server) Shutdown(ctx context.Context) error { var err error s.mu.Lock() defer s.mu.Unlock() if s.graphServer != nil { err = errors.Join(s.graphServer.Shutdown(ctx)) } if s.httpServer != nil { err = errors.Join(s.httpServer.Shutdown(ctx)) } s.graphServer = nil s.handler = nil return err }