api/cmd/root.go (106 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
"time"
"github.com/spf13/cobra"
"github.com/apisix/manager-api/internal/conf"
"github.com/apisix/manager-api/internal/core/server"
"github.com/apisix/manager-api/internal/core/storage"
"github.com/apisix/manager-api/internal/core/store"
"github.com/apisix/manager-api/internal/log"
)
var rootCmd = &cobra.Command{
Use: "manager-api",
Short: "Apache APISIX Manager API",
RunE: func(cmd *cobra.Command, args []string) error {
err := manageAPI()
return err
},
}
func init() {
rootCmd.PersistentFlags().StringVarP(&conf.ConfigFile, "config", "c", "", "config file")
rootCmd.PersistentFlags().StringVarP(&conf.WorkDir, "work-dir", "p", ".", "current work directory")
rootCmd.AddCommand(
newVersionCommand(),
)
}
func Execute() {
if err := rootCmd.Execute(); err != nil {
_, _ = fmt.Fprintln(os.Stderr, err)
}
}
func manageAPI() error {
conf.InitConf()
log.InitLogger()
s, err := server.NewServer(&server.Options{})
if err != nil {
return err
}
// start Manager API server
errSig := make(chan error, 5)
s.Start(errSig)
// start etcd connection checker
stopEtcdConnectionChecker := etcdConnectionChecker()
// Signal received to the process externally.
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
select {
case sig := <-quit:
log.Infof("The Manager API server receive %s and start shutting down", sig.String())
stopEtcdConnectionChecker()
s.Stop()
log.Infof("See you next time!")
case err := <-errSig:
log.Errorf("The Manager API server start failed: %s", err.Error())
return err
}
return nil
}
func etcdConnectionChecker() context.CancelFunc {
ctx, cancel := context.WithCancel(context.TODO())
unavailableTimes := 0
go func() {
etcdClient := storage.GenEtcdStorage().GetClient()
for {
select {
case <-time.Tick(10 * time.Second):
sCtx, sCancel := context.WithTimeout(ctx, 5*time.Second)
err := etcdClient.Sync(sCtx)
sCancel()
if err != nil {
unavailableTimes++
log.Errorf("etcd connection loss detected, times: %d", unavailableTimes)
continue
}
// After multiple failures, the connection is restored
if unavailableTimes >= 1 {
log.Warnf("etcd connection recovered, but after several connection losses, reinitializing stores, times: %d", unavailableTimes)
unavailableTimes = 0
// When this happens, force a full re-initialization of the store
store.RangeStore(func(key store.HubKey, store *store.GenericStore) bool {
log.Warnf("etcd store reinitializing: resource: %s", key)
if err := store.Init(); err != nil {
log.Errorf("etcd store reinitialize failed: resource: %s, error: %s", key, err)
}
return true
})
} else {
log.Info("etcd connection is fine")
}
case <-ctx.Done():
return
}
}
}()
// Timed re-initialization when etcd watch actively exits
go func() {
for {
select {
case <-time.Tick(2 * time.Minute):
err := store.ReInit()
if err != nil {
log.Errorf("resource re-initialize failed, err: %v", err)
}
}
}
}()
return cancel
}