server/middleware/middleware.go (113 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package middleware import ( "errors" "net/http" "strconv" "time" "github.com/apache/kvrocks-controller/store/engine/raft" "github.com/gin-gonic/gin" "github.com/prometheus/client_golang/prometheus" "github.com/apache/kvrocks-controller/consts" "github.com/apache/kvrocks-controller/metrics" "github.com/apache/kvrocks-controller/server/helper" "github.com/apache/kvrocks-controller/store" ) func CollectMetrics(c *gin.Context) { startTime := time.Now() c.Next() latency := time.Since(startTime).Milliseconds() uri := c.FullPath() // uri was empty means not found routes, so rewrite it to /not_found here if c.Writer.Status() == http.StatusNotFound && uri == "" { uri = "/not_found" } labels := prometheus.Labels{ "host": c.Request.Host, "uri": uri, "method": c.Request.Method, "code": strconv.Itoa(c.Writer.Status()), } metrics.Get().HTTPCodes.With(labels).Inc() metrics.Get().Latencies.With(labels).Observe(float64(latency)) size := c.Writer.Size() if size > 0 { metrics.Get().Payload.With(labels).Add(float64(size)) } } func RedirectIfNotLeader(c *gin.Context) { storage, _ := c.MustGet(consts.ContextKeyStore).(*store.ClusterStore) if storage.Leader() == "" { c.JSON(http.StatusBadRequest, gin.H{"error": "no leader now, please retry later"}) c.Abort() return } _, isRaftMode := storage.GetEngine().(*raft.Node) // Raft engine will forward the request to the leader node under the hood, // so we don't need to do the redirect. if !storage.IsLeader() && !isRaftMode { if !c.GetBool(consts.HeaderIsRedirect) { c.Set(consts.HeaderIsRedirect, true) peerAddr := helper.ExtractAddrFromSessionID(storage.Leader()) c.Redirect(http.StatusTemporaryRedirect, "http://"+peerAddr+c.Request.RequestURI) c.Redirect(http.StatusTemporaryRedirect, "http://"+storage.Leader()+c.Request.RequestURI) } else { c.JSON(http.StatusBadRequest, gin.H{"error": "no leader now, please retry later"}) c.Abort() } return } c.Next() } func RequiredNamespace(c *gin.Context) { s, _ := c.MustGet(consts.ContextKeyStore).(*store.ClusterStore) ok, err := s.ExistsNamespace(c, c.Param("namespace")) if err != nil { helper.ResponseError(c, err) return } if !ok { helper.ResponseBadRequest(c, errors.New("namespace not found")) c.Abort() } else { c.Next() } } func RequiredCluster(c *gin.Context) { s, _ := c.MustGet(consts.ContextKeyStore).(*store.ClusterStore) cluster, err := s.GetCluster(c, c.Param("namespace"), c.Param("cluster")) if err != nil { helper.ResponseError(c, err) return } c.Set(consts.ContextKeyCluster, cluster) c.Next() } func RequiredClusterShard(c *gin.Context) { s, _ := c.MustGet(consts.ContextKeyStore).(*store.ClusterStore) cluster, err := s.GetCluster(c, c.Param("namespace"), c.Param("cluster")) if err != nil { helper.ResponseError(c, err) return } shardIndex, err := strconv.Atoi(c.Param("shard")) if err != nil { helper.ResponseBadRequest(c, err) return } shard, err := cluster.GetShard(shardIndex) if err != nil { helper.ResponseError(c, err) return } c.Set(consts.ContextKeyCluster, cluster) c.Set(consts.ContextKeyClusterShard, shard) c.Next() } func RequiredRaftEngine(c *gin.Context) { storage, _ := c.MustGet(consts.ContextKeyStore).(*store.ClusterStore) raftNode, ok := storage.GetEngine().(*raft.Node) if !ok { helper.ResponseBadRequest(c, errors.New("raft engine is not enabled")) c.Abort() return } c.Set(consts.ContextKeyRaftNode, raftNode) c.Next() }