proxy/health/health.go (151 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package health import ( "errors" "fmt" "github.com/apache/servicecomb-mesher/proxy/config" "github.com/go-chassis/go-chassis/v2/core/registry" "github.com/go-chassis/go-chassis/v2/pkg/runtime" "github.com/go-chassis/openlog" "net" "regexp" "strconv" "strings" "time" ) const ( DefaultInterval = time.Second * 30 DefaultTimeout = time.Second * 10 ) //Error definitions var ( ErrPortEmpty = errors.New("port is empty") ErrInvalidURI = errors.New("uri must start with /") ) //Deal handle the unhealthy status type Deal func(err error) //L7Check is the interface for L7 checker type L7Check func(check *config.HealthCheck, address string) error //l7Checks save l7 check func var l7Checks = make(map[string]L7Check) //InstallChecker install checkers func InstallChecker(n string, c L7Check) { l7Checks[n] = c } //UpdateInstanceStatus update status in registrator, it just works in client side discovery func UpdateInstanceStatus(err error) { if registry.DefaultRegistrator == nil { openlog.Warn("Registrator is nil, can not update instance status") return } if err != nil { if runtime.InstanceStatus == runtime.StatusRunning { openlog.Info("service is not healthy, update status") ChangeStatus(runtime.StatusDown) } } else { if runtime.InstanceStatus == runtime.StatusDown { openlog.Info("service is healthy, update status") ChangeStatus(runtime.StatusRunning) } } } //ChangeStatus change status in local and remote func ChangeStatus(status string) { if err := registry.DefaultRegistrator.UpdateMicroServiceInstanceStatus(runtime.ServiceID, runtime.InstanceID, status); err != nil { openlog.Error("update instance status failed:" + err.Error()) return } runtime.InstanceStatus = status openlog.Info("update instance status to: " + runtime.InstanceStatus) } //runCheckers run check routines func runCheckers(c *config.HealthCheck, l7check L7Check, address string, deal Deal) (err error) { var interval = DefaultInterval if c.Interval != "" { interval, err = time.ParseDuration(c.Interval) if err != nil { return err } } ticker := time.NewTicker(interval) go func() { for range ticker.C { err := CheckService(c, l7check, address) if err != nil { openlog.Error(fmt.Sprintf("health check failed for service port[%s]: %s", c.Port, err)) } deal(err) } }() return nil } //CheckService check service health based on config func CheckService(c *config.HealthCheck, l7check L7Check, address string) error { openlog.Debug(fmt.Sprintf("check port [%s]", c.Port)) if l7check != nil { if err := l7check(c, address); err != nil { return err } } else { if err := L4Check(address); err != nil { return err } } openlog.Debug("service is healthy: " + address) return nil } //L4Check check tcp connection func L4Check(address string) error { c, err := net.DialTimeout("tcp", address, DefaultTimeout) if err != nil { return err } if err = c.Close(); err != nil { return err } return nil } //Run Launch go routines to check service health func Run() error { openlog.Info("local health manager start") for _, v := range config.GetConfig().HealthCheck { openlog.Debug(fmt.Sprintf("check local health [%s],protocol [%s]", v.Port, v.Protocol)) address, check, err := ParseConfig(v) if err != nil { openlog.Warn("Health keeper can not check health") return err } //TODO make pluggable Deal if err := runCheckers(v, check, address, UpdateInstanceStatus); err != nil { return err } } return nil } //ParseConfig validate config and return address, checker //port name must not be empty //port name must named as {protocol}-{name} //protocol must has checker func ParseConfig(c *config.HealthCheck) (string, L7Check, error) { if c.Port == "" { return "", nil, ErrPortEmpty } var check L7Check if c.Protocol != "" { var ok bool check, ok = l7Checks[c.Protocol] if !ok { return "", nil, errors.New("don not support L7 checker:" + c.Protocol) } } else { check = nil } address := "127.0.0.1:" + c.Port if c.URI != "" { if !strings.HasPrefix(c.URI, "/") { return "", nil, ErrInvalidURI } } if c.Match != nil { if c.Match.Status != "" { _, err := strconv.Atoi(c.Match.Status) if err != nil { return "", nil, err } } if c.Match.Body != "" { _, err := regexp.Compile(c.Match.Body) if err != nil { return "", nil, err } } } return address, check, nil } func init() { InstallChecker("rest", HTTPCheck) }