api/cmd/root.go (106 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package cmd import ( "context" "fmt" "os" "os/signal" "syscall" "time" "github.com/spf13/cobra" "github.com/apisix/manager-api/internal/conf" "github.com/apisix/manager-api/internal/core/server" "github.com/apisix/manager-api/internal/core/storage" "github.com/apisix/manager-api/internal/core/store" "github.com/apisix/manager-api/internal/log" ) var rootCmd = &cobra.Command{ Use: "manager-api", Short: "Apache APISIX Manager API", RunE: func(cmd *cobra.Command, args []string) error { err := manageAPI() return err }, } func init() { rootCmd.PersistentFlags().StringVarP(&conf.ConfigFile, "config", "c", "", "config file") rootCmd.PersistentFlags().StringVarP(&conf.WorkDir, "work-dir", "p", ".", "current work directory") rootCmd.AddCommand( newVersionCommand(), ) } func Execute() { if err := rootCmd.Execute(); err != nil { _, _ = fmt.Fprintln(os.Stderr, err) } } func manageAPI() error { conf.InitConf() log.InitLogger() s, err := server.NewServer(&server.Options{}) if err != nil { return err } // start Manager API server errSig := make(chan error, 5) s.Start(errSig) // start etcd connection checker stopEtcdConnectionChecker := etcdConnectionChecker() // Signal received to the process externally. quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) select { case sig := <-quit: log.Infof("The Manager API server receive %s and start shutting down", sig.String()) stopEtcdConnectionChecker() s.Stop() log.Infof("See you next time!") case err := <-errSig: log.Errorf("The Manager API server start failed: %s", err.Error()) return err } return nil } func etcdConnectionChecker() context.CancelFunc { ctx, cancel := context.WithCancel(context.TODO()) unavailableTimes := 0 go func() { etcdClient := storage.GenEtcdStorage().GetClient() for { select { case <-time.Tick(10 * time.Second): sCtx, sCancel := context.WithTimeout(ctx, 5*time.Second) err := etcdClient.Sync(sCtx) sCancel() if err != nil { unavailableTimes++ log.Errorf("etcd connection loss detected, times: %d", unavailableTimes) continue } // After multiple failures, the connection is restored if unavailableTimes >= 1 { log.Warnf("etcd connection recovered, but after several connection losses, reinitializing stores, times: %d", unavailableTimes) unavailableTimes = 0 // When this happens, force a full re-initialization of the store store.RangeStore(func(key store.HubKey, store *store.GenericStore) bool { log.Warnf("etcd store reinitializing: resource: %s", key) if err := store.Init(); err != nil { log.Errorf("etcd store reinitialize failed: resource: %s, error: %s", key, err) } return true }) } else { log.Info("etcd connection is fine") } case <-ctx.Done(): return } } }() // Timed re-initialization when etcd watch actively exits go func() { for { select { case <-time.Tick(2 * time.Minute): err := store.ReInit() if err != nil { log.Errorf("resource re-initialize failed, err: %v", err) } } } }() return cancel }