pkg/cluster/healthcheck/healthcheck.go (255 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package healthcheck import ( "runtime/debug" "strings" "sync" "sync/atomic" "time" ) import ( "github.com/apache/dubbo-go-pixiu/pkg/logger" "github.com/apache/dubbo-go-pixiu/pkg/model" gxtime "github.com/dubbogo/gost/time" ) const ( DefaultTimeout = time.Second DefaultInterval = 3 * time.Second DefaultHealthyThreshold uint32 = 5 DefaultUnhealthyThreshold uint32 = 5 DefaultFirstInterval = 5 * time.Second ) type HealthChecker struct { checkers map[string]*EndpointChecker sessionConfig map[string]any // check config timeout time.Duration intervalBase time.Duration healthyThreshold uint32 initialDelay time.Duration cluster *model.ClusterConfig unhealthyThreshold uint32 protocol string } // EndpointChecker is a wrapper of types.HealthCheckSession for health check type EndpointChecker struct { endpoint *model.Endpoint HealthChecker *HealthChecker // checker, todo can extend to TCP, http, grpc, dubbo or other protocol checker checker Checker resp chan checkResponse timeout chan bool checkID uint64 stop chan struct{} checkTimer *gxtime.Timer checkTimeout *gxtime.Timer unHealthCount uint32 healthCount uint32 threshold uint32 once sync.Once } type Checker interface { CheckHealth() bool OnTimeout() } type checkResponse struct { ID uint64 Healthy bool } func CreateHealthCheck(cluster *model.ClusterConfig, cfg model.HealthCheckConfig) *HealthChecker { timeout, err := time.ParseDuration(cfg.TimeoutConfig) if err != nil { logger.Infof("[health check] timeout parse duration error %s", err) timeout = DefaultTimeout } interval, err := time.ParseDuration(cfg.IntervalConfig) if err != nil { logger.Infof("[health check] interval parse duration error %s", err) interval = DefaultInterval } initialDelay, err := time.ParseDuration(cfg.IntervalConfig) if err != nil { logger.Infof("[health check] initialDelay parse duration error %s", err) initialDelay = DefaultFirstInterval } unhealthyThreshold := cfg.UnhealthyThreshold if unhealthyThreshold == 0 { unhealthyThreshold = DefaultUnhealthyThreshold } healthyThreshold := cfg.HealthyThreshold if healthyThreshold == 0 { healthyThreshold = DefaultHealthyThreshold } hc := &HealthChecker{ protocol: cfg.Protocol, sessionConfig: cfg.SessionConfig, cluster: cluster, timeout: timeout, intervalBase: interval, healthyThreshold: healthyThreshold, unhealthyThreshold: unhealthyThreshold, initialDelay: initialDelay, checkers: make(map[string]*EndpointChecker), } return hc } func (hc *HealthChecker) Start() { // each endpoint for _, h := range hc.cluster.Endpoints { hc.startCheck(h) } } func (hc *HealthChecker) Stop() { for _, h := range hc.cluster.Endpoints { hc.stopCheck(h) } } func (hc *HealthChecker) StopOne(endpoint *model.Endpoint) { hc.stopCheck(endpoint) } func (hc *HealthChecker) StartOne(endpoint *model.Endpoint) { hc.startCheck(endpoint) } func (hc *HealthChecker) startCheck(endpoint *model.Endpoint) { addr := endpoint.Address.GetAddress() if _, ok := hc.checkers[addr]; !ok { c := newChecker(endpoint, hc) hc.checkers[addr] = c go c.Start() logger.Infof("[health check] create a health check session for %s", addr) } } func (hc *HealthChecker) stopCheck(endpoint *model.Endpoint) { addr := endpoint.Address.GetAddress() if c, ok := hc.checkers[addr]; ok { c.Stop() delete(hc.checkers, addr) logger.Infof("[health check] create a health check session for %s", addr) } } func newChecker(endpoint *model.Endpoint, hc *HealthChecker) *EndpointChecker { var checker Checker protocol := strings.ToLower(hc.protocol) switch protocol { case "tcp": checker = &TCPChecker{ address: endpoint.Address.GetAddress(), timeout: hc.timeout, } case "http": checker = &HTTPChecker{ address: endpoint.Address.GetAddress(), timeout: hc.timeout, } case "https": checker = &HTTPSChecker{ address: endpoint.Address.GetAddress(), timeout: hc.timeout, } default: logger.Warnf("[health check] %s health checker is not implemented, using tcp checker", hc.protocol) checker = &TCPChecker{ address: endpoint.Address.GetAddress(), timeout: hc.timeout, } } c := &EndpointChecker{ checker: checker, endpoint: endpoint, HealthChecker: hc, resp: make(chan checkResponse), timeout: make(chan bool), stop: make(chan struct{}), } return c } func (hc *HealthChecker) getCheckInterval() time.Duration { return hc.intervalBase } func (c *EndpointChecker) Start() { defer func() { if r := recover(); r != nil { logger.Warnf("[health check] node checker panic %v\n%s", r, string(debug.Stack())) } c.checkTimer.Stop() c.checkTimeout.Stop() }() c.checkTimer = gxtime.AfterFunc(c.HealthChecker.initialDelay, c.OnCheck) for { select { case <-c.stop: return default: // prepare a check currentID := atomic.AddUint64(&c.checkID, 1) select { case <-c.stop: return case resp := <-c.resp: if resp.ID == currentID { if c.checkTimeout != nil { c.checkTimeout.Stop() c.checkTimeout = nil } if resp.Healthy { c.HandleSuccess() } else { c.HandleFailure(false) } c.checkTimer = gxtime.AfterFunc(c.HealthChecker.getCheckInterval(), c.OnCheck) } case <-c.timeout: if c.checkTimer != nil { c.checkTimer.Stop() } c.HandleFailure(true) c.checkTimer = gxtime.AfterFunc(c.HealthChecker.getCheckInterval(), c.OnCheck) logger.Infof("[health check] receive a timeout response at id: %d", currentID) } } } } func (c *EndpointChecker) Stop() { c.once.Do(func() { close(c.stop) }) } func (c *EndpointChecker) HandleSuccess() { c.unHealthCount = 0 c.healthCount++ if c.healthCount > c.threshold { c.handleHealth() } } func (c *EndpointChecker) HandleFailure(timeout bool) { if timeout { c.HandleTimeout() } else { c.handleUnHealth() } } func (c *EndpointChecker) HandleTimeout() { c.healthCount = 0 c.unHealthCount++ if c.unHealthCount > c.threshold { c.handleUnHealth() } } func (c *EndpointChecker) handleHealth() { c.healthCount = 0 c.unHealthCount = 0 c.endpoint.UnHealthy = false } func (c *EndpointChecker) handleUnHealth() { c.healthCount = 0 c.unHealthCount = 0 c.endpoint.UnHealthy = true } func (c *EndpointChecker) OnCheck() { id := atomic.LoadUint64(&c.checkID) if c.checkTimeout != nil { c.checkTimeout.Stop() } c.checkTimeout = gxtime.AfterFunc(c.HealthChecker.timeout, c.OnTimeout) c.resp <- checkResponse{ ID: id, Healthy: c.checker.CheckHealth(), } } func (c *EndpointChecker) OnTimeout() { c.timeout <- true }