main.go (333 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
package main
import (
"context"
"crypto/tls"
"flag"
"fmt"
"log"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"strings"
"syscall"
"time"
gstorage "cloud.google.com/go/storage"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.elastic.co/apm/module/apmgorilla/v2"
"go.elastic.co/apm/v2"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
ucfgYAML "github.com/elastic/go-ucfg/yaml"
"github.com/elastic/package-registry/internal/util"
"github.com/elastic/package-registry/metrics"
"github.com/elastic/package-registry/packages"
"github.com/elastic/package-registry/proxymode"
"github.com/elastic/package-registry/storage"
)
// Identity constants of the registry service.
const (
// serviceName is the service name handed to the APM tracer options.
serviceName = "package-registry"
// version is the registry version: printed by -version, handed to the APM
// tracer options and used as the "version" label of the service-info metric.
version = "1.28.1"
// defaultInstanceName is the instance name used when the OS hostname cannot be read.
defaultInstanceName = "localhost"
)
// Values of the command-line flags registered in init, plus the built-in
// configuration defaults.
var (
// address is the listen address of the main HTTP(S) server (-address).
address string
// httpProfAddress, when non-empty, enables the HTTP profiler on that address (-httpprof).
httpProfAddress string
// metricsAddress, when non-empty, enables the Prometheus metrics endpoint on
// that address (-metrics-address).
metricsAddress string
// logLevel and logType select the logger level and type (-log-level, -log-type).
logLevel *zapcore.Level
logType string
// TLS settings (-tls-cert, -tls-key, -tls-min-version). main rejects a minimum
// TLS version that is given without both the certificate and the key file.
tlsCertFile string
tlsKeyFile string
tlsMinVersionValue tlsVersionValue
// dryRun initializes the indexer and exits without starting the web service (-dry-run).
dryRun bool
// configPath is the path of the YAML configuration file (-config).
configPath string
// printVersionInfo makes main print the version and exit (-version).
printVersionInfo bool
// Storage indexer settings (technical preview flags).
featureStorageIndexer bool
storageIndexerBucketInternal string
storageEndpoint string
storageIndexerWatchInterval time.Duration
// Proxy mode settings (technical preview flags).
featureProxyMode bool
proxyTo string
// defaultConfig holds the built-in cache times. getConfig starts from a copy
// of it and unpacks the configuration file on top.
defaultConfig = Config{
CacheTimeIndex: 10 * time.Second,
CacheTimeSearch: 10 * time.Minute,
CacheTimeCategories: 10 * time.Minute,
CacheTimeCatchAll: 10 * time.Minute,
}
)
// init registers every command-line flag of the registry on the default
// flag set. The values land in the package-level variables declared above.
func init() {
	flag.BoolVar(&printVersionInfo, "version", false, "Print Elastic Package Registry version")
	flag.StringVar(&address, "address", "localhost:8080", "Address of the package-registry service.")
	flag.StringVar(&metricsAddress, "metrics-address", "", "Address to expose the Prometheus metrics (experimental). ")

	logLevel = zap.LevelFlag("log-level", zap.InfoLevel, "log level (default \"info\")")
	flag.StringVar(&logType, "log-type", util.DefaultLoggerType, "log type (ecs, dev)")

	flag.StringVar(&tlsCertFile, "tls-cert", "", "Path of the TLS certificate.")
	flag.StringVar(&tlsKeyFile, "tls-key", "", "Path of the TLS key.")
	flag.Var(&tlsMinVersionValue, "tls-min-version", "Minimum version TLS supported.")

	flag.StringVar(&configPath, "config", "config.yml", "Path to the configuration file.")
	flag.StringVar(&httpProfAddress, "httpprof", "", "Enable HTTP profiler listening on the given address.")

	// This flag is experimental and might be removed in the future or renamed
	flag.BoolVar(&dryRun, "dry-run", false, "Runs a dry-run of the registry without starting the web service (experimental).")
	flag.BoolVar(&packages.ValidationDisabled, "disable-package-validation", false, "Disable package content validation.")

	// The following storage related flags are technical preview and might be removed in the future or renamed
	flag.BoolVar(&featureStorageIndexer, "feature-storage-indexer", false, "Enable storage indexer to include packages from Package Storage v2 (technical preview).")
	flag.StringVar(&storageIndexerBucketInternal, "storage-indexer-bucket-internal", "", "Path to the internal Package Storage bucket (with gs:// prefix).")
	flag.StringVar(&storageEndpoint, "storage-endpoint", "https://package-storage.elastic.co/", "Package Storage public endpoint.")
	// The usage text used to be a copy of the -address description; it now
	// describes the flag it belongs to.
	flag.DurationVar(&storageIndexerWatchInterval, "storage-indexer-watch-interval", 1*time.Minute, "Interval at which the storage indexer checks for index updates.")

	// The following proxy-indexer related flags are technical preview and might be removed in the future or renamed
	flag.BoolVar(&featureProxyMode, "feature-proxy-mode", false, "Enable proxy mode to include packages from other endpoint (technical preview).")
	flag.StringVar(&proxyTo, "proxy-to", "https://epr.elastic.co/", "Proxy-to endpoint")
}
// Config is the registry configuration unpacked from the YAML configuration file.
type Config struct {
// PackagePaths lists the base paths handed to the zip and file-system package indexers.
PackagePaths []string `config:"package_paths"`
// CacheTimeIndex is the cache time used for "/" and "/index.json".
CacheTimeIndex time.Duration `config:"cache_time.index"`
// CacheTimeSearch is the cache time used for "/search".
CacheTimeSearch time.Duration `config:"cache_time.search"`
// CacheTimeCategories is the cache time used for "/categories".
CacheTimeCategories time.Duration `config:"cache_time.categories"`
// CacheTimeCatchAll is the cache time used for the remaining handlers
// (artifacts, signatures, favicon, package index and static files).
CacheTimeCatchAll time.Duration `config:"cache_time.catch_all"`
}
// main is the entry point of the package registry. It parses and validates
// the flags, sets up APM tracing and logging, loads the configuration and
// then either performs a dry run or serves HTTP(S) until SIGINT/SIGTERM.
func main() {
	if err := parseFlags(); err != nil {
		log.Fatal(err)
	}

	// A minimum TLS version only makes sense when TLS is actually enabled.
	if tlsMinVersionValue > 0 {
		if tlsCertFile == "" || tlsKeyFile == "" {
			log.Fatalf("-tls-min-version set but missing TLS cert and key files (-tls-cert and -tls-key)")
		}
	}

	if printVersionInfo {
		fmt.Printf("Elastic Package Registry version %v\n", version)
		// No deferred call has been registered yet, so exiting directly is safe.
		os.Exit(0)
	}

	apmTracer := initAPMTracer()
	defer apmTracer.Close()

	logger, err := util.NewLogger(util.LoggerOptions{
		APMTracer: apmTracer,
		Level:     logLevel,
		Type:      logType,
	})
	if err != nil {
		log.Fatalf("Failed to initialize logging: %v", err)
	}
	defer logger.Sync()

	apmTracer.SetLogger(&util.LoggerAdapter{logger.With(zap.String("log.logger", "apm"))})

	config := mustLoadConfig(logger)

	if dryRun {
		logger.Info("Running dry-run mode")
		_ = initIndexer(context.Background(), logger, apmTracer, config)
		// Return instead of calling os.Exit(0): os.Exit skips deferred calls,
		// which would leave the logger unsynced and the tracer unclosed.
		// Returning from main still terminates the process with status 0.
		return
	}

	logger.Info("Package registry started")
	defer logger.Info("Package registry stopped")

	initHttpProf(logger)

	server := initServer(logger, apmTracer, config)
	go func() {
		err := runServer(server)
		// http.ErrServerClosed is the expected result of the Shutdown call below.
		if err != nil && err != http.ErrServerClosed {
			logger.Fatal("error occurred while serving", zap.Error(err))
		}
	}()

	initMetricsServer(logger)

	// Block until the process is asked to stop, then shut the server down.
	stop := make(chan os.Signal, 1)
	signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
	<-stop

	ctx := context.Background()
	if err := server.Shutdown(ctx); err != nil {
		logger.Fatal("error on shutdown", zap.Error(err))
	}
}
// initHttpProf starts the HTTP profiler in a background goroutine when
// -httpprof was given; otherwise it does nothing. The listener serves the
// default mux (nil handler), and a listen failure is fatal.
func initHttpProf(logger *zap.Logger) {
	if httpProfAddress == "" {
		return
	}

	logger.Info("Starting http pprof in " + httpProfAddress)

	serveProfiler := func() {
		if err := http.ListenAndServe(httpProfAddress, nil); err != nil {
			logger.Fatal("failed to start HTTP profiler", zap.Error(err))
		}
	}
	go serveProfiler()
}
// getHostname returns the hostname reported by the OS, or
// defaultInstanceName when it cannot be obtained.
func getHostname() string {
	if name, err := os.Hostname(); err == nil {
		return name
	}
	return defaultInstanceName
}
// initMetricsServer publishes the service-info metric and starts the
// Prometheus endpoint (/metrics) in a background goroutine. It does nothing
// when -metrics-address is empty; a listen failure is fatal.
func initMetricsServer(logger *zap.Logger) {
	if metricsAddress == "" {
		return
	}

	labels := prometheus.Labels{"version": version, "instance": getHostname()}
	metrics.ServiceInfo.With(labels).Set(1)

	logger.Info("Starting http metrics in " + metricsAddress)

	go func() {
		metricsMux := http.NewServeMux()
		metricsMux.Handle("/metrics", promhttp.Handler())
		if err := http.ListenAndServe(metricsAddress, metricsMux); err != nil {
			logger.Fatal("failed to start Prometheus metrics endpoint", zap.Error(err))
		}
	}()
}
// initIndexer builds the combined indexer: the storage indexer first (only
// when -feature-storage-indexer is set), then the zip and the file-system
// indexers over the configured package paths. The combined indexer is
// initialized and checked by ensurePackagesAvailable before being returned.
func initIndexer(ctx context.Context, logger *zap.Logger, apmTracer *apm.Tracer, config *Config) Indexer {
	basePaths := getPackagesBasePaths(config)

	var indexers CombinedIndexer

	if featureStorageIndexer {
		client, err := gstorage.NewClient(ctx)
		if err != nil {
			logger.Fatal("can't initialize storage client", zap.Error(err))
		}
		storageOpts := storage.IndexerOptions{
			APMTracer:                    apmTracer,
			PackageStorageBucketInternal: storageIndexerBucketInternal,
			PackageStorageEndpoint:       storageEndpoint,
			WatchInterval:                storageIndexerWatchInterval,
		}
		indexers = append(indexers, storage.NewIndexer(logger, client, storageOpts))
	}

	zipIndexer := packages.NewZipFileSystemIndexer(logger, basePaths...)
	fsIndexer := packages.NewFileSystemIndexer(logger, basePaths...)
	indexers = append(indexers, zipIndexer, fsIndexer)

	ensurePackagesAvailable(ctx, logger, indexers)
	return indexers
}
// initServer builds the main HTTP server: it initializes the indexer and the
// router inside an APM transaction, instruments the router, and applies the
// minimum TLS version when one was requested. The server is returned
// unstarted.
func initServer(logger *zap.Logger, apmTracer *apm.Tracer, config *Config) *http.Server {
	tx := apmTracer.StartTransaction("initServer", "backend.init")
	defer tx.End()

	ctx := apm.ContextWithTransaction(context.TODO(), tx)
	router := mustLoadRouter(logger, config, initIndexer(ctx, logger, apmTracer, config))
	apmgorilla.Instrument(router, apmgorilla.WithTracer(apmTracer))

	tlsConfig := &tls.Config{}
	if tlsMinVersionValue > 0 {
		tlsConfig.MinVersion = uint16(tlsMinVersionValue)
	}

	return &http.Server{
		Addr:      address,
		Handler:   router,
		TLSConfig: tlsConfig,
	}
}
// runServer blocks serving requests on server. It serves TLS only when both
// the certificate and the key file were configured, plain HTTP otherwise, and
// returns whatever error the listener ends with.
func runServer(server *http.Server) error {
	plainHTTP := tlsCertFile == "" || tlsKeyFile == ""
	if plainHTTP {
		return server.ListenAndServe()
	}
	return server.ListenAndServeTLS(tlsCertFile, tlsKeyFile)
}
// initAPMTracer closes the default APM tracer and returns the tracer to use:
// the (closed) default tracer when ELASTIC_APM_SERVER_URL is not set, or a
// new tracer carrying the service name and version otherwise. Failing to
// create the tracer is fatal.
func initAPMTracer() *apm.Tracer {
	apm.DefaultTracer().Close()

	_, configured := os.LookupEnv("ELASTIC_APM_SERVER_URL")
	if !configured {
		// Don't report anything if the Server URL hasn't been configured.
		return apm.DefaultTracer()
	}

	opts := apm.TracerOptions{
		ServiceName:    serviceName,
		ServiceVersion: version,
	}
	tracer, err := apm.NewTracerOptions(opts)
	if err != nil {
		log.Fatalf("Failed to initialize APM agent: %v", err)
	}
	return tracer
}
// mustLoadConfig loads the configuration, logs it and returns it. Any error
// from getConfig is fatal.
func mustLoadConfig(logger *zap.Logger) *Config {
	loaded, err := getConfig(logger)
	if err != nil {
		logger.Fatal("getting config", zap.Error(err))
	}

	printConfig(logger, loaded)
	return loaded
}
// getConfig reads the YAML file at configPath and unpacks it on top of a copy
// of defaultConfig.
//
// Every failure is returned as an error wrapping the cause, including a
// missing file. That case used to terminate the process from inside this
// function, although the signature promises an error; the caller decides how
// to react (mustLoadConfig makes it fatal). The logger parameter is kept so
// the signature stays unchanged.
func getConfig(logger *zap.Logger) (*Config, error) {
	cfg, err := ucfgYAML.NewConfigWithFile(configPath)
	if os.IsNotExist(err) {
		return nil, fmt.Errorf("configuration file is not available (path: %s): %w", configPath, err)
	}
	if err != nil {
		return nil, fmt.Errorf("reading config failed (path: %s): %w", configPath, err)
	}

	// Start from the defaults so that settings absent from the file keep them.
	config := defaultConfig
	if err := cfg.Unpack(&config); err != nil {
		return nil, fmt.Errorf("unpacking config failed (path: %s): %w", configPath, err)
	}
	return &config, nil
}
// getPackagesBasePaths returns a fresh slice holding the package paths of
// config (nil when there are none), so callers never share the configuration's
// backing array.
func getPackagesBasePaths(config *Config) []string {
	return append([]string(nil), config.PackagePaths...)
}
// printConfig logs the effective configuration at info level: the package
// paths first, then one line per cache time.
func printConfig(logger *zap.Logger, config *Config) {
	logger.Info("Packages paths: " + strings.Join(config.PackagePaths, ", "))

	cacheTimes := []struct {
		label string
		value time.Duration
	}{
		{"Cache time for /: ", config.CacheTimeIndex},
		{"Cache time for /index.json: ", config.CacheTimeIndex},
		{"Cache time for /search: ", config.CacheTimeSearch},
		{"Cache time for /categories: ", config.CacheTimeCategories},
		{"Cache time for all others: ", config.CacheTimeCatchAll},
	}
	for _, entry := range cacheTimes {
		logger.Info(entry.label + entry.value.String())
	}
}
// ensurePackagesAvailable initializes indexer and fetches its packages. A
// failure of either step is fatal, and so is an empty result unless proxy
// mode is enabled. The number of packages found is recorded in the
// indexed-packages metric.
func ensurePackagesAvailable(ctx context.Context, logger *zap.Logger, indexer Indexer) {
	if err := indexer.Init(ctx); err != nil {
		logger.Fatal("Init failed", zap.Error(err))
	}

	found, err := indexer.Get(ctx, nil)
	if err != nil {
		logger.Fatal("Cannot get packages from indexer", zap.Error(err))
	}

	count := len(found)
	switch {
	case count > 0:
		logger.Info(fmt.Sprintf("%v local package manifests loaded.", count))
	case featureProxyMode:
		logger.Info("No local packages found, but the proxy mode can access remote ones.")
	default:
		logger.Fatal("No local packages found.")
	}

	metrics.NumberIndexedPackages.Set(float64(count))
}
// mustLoadRouter builds the HTTP router through getRouter and returns it.
// Any error from getRouter is fatal.
func mustLoadRouter(logger *zap.Logger, config *Config, indexer Indexer) *mux.Router {
	router, err := getRouter(logger, config, indexer)
	if err != nil {
		// The message used to read "failed go configure router" (typo).
		logger.Fatal("failed to configure router", zap.Error(err))
	}
	return router
}
// getRouter assembles the registry router: it creates the proxy mode, builds
// every handler with its cache time, registers the routes, and installs the
// logging, CORS and (only when a metrics address is configured) metrics
// middlewares plus the not-found handler.
//
// It returns an error when the proxy mode, the favicon handler or the index
// handler cannot be created.
func getRouter(logger *zap.Logger, config *Config, indexer Indexer) (*mux.Router, error) {
	if featureProxyMode {
		logger.Info("Technical preview: Proxy mode is an experimental feature and it may be unstable.")
	}

	proxyOpts := proxymode.ProxyOptions{
		Enabled: featureProxyMode,
		ProxyTo: proxyTo,
	}
	proxy, err := proxymode.NewProxyMode(logger, proxyOpts)
	if err != nil {
		return nil, fmt.Errorf("can't create proxy mode: %w", err)
	}

	catchAll := config.CacheTimeCatchAll

	// Handlers are built in the same order as before the restyle.
	artifacts := artifactsHandlerWithProxyMode(logger, indexer, proxy, catchAll)
	signatures := signaturesHandlerWithProxyMode(logger, indexer, proxy, catchAll)

	favicon, err := faviconHandler(catchAll)
	if err != nil {
		return nil, err
	}

	index, err := indexHandler(config.CacheTimeIndex)
	if err != nil {
		return nil, err
	}

	categories := categoriesHandlerWithProxyMode(logger, indexer, proxy, config.CacheTimeCategories)
	packageIndex := packageIndexHandlerWithProxyMode(logger, indexer, proxy, catchAll)
	search := searchHandlerWithProxyMode(logger, indexer, proxy, config.CacheTimeSearch)
	static := staticHandlerWithProxyMode(logger, indexer, proxy, catchAll)

	// Route registration order is kept exactly as it was.
	r := mux.NewRouter().StrictSlash(true)
	r.HandleFunc("/", index)
	r.HandleFunc("/index.json", index)
	r.HandleFunc("/search", search)
	r.HandleFunc("/categories", categories)
	r.HandleFunc("/health", healthHandler)
	r.HandleFunc("/favicon.ico", favicon)
	r.HandleFunc(artifactsRouterPath, artifacts)
	r.HandleFunc(signaturesRouterPath, signatures)
	r.HandleFunc(packageIndexRouterPath, packageIndex)
	r.HandleFunc(staticRouterPath, static)

	r.Use(util.LoggingMiddleware(logger))
	r.Use(util.CORSMiddleware())
	if metricsAddress != "" {
		r.Use(metrics.MetricsMiddleware())
	}

	r.NotFoundHandler = notFoundHandler(fmt.Errorf("404 page not found"))
	return r, nil
}
// healthHandler serves the /health route used by Docker/K8s deployments.
// It neither reads the request nor writes a response itself, so the reply is
// the 200 status that net/http sends when a handler returns without writing.
// ?ready=true can be used for a ready request; it hits this same handler, so
// liveness and readiness are currently identical.
func healthHandler(_ http.ResponseWriter, _ *http.Request) {}