cmd/kubernetes-history-inspector/main.go (231 lines of code) (raw):
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"os"
"os/signal"
"strings"
"syscall"
"github.com/GoogleCloudPlatform/khi/pkg/common/errorreport"
"github.com/GoogleCloudPlatform/khi/pkg/common/flag"
"github.com/GoogleCloudPlatform/khi/pkg/inspection"
inspection_common "github.com/GoogleCloudPlatform/khi/pkg/inspection/common"
"github.com/GoogleCloudPlatform/khi/pkg/inspection/logger"
inspection_task "github.com/GoogleCloudPlatform/khi/pkg/inspection/task"
"github.com/GoogleCloudPlatform/khi/pkg/lifecycle"
"github.com/GoogleCloudPlatform/khi/pkg/model/k8s"
"github.com/GoogleCloudPlatform/khi/pkg/parameters"
"github.com/GoogleCloudPlatform/khi/pkg/server"
"github.com/GoogleCloudPlatform/khi/pkg/server/upload"
common "github.com/GoogleCloudPlatform/khi/pkg/source/common/k8s_audit"
"github.com/GoogleCloudPlatform/khi/pkg/source/gcp"
"github.com/GoogleCloudPlatform/khi/pkg/source/gcp/api"
"github.com/GoogleCloudPlatform/khi/pkg/source/gcp/api/accesstoken"
"github.com/GoogleCloudPlatform/khi/pkg/source/gcp/api/quotaproject"
"github.com/GoogleCloudPlatform/khi/pkg/source/oss"
"cloud.google.com/go/profiler"
)
func displayStartMessage(host string, port int) {
var (
bold = "\033[1m"
green = "\033[32m"
cyan = "\033[36m"
reset = "\033[0m"
)
if parameters.Debug.NoColor != nil && *parameters.Debug.NoColor {
bold = ""
green = ""
cyan = ""
reset = ""
}
hostInHintText := host
if host == "0.0.0.0" || host == "127.0.0.1" {
hostInHintText = "localhost"
}
fmt.Printf(`%[1]s%[2]s%[3]s Starting KHI server with listening %[4]s:%[5]d%[1]s`, reset, bold, green, host, port)
if hostInHintText == "localhost" {
fmt.Printf(`
%[4]s%[2]sFor Cloud Shell users:
Click this address >> %[3]shttp://%[5]s:%[6]d%[1]s%[2]s%[4]s << Click this address
%[1]s%[4]s(For users of the other environments: Access %[3]shttp://%[5]s:%[6]d%[1]s%[4]s with your browser. Consider SSH port-forwarding when you run KHI over SSH.)
%[1]s`, reset, bold, green, cyan, hostInHintText, port)
}
}
var taskSetRegistrer []inspection.PrepareInspectionServerFunc = make([]inspection.PrepareInspectionServerFunc, 0)
func init() {
parameters.AddStore(parameters.Help)
parameters.AddStore(parameters.Common)
parameters.AddStore(parameters.Server)
parameters.AddStore(parameters.Job)
parameters.AddStore(parameters.Auth)
parameters.AddStore(parameters.Debug)
taskSetRegistrer = append(taskSetRegistrer, inspection_common.PrepareInspectionServer)
taskSetRegistrer = append(taskSetRegistrer, gcp.PrepareInspectionServer)
taskSetRegistrer = append(taskSetRegistrer, oss.Prepare)
taskSetRegistrer = append(taskSetRegistrer, common.Register)
}
func handleTerminateSignal(exitCh chan<- int) {
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGTERM, syscall.SIGINT)
s := <-sig
lifecycle.Default.NotifyTerminate(s)
exitCh <- 128 + int(s.(syscall.Signal))
}
func main() {
// main() shouldn't have the other line other than this line, for os.Exit(N) to prevent calling `defer errorreport.CheckAndReportPanic`
os.Exit(run())
}
func run() int {
defer errorreport.CheckAndReportPanic()
logger.InitGlobalKHILogger()
err := parameters.Parse()
if err != nil {
slog.Error(err.Error())
return 1
}
if *parameters.Debug.Verbose {
flag.DumpAll(context.Background())
}
if *parameters.Debug.Profiler {
cfg := profiler.Config{
Service: *parameters.Debug.ProfilerService,
ProjectID: *parameters.Debug.ProfilerProject,
MutexProfiling: true,
}
if err := profiler.Start(cfg); err != nil {
slog.Error(fmt.Sprintf("Failed to start profiler\n%s", err.Error()))
}
slog.Info("Cloud Profiler is enabled")
}
lifecycle.Default.NotifyInit()
slog.Info("Initializing Kubernetes History Inspector...")
k8s.GenerateDefaultMergeConfig()
if *parameters.Auth.QuotaProjectID != "" {
api.DefaultGCPClientFactory.RegisterHeaderProvider(quotaproject.NewHeaderProvider(*parameters.Auth.QuotaProjectID))
}
inspectionServer, err := inspection.NewServer()
if err != nil {
slog.Error(fmt.Sprintf("Failed to construct the inspection server due to unexpected error\n%v", err))
}
if !*parameters.Server.ViewerMode {
for i, taskSetRegistrer := range taskSetRegistrer {
err = taskSetRegistrer(inspectionServer)
if err != nil {
slog.Error(fmt.Sprintf("Failed to call initialize calls for taskSetRegistrer(#%d)\n%v", i, err))
}
}
}
// Channel to receive exit codes from concurrent goroutines
exitCh := make(chan int, 1)
// Start signal handler
go handleTerminateSignal(exitCh)
if !*parameters.Job.JobMode {
slog.Info("Starting Kubernetes History Inspector server...")
uploadFileStoreFolder := "/tmp"
if parameters.Common.UploadFileStoreFolder != nil {
uploadFileStoreFolder = *parameters.Common.UploadFileStoreFolder
}
upload.DefaultUploadFileStore = upload.NewUploadFileStore(upload.NewLocalUploadFileStoreProvider(uploadFileStoreFolder))
config := server.ServerConfig{
ViewerMode: *parameters.Server.ViewerMode,
StaticFolderPath: *parameters.Server.FrontendAssetFolder,
ResourceMonitor: &server.ResourceMonitorImpl{},
ServerBasePath: *parameters.Server.BasePath,
UploadFileStore: upload.DefaultUploadFileStore,
}
engine := server.CreateKHIServer(inspectionServer, &config)
if parameters.Auth.OAuthEnabled() {
err := accesstoken.DefaultOAuthTokenResolver.SetServer(engine)
if err != nil {
slog.Error("failed to register the web server to OAuth Token resolver")
return 1
}
}
go func() {
err = engine.Run(fmt.Sprintf("%s:%d", *parameters.Server.Host, *parameters.Server.Port))
if err != nil {
slog.Error(fmt.Sprintf("Failed to start server\n%s", err.Error()))
exitCh <- 1
} else {
slog.Error("Hitting the unreachable code. Server terminated unexpectedly")
exitCh <- 1
}
}()
displayStartMessage(*parameters.Server.Host, *parameters.Server.Port)
} else {
slog.Info("Starting Kubernetes History Inspector as job mode...")
go func() {
queryParametersInJson := *parameters.Job.InspectionValues
var values map[string]any
err := json.Unmarshal([]byte(queryParametersInJson), &values)
if err != nil {
slog.Error(fmt.Sprintf("Failed to parse an inspection value %s\n%s", queryParametersInJson, err.Error()))
exitCh <- 1
return
}
inspectionID, err := inspectionServer.CreateInspection(*parameters.Job.InspectionType)
if err != nil {
slog.Error(fmt.Sprintf("Failed to create an inspection with type %s\n%s", *parameters.Job.InspectionType, err.Error()))
exitCh <- 1
return
}
features := strings.Split(*parameters.Job.InspectionFeatures, ",")
t := inspectionServer.GetInspection(inspectionID)
// When the features env has `ALL`, it enables every features being available
if len(features) == 1 && strings.ToUpper(features[0]) == "ALL" {
availableFeatures, err := t.FeatureList()
if err != nil {
slog.Error(fmt.Sprintf("Failed to obtain current feature list\n%s", err.Error()))
exitCh <- 1
return
}
allFeatures := []string{}
for _, af := range availableFeatures {
allFeatures = append(allFeatures, af.Id)
}
features = allFeatures
}
err = t.SetFeatureList(features)
if err != nil {
slog.Error(fmt.Sprintf("Failed to set features %v\n%s", features, err.Error()))
exitCh <- 1
return
}
err = t.Run(context.Background(), &inspection_task.InspectionRequest{
Values: values,
})
if err != nil {
slog.Error(fmt.Sprintf("Failed to run inspection task \n%s", err.Error()))
exitCh <- 1
return
}
<-t.Wait()
result, err := t.Result()
if err != nil {
slog.Error(fmt.Sprintf("Failed to get inspection result \n%s", err.Error()))
exitCh <- 1
return
}
reader, err := result.ResultStore.GetReader()
if err != nil {
slog.Error(fmt.Sprintf("Failed to get inspection result reader \n%s", err.Error()))
exitCh <- 1
return
}
defer reader.Close()
file, err := os.OpenFile(*parameters.Job.ExportDestination, os.O_WRONLY|os.O_CREATE, 0644)
if err != nil {
slog.Error(fmt.Sprintf("Failed to open the destination file \n%s", err.Error()))
exitCh <- 1
return
}
_, err = io.Copy(file, reader)
if err != nil {
slog.Error(fmt.Sprintf("Failed to flush to the destination file \n%s", err.Error()))
exitCh <- 1
return
}
}()
}
// Wait for exit code from any source
code := <-exitCh
return code
}