server/server.go (116 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package server
import (
"context"
"errors"
"fmt"
"net/http"
"net/http/pprof"
"strings"
"time"
"github.com/apache/kvrocks-controller/store/engine/consul"
"github.com/apache/kvrocks-controller/store/engine/raft"
"github.com/gin-gonic/gin"
"github.com/apache/kvrocks-controller/config"
"github.com/apache/kvrocks-controller/controller"
"github.com/apache/kvrocks-controller/logger"
"github.com/apache/kvrocks-controller/server/helper"
"github.com/apache/kvrocks-controller/store"
"github.com/apache/kvrocks-controller/store/engine"
"github.com/apache/kvrocks-controller/store/engine/etcd"
"github.com/apache/kvrocks-controller/store/engine/zookeeper"
)
type Server struct {
engine *gin.Engine
store *store.ClusterStore
controller *controller.Controller
config *config.Config
httpServer *http.Server
}
func NewServer(cfg *config.Config) (*Server, error) {
var persist engine.Engine
var err error
sessionID := helper.GenerateSessionID(cfg.Addr)
storageType := strings.ToLower(cfg.StorageType)
switch storageType {
case "etcd":
logger.Get().Info("Use Etcd as store")
persist, err = etcd.New(sessionID, cfg.Etcd)
case "zookeeper":
logger.Get().Info("Use Zookeeper as store")
persist, err = zookeeper.New(sessionID, cfg.Zookeeper)
case "raft":
logger.Get().Info("Use Raft as store")
persist, err = raft.New(cfg.Raft)
case "consul":
logger.Get().Info("Use Consul as store")
persist, err = consul.New(sessionID, cfg.Consul)
default:
logger.Get().Info("Use Etcd as default store")
persist, err = etcd.New(sessionID, cfg.Etcd)
}
if err != nil {
return nil, err
}
if persist == nil {
return nil, fmt.Errorf("no found any store config")
}
clusterStore := store.NewClusterStore(persist)
ctrl, err := controller.New(clusterStore, cfg.Controller)
if err != nil {
return nil, err
}
gin.SetMode(gin.ReleaseMode)
return &Server{
store: clusterStore,
controller: ctrl,
config: cfg,
engine: gin.New(),
}, nil
}
func (srv *Server) startAPIServer() {
srv.initHandlers()
httpServer := &http.Server{
Addr: srv.config.Addr,
Handler: srv.engine,
}
go func() {
if err := httpServer.ListenAndServe(); err != nil {
if errors.Is(err, http.ErrServerClosed) {
return
}
panic(fmt.Errorf("API server: %w", err))
}
}()
srv.httpServer = httpServer
}
func PProf(c *gin.Context) {
switch c.Param("profile") {
case "/cmdline":
pprof.Cmdline(c.Writer, c.Request)
case "/symbol":
pprof.Symbol(c.Writer, c.Request)
case "/profile":
pprof.Profile(c.Writer, c.Request)
case "/trace":
pprof.Trace(c.Writer, c.Request)
default:
pprof.Index(c.Writer, c.Request)
}
}
func (srv *Server) Start(ctx context.Context) error {
if ok := srv.store.IsReady(ctx); !ok {
return fmt.Errorf("the cluster store is not ready")
}
if err := srv.controller.Start(ctx); err != nil {
return err
}
srv.controller.WaitForReady()
srv.startAPIServer()
return nil
}
func (srv *Server) Stop() error {
srv.controller.Close()
gracefulCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
return srv.httpServer.Shutdown(gracefulCtx)
}