pkg/server/server.go (245 lines of code) (raw):

package server import ( "context" "crypto/tls" "errors" "fmt" "io" "log/slog" "net/http" "os" "os/signal" "sync" "syscall" "time" "github.com/ClickHouse/ch-go" "github.com/JetBrains/ij-perf-report-aggregator/pkg/server/auth" "github.com/JetBrains/ij-perf-report-aggregator/pkg/server/meta" "github.com/JetBrains/ij-perf-report-aggregator/pkg/util" "github.com/andybalholm/brotli" "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" "github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/puddle/v2" "github.com/nats-io/nats.go" "github.com/rs/cors" "github.com/valyala/bytebufferpool" "go.deanishe.net/env" ) const DefaultDbUrl = "127.0.0.1:9000" type StatsServer struct { dbUrl string nameToDbPool sync.Map poolMutex sync.Mutex } func Serve(dbUrl string, natsUrl string) error { if dbUrl == "" { dbUrl = DefaultDbUrl } dbpool, err := pgxpool.New(context.Background(), os.Getenv("DATABASE_URL")) dbpool.Config().MaxConns = 10 if err != nil { return err } statsServer := &StatsServer{ dbUrl: dbUrl, } defer func() { statsServer.nameToDbPool.Range(func(_, pool any) bool { p, ok := pool.(*puddle.Pool[*ch.Client]) if ok { p.Close() } return true }) }() cacheManager, err := NewResponseCacheManager() if err != nil { return err } router := chi.NewRouter() disposer := util.NewDisposer() defer disposer.Dispose() if natsUrl != "" { err = listenNats(cacheManager, natsUrl, disposer) if err != nil { return err } } else { slog.Info("no nats server configured") } router.Use(middleware.AllowContentType("application/octet-stream", "application/json")) router.Use(cors.New(cors.Options{ AllowedOrigins: []string{"*"}, AllowedMethods: []string{"GET", "POST", "DELETE"}, AllowedHeaders: []string{"*"}, MaxAge: 50, }).Handler) router.Use(middleware.Heartbeat("/health-check")) router.Use(middleware.Recoverer) router.Route("/api/meta", func(r chi.Router) { r.Route("/accidents", func(r chi.Router) { r.Post("/*", meta.CreatePostAccidentRequestHandler(dbpool)) r.Delete("/*", meta.CreateDeleteAccidentRequestHandler(dbpool)) r.Get("/*", meta.CreateGetAccidentByIdHandler(dbpool)) }) r.Post("/getAccidents*", meta.CreateGetManyAccidentsRequestHandler(dbpool)) r.Get("/description*", meta.CreateGetDescriptionRequestHandler(dbpool)) r.Post("/accidentsAroundDate*", meta.CreateGetAccidentsAroundDateRequestHandler(dbpool)) r.Post("/missingData", meta.CreatePostMissingDataRequestHandler(dbpool)) r.Route("/youtrack", func(r chi.Router) { r.Post("/createIssue", meta.CreatePostCreateIssueByAccident(dbpool)) r.Post("/uploadAttachments", meta.CreatePostUploadAttachmentsToIssue()) }) r.Route("/teamcity", func(r chi.Router) { r.Post("/startBisect", meta.CreatePostStartBisect()) r.Get("/changes", meta.HandleGetTeamCityChanges()) r.Get("/buildType", meta.HandleGetTeamCityBuildType()) r.Get("/buildCounter", meta.HandleGetTeamCityBuildCounter()) r.Get("/buildInfo", meta.HandleGetTeamCityBuildInfo()) }) }) router.Route("/api/auth", func(r chi.Router) { r.Route("/userinfo", func(r chi.Router) { r.Get("/*", auth.CreateGetUserInfoHandler()) }) }) router.Post("/api/evaluateMetric*", statsServer.CreateProcessMetricDataHandler()) router.Group(func(r chi.Router) { compressor := middleware.NewCompressor(5) compressor.SetEncoder("br", func(w io.Writer, level int) io.Writer { return brotli.NewWriterV2(w, level) }) r.Use(compressor.Handler) r.Route("/api/", func(r chi.Router) { r.Route("/v1", func(r chi.Router) { r.Handle("/meta/measure", cacheManager.CreateHandler(statsServer.handleMetaMeasureRequest)) r.Handle("/load/*", cacheManager.CreateHandler(statsServer.handleLoadRequest)) }) r.Handle("/q/*", cacheManager.CreateHandler(statsServer.handleLoadRequestV2)) r.Handle("/highlightingPasses*", cacheManager.CreateHandler(statsServer.getDistinctHighlightingPasses)) r.Handle("/compareBranches*", cacheManager.CreateHandler(statsServer.getBranchComparison)) r.Handle("/compareModes*", cacheManager.CreateHandler(statsServer.getModeComparison)) r.Handle("/zstd-dictionary/*", &CachingHandler{ handler: func(_ *http.Request) (*bytebufferpool.ByteBuffer, bool, error) { return &bytebufferpool.ByteBuffer{B: util.ZstdDictionary}, false, nil }, manager: cacheManager, }) }) }) server := listenAndServe(env.Get("SERVER_PORT", "9044"), router) slog.Info("started", "server", server.Addr, "clickhouse", dbUrl, "nats", natsUrl) waitUntilTerminated(server, 1*time.Minute) return nil } func (t *StatsServer) AcquireDatabase(ctx context.Context, name string) (*puddle.Resource[*ch.Client], error) { untypedPool, exists := t.nameToDbPool.Load(name) var pool *puddle.Pool[*ch.Client] var err error isCorrectPool := true if exists { pool, isCorrectPool = untypedPool.(*puddle.Pool[*ch.Client]) } else { pool, err = createStoreForDatabaseUnderLock(name, t) } if err != nil { return nil, fmt.Errorf("cannot create pool: %w", err) } if !isCorrectPool { return nil, errors.New("pool can't be casted to (*puddle.Pool[*ch.YoutrackClient])") } resource, err := pool.Acquire(ctx) if err != nil { return nil, fmt.Errorf("cannot acquire from pool: %w", err) } return resource, nil } func createStoreForDatabaseUnderLock(name string, t *StatsServer) (*puddle.Pool[*ch.Client], error) { t.poolMutex.Lock() defer t.poolMutex.Unlock() pool, err := puddle.NewPool(&puddle.Config[*ch.Client]{ MaxSize: 16, Destructor: func(value *ch.Client) { _ = value.Close() }, Constructor: func(ctx context.Context) (*ch.Client, error) { client, err := ch.Dial(ctx, ch.Options{ Address: t.dbUrl, Database: name, Settings: []ch.Setting{ ch.SettingInt("readonly", 1), ch.SettingInt("max_query_size", 1000000), ch.SettingInt("max_memory_usage", 3221225472), }, }) return client, err }, }) if err == nil && pool != nil { t.nameToDbPool.Store(name, pool) } return pool, err } func listenNats(cacheManager *ResponseCacheManager, natsUrl string, disposer *util.Disposer) error { // wait when nats service will be deployed nc, err := nats.Connect(natsUrl, nats.Timeout(30*time.Second)) if err != nil { return fmt.Errorf("can't connect to nats: %w", err) } ncSubscription, err := nc.Subscribe("server.clearCache", func(m *nats.Msg) { cacheManager.Clear() slog.Info("cache cleared", "sender", m.Data) }) if err != nil { return fmt.Errorf("can't subscribe to nats: %w", err) } disposer.Add(func() { err := ncSubscription.Unsubscribe() if err != nil { slog.Error("cannot unsubscribe", "error", err) } }) return nil } func listenAndServe(port string, mux http.Handler) *http.Server { // buffer size is 4096 https://github.com/golang/go/issues/13870 server := &http.Server{ Addr: ":" + port, Handler: mux, ReadTimeout: 300 * time.Second, WriteTimeout: 300 * time.Second, TLSConfig: &tls.Config{ MinVersion: tls.VersionTLS12, }, } go func() { err := server.ListenAndServe() if errors.Is(err, http.ErrServerClosed) { slog.Debug("server closed") } else { slog.Error("cannot serve", "error", err, "port", port) os.Exit(1) } }() return server } func waitUntilTerminated(server *http.Server, shutdownTimeout time.Duration) { signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt, syscall.SIGTERM) <-signals shutdownHttpServer(server, shutdownTimeout) } func shutdownHttpServer(server *http.Server, shutdownTimeout time.Duration) { if server == nil { return } ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout) defer cancel() slog.Info("shutdown server", "timeout", shutdownTimeout) start := time.Now() err := server.Shutdown(ctx) if err != nil { slog.Error("cannot shutdown server", "error", err) return } slog.Info("server is shutdown", "duration", time.Since(start)) }