Stable-Diffusion-UI-Agones/agones-sidecar/main.go (462 lines of code) (raw):
// Copyright 2020 Google LLC All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package main is a very simple server with UDP (default), TCP, or both
package main
import (
"bufio"
"context"
"encoding/json"
"flag"
"fmt"
"log"
"net"
"net/http"
"os"
"strconv"
"strings"
"sync"
"time"
coresdk "agones.dev/agones/pkg/sdk"
"agones.dev/agones/pkg/util/signals"
sdk "agones.dev/agones/sdks/go"
)
// main starts a UDP or TCP server
func main() {
sigCtx, _ := signals.NewSigKillContext()
port := flag.String("port", "7654", "The port to listen to traffic on")
passthrough := flag.Bool("passthrough", false, "Get listening port from the SDK, rather than use the 'port' value")
readyOnStart := flag.Bool("ready", true, "Mark this GameServer as Ready on startup")
shutdownDelayMin := flag.Int("automaticShutdownDelayMin", 0, "[Deprecated] If greater than zero, automatically shut down the server this many minutes after the server becomes allocated (please use automaticShutdownDelaySec instead)")
shutdownDelaySec := flag.Int("automaticShutdownDelaySec", 0, "If greater than zero, automatically shut down the server this many seconds after the server becomes allocated (cannot be used if automaticShutdownDelayMin is set)")
readyDelaySec := flag.Int("readyDelaySec", 0, "If greater than zero, wait this many seconds each time before marking the game server as ready")
readyIterations := flag.Int("readyIterations", 0, "If greater than zero, return to a ready state this number of times before shutting down")
udp := flag.Bool("udp", true, "Server will listen on UDP")
tcp := flag.Bool("tcp", false, "Server will listen on TCP")
flag.Parse()
if ep := os.Getenv("PORT"); ep != "" {
port = &ep
}
if epass := os.Getenv("PASSTHROUGH"); epass != "" {
p := strings.ToUpper(epass) == "TRUE"
passthrough = &p
}
if eready := os.Getenv("READY"); eready != "" {
r := strings.ToUpper(eready) == "TRUE"
readyOnStart = &r
}
if eudp := os.Getenv("UDP"); eudp != "" {
u := strings.ToUpper(eudp) == "TRUE"
udp = &u
}
if etcp := os.Getenv("TCP"); etcp != "" {
t := strings.ToUpper(etcp) == "TRUE"
tcp = &t
}
// Check for incompatible flags.
if *shutdownDelayMin > 0 && *shutdownDelaySec > 0 {
log.Fatalf("Cannot set both --automaticShutdownDelayMin and --automaticShutdownDelaySec")
}
if *readyIterations > 0 && *shutdownDelayMin <= 0 && *shutdownDelaySec <= 0 {
log.Fatalf("Must set a shutdown delay if using ready iterations")
}
log.Print("Creating SDK instance")
s, err := sdk.NewSDK()
if err != nil {
log.Fatalf("Could not connect to sdk: %v", err)
}
log.Print("Starting Health Ping")
ctx, cancel := context.WithCancel(context.Background())
go doHealth(s, ctx)
if *passthrough {
var gs *coresdk.GameServer
gs, err = s.GameServer()
if err != nil {
log.Fatalf("Could not get gameserver port details: %s", err)
}
p := strconv.FormatInt(int64(gs.Status.Ports[0].Port), 10)
port = &p
}
if *tcp {
go tcpListener(port, s, cancel)
}
if *udp {
go udpListener(port, s, cancel)
}
if *shutdownDelaySec > 0 {
shutdownAfterNAllocations(s, *readyIterations, *shutdownDelaySec)
} else if *shutdownDelayMin > 0 {
shutdownAfterNAllocations(s, *readyIterations, *shutdownDelayMin*60)
}
if *readyOnStart {
if *readyDelaySec > 0 {
log.Printf("Waiting %d seconds before moving to ready", *readyDelaySec)
time.Sleep(time.Duration(*readyDelaySec) * time.Second)
}
for {
r, err := http.Get("http://127.0.0.1:7860")
if err != nil {
log.Print("SD-WebUI not ready yet")
time.Sleep(2 * time.Second)
continue
}
if r.StatusCode != 200 {
log.Print("SD-WebUI not ready yet")
time.Sleep(2 * time.Second)
continue
}
break
}
log.Print("Marking this server as ready")
ready(s)
}
<-sigCtx.Done()
os.Exit(0)
}
// shutdownAfterNAllocations creates a callback to automatically shut down
// the server a specified number of seconds after the server becomes
// allocated the Nth time.
//
// The algorithm is:
//
// 1. Move the game server back to ready N times after it is allocated
// 2. Shutdown the game server after the Nth time is becomes allocated
//
// This follows the integration pattern documented on the website at
// https://agones.dev/site/docs/integration-patterns/reusing-gameservers/
func shutdownAfterNAllocations(s *sdk.SDK, readyIterations, shutdownDelaySec int) {
gs, err := s.GameServer()
if err != nil {
log.Fatalf("Could not get game server: %v", err)
}
log.Printf("Initial game Server state = %s", gs.Status.State)
m := sync.Mutex{} // protects the following two variables
lastAllocated := gs.ObjectMeta.Annotations["agones.dev/last-allocated"]
remainingIterations := readyIterations
if err := s.WatchGameServer(func(gs *coresdk.GameServer) {
m.Lock()
defer m.Unlock()
la := gs.ObjectMeta.Annotations["agones.dev/last-allocated"]
log.Printf("Watch Game Server callback fired. State = %s, Last Allocated = %q", gs.Status.State, la)
if lastAllocated != la {
log.Println("Game Server Allocated")
lastAllocated = la
remainingIterations--
// Run asynchronously
go func(iterations int) {
time.Sleep(time.Duration(shutdownDelaySec) * time.Second)
if iterations > 0 {
log.Println("Moving Game Server back to Ready")
readyErr := s.Ready()
if readyErr != nil {
log.Fatalf("Could not set game server to ready: %v", readyErr)
}
log.Println("Game Server is Ready")
return
}
log.Println("Moving Game Server to Shutdown")
if shutdownErr := s.Shutdown(); shutdownErr != nil {
log.Fatalf("Could not shutdown game server: %v", shutdownErr)
}
// The process will exit when Agones removes the pod and the
// container receives the SIGTERM signal
return
}(remainingIterations)
}
}); err != nil {
log.Fatalf("Could not watch Game Server events, %v", err)
}
}
func handleResponse(txt string, s *sdk.SDK, cancel context.CancelFunc) (response string, addACK bool, responseError error) {
parts := strings.Split(strings.TrimSpace(txt), " ")
response = txt
addACK = true
responseError = nil
switch parts[0] {
// shuts down the gameserver
case "EXIT":
// handle elsewhere, as we respond before exiting
return
// turns off the health pings
case "UNHEALTHY":
cancel()
case "GAMESERVER":
response = gameServerName(s)
addACK = false
case "READY":
ready(s)
case "ALLOCATE":
allocate(s)
case "RESERVE":
if len(parts) != 2 {
response = "Invalid RESERVE, should have 1 argument"
responseError = fmt.Errorf("Invalid RESERVE, should have 1 argument")
}
if dur, err := time.ParseDuration(parts[1]); err != nil {
response = fmt.Sprintf("%s\n", err)
responseError = err
} else {
reserve(s, dur)
}
case "WATCH":
watchGameServerEvents(s)
case "LABEL":
switch len(parts) {
case 1:
// legacy format
setLabel(s, "timestamp", strconv.FormatInt(time.Now().Unix(), 10))
case 3:
setLabel(s, parts[1], parts[2])
default:
response = "Invalid LABEL command, must use zero or 2 arguments"
responseError = fmt.Errorf("Invalid LABEL command, must use zero or 2 arguments")
}
case "CRASH":
log.Print("Crashing.")
os.Exit(1)
return "", false, nil
case "ANNOTATION":
switch len(parts) {
case 1:
// legacy format
setAnnotation(s, "timestamp", time.Now().UTC().String())
case 3:
setAnnotation(s, parts[1], parts[2])
default:
response = "Invalid ANNOTATION command, must use zero or 2 arguments"
responseError = fmt.Errorf("Invalid ANNOTATION command, must use zero or 2 arguments")
}
case "PLAYER_CAPACITY":
switch len(parts) {
case 1:
response = getPlayerCapacity(s)
addACK = false
case 2:
if cap, err := strconv.Atoi(parts[1]); err != nil {
response = fmt.Sprintf("%s", err)
responseError = err
} else {
setPlayerCapacity(s, int64(cap))
}
default:
response = "Invalid PLAYER_CAPACITY, should have 0 or 1 arguments"
responseError = fmt.Errorf("Invalid PLAYER_CAPACITY, should have 0 or 1 arguments")
}
case "PLAYER_CONNECT":
if len(parts) < 2 {
response = "Invalid PLAYER_CONNECT, should have 1 arguments"
responseError = fmt.Errorf("Invalid PLAYER_CONNECT, should have 1 arguments")
return
}
playerConnect(s, parts[1])
case "PLAYER_DISCONNECT":
if len(parts) < 2 {
response = "Invalid PLAYER_DISCONNECT, should have 1 arguments"
responseError = fmt.Errorf("Invalid PLAYER_DISCONNECT, should have 1 arguments")
return
}
playerDisconnect(s, parts[1])
case "PLAYER_CONNECTED":
if len(parts) < 2 {
response = "Invalid PLAYER_CONNECTED, should have 1 arguments"
responseError = fmt.Errorf("Invalid PLAYER_CONNECTED, should have 1 arguments")
return
}
response = playerIsConnected(s, parts[1])
addACK = false
case "GET_PLAYERS":
response = getConnectedPlayers(s)
addACK = false
case "PLAYER_COUNT":
response = getPlayerCount(s)
addACK = false
}
return
}
func udpListener(port *string, s *sdk.SDK, cancel context.CancelFunc) {
log.Printf("Starting UDP server, listening on port %s", *port)
conn, err := net.ListenPacket("udp", ":"+*port)
if err != nil {
log.Fatalf("Could not start UDP server: %v", err)
}
defer conn.Close() // nolint: errcheck
udpReadWriteLoop(conn, cancel, s)
}
func udpReadWriteLoop(conn net.PacketConn, cancel context.CancelFunc, s *sdk.SDK) {
b := make([]byte, 1024)
for {
sender, txt := readPacket(conn, b)
log.Printf("Received UDP: %v", txt)
response, addACK, err := handleResponse(txt, s, cancel)
if err != nil {
response = "ERROR: " + response + "\n"
} else if addACK {
response = "ACK: " + response + "\n"
}
udpRespond(conn, sender, response)
if txt == "EXIT" {
exit(s)
}
}
}
// respond responds to a given sender.
func udpRespond(conn net.PacketConn, sender net.Addr, txt string) {
if _, err := conn.WriteTo([]byte(txt), sender); err != nil {
log.Fatalf("Could not write to udp stream: %v", err)
}
}
func tcpListener(port *string, s *sdk.SDK, cancel context.CancelFunc) {
log.Printf("Starting TCP server, listening on port %s", *port)
ln, err := net.Listen("tcp", ":"+*port)
if err != nil {
log.Fatalf("Could not start TCP server: %v", err)
}
defer ln.Close() // nolint: errcheck
for {
conn, err := ln.Accept()
if err != nil {
log.Printf("Unable to accept incoming TCP connection: %v", err)
}
go tcpHandleConnection(conn, s, cancel)
}
}
// handleConnection services a single tcp connection to the server
func tcpHandleConnection(conn net.Conn, s *sdk.SDK, cancel context.CancelFunc) {
log.Printf("TCP Client %s connected", conn.RemoteAddr().String())
scanner := bufio.NewScanner(conn)
for scanner.Scan() {
tcpHandleCommand(conn, scanner.Text(), s, cancel)
}
log.Printf("TCP Client %s disconnected", conn.RemoteAddr().String())
}
func tcpHandleCommand(conn net.Conn, txt string, s *sdk.SDK, cancel context.CancelFunc) {
log.Printf("TCP txt: %v", txt)
response, addACK, err := handleResponse(txt, s, cancel)
if err != nil {
response = "ERROR: " + response + "\n"
} else if addACK {
response = "ACK TCP: " + response + "\n"
}
tcpRespond(conn, response)
if response == "EXIT" {
exit(s)
}
}
// respond responds to a given sender.
func tcpRespond(conn net.Conn, txt string) {
log.Printf("Responding to TCP with %q", txt)
if _, err := conn.Write([]byte(txt + "\n")); err != nil {
log.Fatalf("Could not write to TCP stream: %v", err)
}
}
// ready attempts to mark this gameserver as ready
func ready(s *sdk.SDK) {
err := s.Ready()
if err != nil {
log.Fatalf("Could not send ready message")
}
}
// allocate attempts to allocate this gameserver
func allocate(s *sdk.SDK) {
err := s.Allocate()
if err != nil {
log.Fatalf("could not allocate gameserver: %v", err)
}
}
// reserve for 10 seconds
func reserve(s *sdk.SDK, duration time.Duration) {
if err := s.Reserve(duration); err != nil {
log.Fatalf("could not reserve gameserver: %v", err)
}
}
// readPacket reads a string from the connection
func readPacket(conn net.PacketConn, b []byte) (net.Addr, string) {
n, sender, err := conn.ReadFrom(b)
if err != nil {
log.Fatalf("Could not read from udp stream: %v", err)
}
txt := strings.TrimSpace(string(b[:n]))
log.Printf("Received packet from %v: %v", sender.String(), txt)
return sender, txt
}
// exit shutdowns the server
func exit(s *sdk.SDK) {
log.Printf("Received EXIT command. Exiting.")
// This tells Agones to shutdown this Game Server
shutdownErr := s.Shutdown()
if shutdownErr != nil {
log.Printf("Could not shutdown")
}
// The process will exit when Agones removes the pod and the
// container receives the SIGTERM signal
}
// gameServerName returns the GameServer name
func gameServerName(s *sdk.SDK) string {
var gs *coresdk.GameServer
gs, err := s.GameServer()
if err != nil {
log.Fatalf("Could not retrieve GameServer: %v", err)
}
var j []byte
j, err = json.Marshal(gs)
if err != nil {
log.Fatalf("error mashalling GameServer to JSON: %v", err)
}
log.Printf("GameServer: %s \n", string(j))
return "NAME: " + gs.ObjectMeta.Name + "\n"
}
// watchGameServerEvents creates a callback to log when
// gameserver events occur
func watchGameServerEvents(s *sdk.SDK) {
err := s.WatchGameServer(func(gs *coresdk.GameServer) {
j, err := json.Marshal(gs)
if err != nil {
log.Fatalf("error mashalling GameServer to JSON: %v", err)
}
log.Printf("GameServer Event: %s \n", string(j))
})
if err != nil {
log.Fatalf("Could not watch Game Server events, %v", err)
}
}
// setAnnotation sets a given annotation
func setAnnotation(s *sdk.SDK, key, value string) {
log.Printf("Setting annotation %v=%v", key, value)
err := s.SetAnnotation(key, value)
if err != nil {
log.Fatalf("could not set annotation: %v", err)
}
}
// setLabel sets a given label
func setLabel(s *sdk.SDK, key, value string) {
log.Printf("Setting label %v=%v", key, value)
// label values can only be alpha, - and .
err := s.SetLabel(key, value)
if err != nil {
log.Fatalf("could not set label: %v", err)
}
}
// setPlayerCapacity sets the player capacity to the given value
func setPlayerCapacity(s *sdk.SDK, capacity int64) {
log.Printf("Setting Player Capacity to %d", capacity)
if err := s.Alpha().SetPlayerCapacity(capacity); err != nil {
log.Fatalf("could not set capacity: %v", err)
}
}
// getPlayerCapacity returns the current player capacity as a string
func getPlayerCapacity(s *sdk.SDK) string {
log.Print("Getting Player Capacity")
capacity, err := s.Alpha().GetPlayerCapacity()
if err != nil {
log.Fatalf("could not get capacity: %v", err)
}
return strconv.FormatInt(capacity, 10) + "\n"
}
// playerConnect connects a given player
func playerConnect(s *sdk.SDK, id string) {
log.Printf("Connecting Player: %s", id)
if _, err := s.Alpha().PlayerConnect(id); err != nil {
log.Fatalf("could not connect player: %v", err)
}
}
// playerDisconnect disconnects a given player
func playerDisconnect(s *sdk.SDK, id string) {
log.Printf("Disconnecting Player: %s", id)
if _, err := s.Alpha().PlayerDisconnect(id); err != nil {
log.Fatalf("could not disconnect player: %v", err)
}
}
// playerIsConnected returns a bool as a string if a player is connected
func playerIsConnected(s *sdk.SDK, id string) string {
log.Printf("Checking if player %s is connected", id)
connected, err := s.Alpha().IsPlayerConnected(id)
if err != nil {
log.Fatalf("could not retrieve if player is connected: %v", err)
}
return strconv.FormatBool(connected) + "\n"
}
// getConnectedPlayers returns a comma delimeted list of connected players
func getConnectedPlayers(s *sdk.SDK) string {
log.Print("Retrieving connected player list")
list, err := s.Alpha().GetConnectedPlayers()
if err != nil {
log.Fatalf("could not retrieve connected players: %s", err)
}
return strings.Join(list, ",") + "\n"
}
// getPlayerCount returns the count of connected players as a string
func getPlayerCount(s *sdk.SDK) string {
log.Print("Retrieving connected player count")
count, err := s.Alpha().GetPlayerCount()
if err != nil {
log.Fatalf("could not retrieve player count: %s", err)
}
return strconv.FormatInt(count, 10) + "\n"
}
// doHealth sends the regular Health Pings
func doHealth(sdk *sdk.SDK, ctx context.Context) {
tick := time.Tick(2 * time.Second)
for {
log.Printf("Health Ping")
err := sdk.Health()
if err != nil {
log.Fatalf("Could not send health ping, %v", err)
}
select {
case <-ctx.Done():
log.Print("Stopped health pings")
return
case <-tick:
}
}
}