cmd/exporter/app/server.go (112 lines of code) (raw):
/*
MIT License
Copyright (c) Microsoft Corporation.
*/
package app
import (
"context"
"fmt"
"net/http"
_ "net/http/pprof" // enable pprof in the server
"time"
"github.com/Azure/kubernetes-carbon-intensity-exporter/pkg/sdk/client"
"github.com/spf13/cobra"
flag "github.com/spf13/pflag"
"k8s.io/apiserver/pkg/server/healthz"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/cli/globalflag"
"k8s.io/component-base/term"
"k8s.io/component-base/version/verflag"
"k8s.io/klog/v2"
exporterconfig "github.com/Azure/kubernetes-carbon-intensity-exporter/cmd/exporter/app/config"
"github.com/Azure/kubernetes-carbon-intensity-exporter/cmd/exporter/app/options"
"github.com/Azure/kubernetes-carbon-intensity-exporter/pkg/exporter"
)
var (
	// Exporter command-line arguments, registered through the pflag package
	// (imported as flag).
	//
	// configMapName and region are passed to the exporter as-is by
	// startExporter; patrolInterval is parsed there with time.ParseDuration,
	// so it must be a duration string rather than a bare number of hours.
	configMapName  = flag.String("configmap-name", "carbon-intensity", "Configmap name - Default 'carbon-intensity'")
	patrolInterval = flag.String("patrol-interval", "12h", "Patrol interval as a duration string (e.g. 30m, 12h) - Default every 12 hours")
	region         = flag.String("region", "", "Region to get carbon intensity for - Required")
)
// NewExporterCommand assembles the cobra command for the carbon-data-exporter.
//
// stopChan is forwarded unchanged to Run when the command executes. The
// command's flag set is the union of the named flag sets exposed by the
// exporter options, with the flags added by verflag and globalflag placed in
// the "global" section. Usage and help output are printed section by section
// using the column count reported by term.TerminalSize at construction time.
//
// Failure to create the options, to build the config, or a non-nil error from
// Run is reported through klog.Fatalf.
func NewExporterCommand(stopChan <-chan struct{}) *cobra.Command {
	opts, err := options.NewExporterOptions()
	if err != nil {
		klog.Fatalf("unable to initialize command options: %v", err)
	}

	const usageFmt = "Usage:\n %s\n"

	command := &cobra.Command{
		Use:  "carbon-data-exporter",
		Long: `The carbon-data-exporter is a controller that pulls carbon intensity data from GSF API server`,
		Run: func(_ *cobra.Command, _ []string) {
			verflag.PrintAndExitIfRequested()

			cfg, cfgErr := opts.Config()
			if cfgErr != nil {
				klog.Fatalf("unable to initialize command configs: %s", cfgErr.Error())
			}

			runErr := Run(cfg.Complete(), stopChan)
			if runErr != nil {
				klog.Fatalf("unable to execute command : %s", runErr.Error())
			}
		},
	}

	// Populate the "global" section, then attach every section to the command.
	sections := opts.Flags()
	globalSet := sections.FlagSet("global")
	verflag.AddFlags(globalSet)
	globalflag.AddGlobalFlags(globalSet, command.Name())

	commandFlags := command.Flags()
	for _, set := range sections.FlagSets {
		commandFlags.AddFlagSet(set)
	}

	// The terminal size lookup error is ignored; whatever column count is
	// returned is handed to PrintSections as-is.
	width, _, _ := term.TerminalSize(command.OutOrStdout())

	command.SetUsageFunc(func(c *cobra.Command) error {
		out := c.OutOrStderr()
		fmt.Fprintf(out, usageFmt, c.UseLine())
		cliflag.PrintSections(out, sections, width)
		return nil
	})

	command.SetHelpFunc(func(c *cobra.Command, _ []string) {
		out := c.OutOrStdout()
		fmt.Fprintf(out, "%s\n\n"+usageFmt, c.Long, c.UseLine())
		cliflag.PrintSections(out, sections, width)
	})

	return command
}
// Run wires up and runs the exporter described by cc.
//
// Steps, in order:
//  1. build the SDK API client and the exporter;
//  2. start recording events to the cluster when both a broadcaster and a
//     cluster client are configured;
//  3. start the cluster informers and wait for their caches to sync;
//  4. start a pprof HTTP server on :6060 and a healthz HTTP server on :8080,
//     each in its own goroutine (a listen failure is fatal via klog.Fatal);
//  5. run the exporter with a context that is cancelled once stopCh fires.
//
// It returns a wrapped error if the exporter cannot be created. Otherwise it
// does not return before the context is cancelled, and then always returns a
// non-nil error.
func Run(cc *exporterconfig.CompletedConfig, stopCh <-chan struct{}) error {
	// Init client SDK and exporter.
	apiClient := client.NewAPIClient(client.NewConfiguration())
	e, err := exporter.New(cc.ClusterClient, apiClient, cc.Recorder)
	if err != nil {
		// %w keeps the cause available to errors.Is / errors.As.
		return fmt.Errorf("new syncer: %w", err)
	}

	// Prepare the event broadcaster.
	if cc.Broadcaster != nil && cc.ClusterClient != nil {
		cc.Broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: cc.ClusterClient.CoreV1().Events("")})
	}

	// Start all informers.
	cc.ClusterInformerFactory.Start(stopCh)
	// Wait for all caches to sync before resource sync.
	cc.ClusterInformerFactory.WaitForCacheSync(stopCh)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Prepare a reusable runCommand function.
	run := startExporter(e, stopCh)

	// Translate the stop channel into context cancellation. The ctx.Done
	// case lets this goroutine exit when Run returns for another reason.
	go func() {
		select {
		case <-stopCh:
			cancel()
		case <-ctx.Done():
		}
	}()

	// serve blocks serving HTTP on addr and treats any listen/serve error as
	// fatal. ReadHeaderTimeout bounds how long a client may take to send its
	// request headers; no write timeout is set because pprof profile
	// endpoints stream for a caller-chosen duration.
	serve := func(addr string, handler http.Handler) {
		srv := &http.Server{
			Addr:              addr,
			Handler:           handler,
			ReadHeaderTimeout: 10 * time.Second,
		}
		klog.Fatal(srv.ListenAndServe())
	}

	// pprof server: a nil handler selects http.DefaultServeMux, which is
	// where the blank net/http/pprof import registers its handlers.
	go serve(":6060", nil)

	// Health server on a dedicated mux.
	healthMux := http.NewServeMux()
	healthz.InstallHandler(healthMux)
	go serve(":8080", healthMux)

	run(ctx)

	// NOTE(review): this error is returned even after an orderly stop, and the
	// caller in this file treats any error as fatal. Kept unchanged for
	// compatibility; consider returning nil when stopCh triggered the exit.
	return fmt.Errorf("finished without leader elect")
}
// startExporter returns the function that runs exporter p under a context.
//
// The patrol-interval flag is parsed immediately, but a parse failure is only
// reported when the returned function is invoked, at which point it is logged
// through klog.Fatalf. On success the returned function calls p.Run with the
// configmap name, region, parsed interval and stopCh, and then blocks until
// ctx is done.
func startExporter(p *exporter.Exporter, stopCh <-chan struct{}) func(context.Context) {
	// Parse patrolInterval to time.Duration.
	interval, err := time.ParseDuration(*patrolInterval)
	if err != nil {
		return func(context.Context) {
			klog.Fatalf("an error while parsing patrol-interval, err: %s", err.Error())
		}
	}

	return func(ctx context.Context) {
		p.Run(ctx, *configMapName, *region, interval, stopCh)
		<-ctx.Done()
	}
}