google_guest_agent/wsfc.go (197 lines of code) (raw):

// Copyright 2017 Google LLC // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // https://www.apache.org/licenses/LICENSE-2.0 // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package main import ( "context" "net" "strings" "sync" "time" "github.com/GoogleCloudPlatform/guest-agent/google_guest_agent/cfg" "github.com/GoogleCloudPlatform/guest-logging-go/logger" ) const wsfcDefaultAgentPort = "59998" type agentState int // Enum for agentState const ( running agentState = iota stopped ) var ( once sync.Once agentInstance *wsfcAgent ) type wsfcManager struct { agentNewState agentState agentNewPort string agent healthAgent } // Create new wsfcManager based on metadata agent request state will be set to // running if one of the following is true: // - EnableWSFC is set // - WSFCAddresses is set (As an advanced setting, it will always override EnableWSFC flag) func newWsfcManager() *wsfcManager { newState := stopped config := cfg.Get() if func() bool { if config.WSFC != nil && config.WSFC.Enable && config.WSFC.Addresses != "" { return config.WSFC.Enable } if newMetadata.Instance.Attributes.EnableWSFC != nil { return *newMetadata.Instance.Attributes.EnableWSFC } if newMetadata.Instance.Attributes.WSFCAddresses != "" { return true } if newMetadata.Project.Attributes.EnableWSFC != nil { return *newMetadata.Project.Attributes.EnableWSFC } if newMetadata.Project.Attributes.WSFCAddresses != "" { return true } return false }() { newState = running } newPort := wsfcDefaultAgentPort if config.WSFC != nil && config.WSFC.Port != "" { newPort = config.WSFC.Port } else if newMetadata.Instance.Attributes.WSFCAgentPort != "" { newPort = newMetadata.Instance.Attributes.WSFCAgentPort } else if newMetadata.Project.Attributes.WSFCAgentPort != "" { newPort = newMetadata.Instance.Attributes.WSFCAgentPort } return &wsfcManager{agentNewState: newState, agentNewPort: newPort, agent: getWsfcAgentInstance()} } // Implement manager.diff() func (m *wsfcManager) Diff(ctx context.Context) (bool, error) { return m.agentNewState != m.agent.getState() || m.agentNewPort != m.agent.getPort(), nil } // Implement manager.disabled(). // wsfc manager is always enabled. The manager is just a broker which manages the state of wsfcAgent. User // can disable the wsfc feature by setting the metadata. If the manager is disabled, the agent will stop. func (m *wsfcManager) Disabled(ctx context.Context) (bool, error) { return false, nil } func (m *wsfcManager) Timeout(ctx context.Context) (bool, error) { return false, nil } // Diff will always be called before set. So in set, only two cases are possible: // - state changed: start or stop the wsfc agent accordingly // - port changed: restart the agent if it is running func (m *wsfcManager) Set(ctx context.Context) error { m.agent.setPort(m.agentNewPort) // if state changes if m.agentNewState != m.agent.getState() { if m.agentNewState == running { return m.agent.run() } return m.agent.stop() } // If port changed if m.agent.getState() == running { if err := m.agent.stop(); err != nil { return err } return m.agent.run() } return nil } // interface for agent answering health check ping type healthAgent interface { getState() agentState getPort() string setPort(string) run() error stop() error } // Windows failover cluster agent, implements healthAgent interface type wsfcAgent struct { port string waitGroup *sync.WaitGroup listener *net.TCPListener } // Start agent and taking tcp request func (a *wsfcAgent) run() error { if a.getState() == running { logger.Infof("wsfc agent is already running") return nil } logger.Infof("Starting wsfc agent...") listenerAddr, err := net.ResolveTCPAddr("tcp", ":"+a.port) if err != nil { return err } listener, err := net.ListenTCP("tcp", listenerAddr) if err != nil { return err } // goroutine for handling request go func() { for { conn, err := listener.Accept() if err != nil { // if err is not due to listener closed, return if opErr, ok := err.(*net.OpError); ok && strings.Contains(opErr.Error(), "closed") { logger.Infof("wsfc agent - tcp listener closed.") return } logger.Errorf("wsfc agent - error on accepting request: %s", err) continue } a.waitGroup.Add(1) go a.handleHealthCheckRequest(conn) } }() logger.Infof("wsfc agent started. Listening on port: %s", a.port) a.listener = listener return nil } // Handle health check request. // The request payload is WSFC ip address. // Sendback 1 if ipaddress is found locally and 0 otherwise. func (a *wsfcAgent) handleHealthCheckRequest(conn net.Conn) { defer closer(conn) defer a.waitGroup.Done() conn.SetDeadline(time.Now().Add(time.Second)) buf := make([]byte, 1024) // Read the incoming connection into the buffer. reqLen, err := conn.Read(buf) if err != nil { logger.Errorf("wsfc - error on processing tcp request for network heartbeat health check: %s", err) return } wsfcIP := strings.TrimSpace(string(buf[:reqLen])) reply, err := checkIPExist(wsfcIP) if err != nil { logger.Errorf("wsfc - error on checking local ip: %s", err) } conn.Write([]byte(reply)) } // Stop agent. Will wait for all existing request to be completed. func (a *wsfcAgent) stop() error { if a.getState() == stopped { logger.Infof("wsfc agent already stopped.") return nil } logger.Infof("Stopping wsfc agent...") // close listener first to avoid taking additional request err := a.listener.Close() // wait for exiting request to finish a.waitGroup.Wait() a.listener = nil logger.Infof("wsfc agent stopped.") return err } // Get the current state of the agent. If there is a valid listener, // return state running and if listener is nil, return stopped func (a *wsfcAgent) getState() agentState { if a.listener != nil { return running } return stopped } func (a *wsfcAgent) getPort() string { return a.port } func (a *wsfcAgent) setPort(newPort string) { if newPort != a.port { logger.Infof("update wsfc agent from port %v to %v", a.port, newPort) a.port = newPort } } // Create wsfc agent only once func getWsfcAgentInstance() *wsfcAgent { once.Do(func() { agentInstance = &wsfcAgent{ port: wsfcDefaultAgentPort, waitGroup: &sync.WaitGroup{}, listener: nil, } }) return agentInstance } // help func to check whether the ip exists on local host. func checkIPExist(ip string) (string, error) { addrs, err := net.InterfaceAddrs() if err != nil { return "0", err } for _, address := range addrs { if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() { ipString := ipnet.IP.To4().String() if ip == ipString { return "1", nil } } } return "0", nil }