ingestor/runner/shutdown/shutdown.go (61 lines of code) (raw):
package runner
import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/types"
"net/http"
"os"
"time"
"github.com/Azure/adx-mon/ingestor"
"github.com/Azure/adx-mon/pkg/logger"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)
const (
	// SHUTDOWN_REQUESTED is the pod annotation key whose presence tells Run
	// to shut the service down.
	SHUTDOWN_REQUESTED = "shutdown-requested"

	// SHUTDOWN_COMPLETED is the pod annotation key Run writes once the
	// shutdown has finished; its presence makes later runs a no-op.
	SHUTDOWN_COMPLETED = "shutdown-completed"

	// namespace is the Kubernetes namespace in which the pod is looked up
	// and patched.
	namespace = "adx-mon"

	// shutdownTimeout bounds how long the service shutdown may take.
	shutdownTimeout = time.Minute * 5
)
// ShutDownRunner is a runner that closes the HTTP server and shuts down the
// ingestor service when the pod it runs in carries the shutdown-requested
// annotation, then records completion as an annotation on that pod.
type ShutDownRunner struct {
// k8sClient is used to fetch the pod and to patch its annotations.
k8sClient kubernetes.Interface
// httpServer is closed by Run before the service is shut down.
httpServer *http.Server
// service is the ingestor service that Run shuts down.
service ingestor.Interface
}
// NewShutDownRunner builds a ShutDownRunner from the Kubernetes client, the
// HTTP server and the ingestor service it will operate on. The arguments are
// stored as given; nothing is validated or started here.
func NewShutDownRunner(cli kubernetes.Interface, http *http.Server, svc ingestor.Interface) *ShutDownRunner {
	runner := new(ShutDownRunner)
	runner.k8sClient = cli
	runner.httpServer = http
	runner.service = svc
	return runner
}
// Run performs one shutdown check for the pod this process is running in.
//
// The pod is fetched from the adx-mon namespace using the HOSTNAME environment
// variable as its name. Run then proceeds as follows:
//   - if the pod already has the shutdown-completed annotation, nothing is done;
//   - if the pod has no shutdown-requested annotation, nothing is done;
//   - otherwise the HTTP server is closed, the service is shut down (bounded by
//     shutdownTimeout), and the pod is patched with a shutdown-completed
//     annotation whose value is the current time in RFC 3339 format.
//
// It returns nil when there was nothing to do or the shutdown finished and was
// recorded. Any failure is returned with its cause wrapped (%w), so callers
// can inspect it with errors.Is / errors.As.
func (r *ShutDownRunner) Run(ctx context.Context) error {
	// Get the ingestor pod in which this runner is running.
	pod, err := r.k8sClient.CoreV1().Pods(namespace).Get(ctx, os.Getenv("HOSTNAME"), metav1.GetOptions{})
	if err != nil {
		return fmt.Errorf("failed to get pod annotations: %w", err)
	}

	// A previous run already finished the shutdown; do not repeat it.
	if _, done := pod.Annotations[SHUTDOWN_COMPLETED]; done {
		logger.Infof("Shutdown already completed on the pod, skipping shutting down")
		return nil
	}

	// Nothing to do until a shutdown has been requested via annotation.
	if _, requested := pod.Annotations[SHUTDOWN_REQUESTED]; !requested {
		return nil
	}

	logger.Infof("Shutting down the service")

	// Close the HTTP server before shutting the service down.
	if err := r.httpServer.Close(); err != nil {
		return fmt.Errorf("failed to close http server: %w", err)
	}

	// Bound the service shutdown so a stuck shutdown cannot block forever.
	timeoutCtx, cancel := context.WithTimeout(ctx, shutdownTimeout)
	defer cancel()
	if err := r.service.Shutdown(timeoutCtx); err != nil {
		return fmt.Errorf("failed to shutdown the service: %w", err)
	}
	logger.Infof("Service shutdown completed")

	// Record completion on the pod so later runs become a no-op.
	patch := []byte(fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, SHUTDOWN_COMPLETED, time.Now().Format(time.RFC3339)))
	if _, err := r.k8sClient.CoreV1().Pods(namespace).Patch(ctx, pod.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{}); err != nil {
		return fmt.Errorf("failed to patch shutdown-completed annotation: %w", err)
	}
	return nil
}
// Name returns the fixed identifier of this runner, "shutdown".
func (r *ShutDownRunner) Name() string {
	const runnerName = "shutdown"
	return runnerName
}