pkg/server/server.go (92 lines of code) (raw):
package server
import (
"context"
"net/http"
"time"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.amzn.com/eks/eks-pod-identity-agent/configuration"
"go.amzn.com/eks/eks-pod-identity-agent/internal/middleware/logger"
ratelimiter "go.amzn.com/eks/eks-pod-identity-agent/internal/middleware/rate_limiter"
"go.amzn.com/eks/eks-pod-identity-agent/pkg/handlers"
)
type (
// RegisterFunc function that is invoked by handlers to register their
// endpoints
RegisterFunc = func(pattern string, handlerFunc http.HandlerFunc)
// HandlerConfigurer must be implemented by handlers that want to
// register their handler in the server
HandlerConfigurer interface {
ConfigureHandler(register RegisterFunc)
}
// Server that will be initialized in ListenUntilContextCancelled
Server struct {
// Configurer is responsible for configuring the Server's mux, injecting
// its own handlers
configurer HandlerConfigurer
// server contains the HTTP server that will listen to requests
server *http.Server
mux *http.ServeMux
}
)
const (
defaultRequestTimeout = 30 * time.Second
maxTerminationWait = defaultRequestTimeout + 5*time.Second
)
func newBaseServer(addr string) *Server {
mux := http.NewServeMux()
return &Server{
mux: mux,
server: &http.Server{
Addr: addr,
Handler: mux,
ReadTimeout: defaultRequestTimeout,
WriteTimeout: defaultRequestTimeout,
},
}
}
func NewProbeServer(addr string, hosts []string, port uint16) *Server {
srv := newBaseServer(addr)
srv.configurer = handlers.NewProbeHandler(hosts, port)
return srv
}
func NewEksCredentialServer(addr string, opts handlers.EksCredentialHandlerOpts) *Server {
srv := newBaseServer(addr)
srv.configurer = handlers.NewEksCredentialHandler(opts)
return srv
}
func NewMetricsServer(addr string, hosts []string, port uint16) *Server {
srv := newBaseServer(addr)
srv.configurer = handlers.NewProbeHandler(hosts, port)
srv.mux.Handle("/metrics", promhttp.Handler())
return srv
}
func (p *Server) ListenUntilContextCancelled(ctx context.Context) {
log := logger.FromContext(ctx)
p.configureHandler()
// Run the server in a goroutine
go func() {
log.Info("Starting server...")
if err := p.server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("Unable to start server: %v", err)
}
log.Debug("Server has stopped listening")
}()
// Block until a signal is received
select {
case <-ctx.Done():
}
log.Info("Shutting down server...")
// Create a context with a timeout for the graceful shutdown
ctx, cancel := context.WithTimeout(context.Background(), maxTerminationWait)
defer cancel()
// Shutdown the server and wait for existing connections to be closed
if err := p.server.Shutdown(ctx); err != nil {
log.Fatalf("Server shutdown error: %v", err)
}
log.Info("Server gracefully stopped")
}
type interceptor = func(http.HandlerFunc) http.HandlerFunc
func (p *Server) configureHandler() {
p.configurer.ConfigureHandler(func(pattern string, handler http.HandlerFunc) {
//rate limit the EksCredentialsRequest request
rateLimiter := ratelimiter.NewRateLimiter(configuration.RequestRate)
// order here matters
interceptors := []interceptor{
// add logger so it can be used downstream
logger.InjectLogger,
// add rate limite to requests
func(h http.HandlerFunc) http.HandlerFunc { return ratelimiter.RateLimitMiddleware(rateLimiter, h) },
}
for _, intercept := range interceptors {
handler = intercept(handler)
}
// add the handler to the server mux
p.mux.Handle(pattern, handler)
})
}
func (p *Server) Addr() string {
return p.server.Addr
}