cmd/ndb-pod-initializer/main.go (189 lines of code) (raw):

// Copyright (c) 2022, Oracle and/or its affiliates. // // Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ package main import ( "context" "errors" "fmt" "log" "net" "os" "strconv" "strings" "time" "github.com/mysql/ndb-operator/config/debug" "github.com/mysql/ndb-operator/pkg/constants" "github.com/mysql/ndb-operator/pkg/mgmapi" "github.com/mysql/ndb-operator/pkg/mysqlclient" "github.com/mysql/ndb-operator/pkg/resources/statefulset" ) // helper function to handle fatal errors func failOnError(err error, errMsgFmt string, errMsgArgs ...interface{}) { if err != nil { log.Fatalf(errMsgFmt, errMsgArgs...) } } func isAllowedDNSError(dnsError *net.DNSError) bool { if dnsError.IsNotFound || dnsError.IsTemporary { return true } allowedDNSErrors := []string{ "server misbehaving", } for _, allowedErr := range allowedDNSErrors { if strings.Contains(dnsError.Error(), allowedErr) { return true } } return false } // isDnsUpdated checks if the DNS can resolve the // current pod hostname to the right IP address. func isDnsUpdated(ctx context.Context, hostname, expectedIP string) bool { resolvedIPs, err := net.DefaultResolver.LookupIP(ctx, "ip4", hostname) if err != nil { var dnsError *net.DNSError if errors.As(err, &dnsError) && isAllowedDNSError(dnsError) { // DNS doesn't have the entry yet return false } else { // Unrecognised error failOnError(err, "Failed to resolve hostname %q : %s", hostname, err) } } if len(resolvedIPs) > 1 { // Hostname resolved to more than 1 IP address => some other // pod's hostname is still pointed to this IP address in DNS. return false } if resolvedIPs[0].String() != expectedIP { // Hostname resolved to wrong IP => DNS not updated yet return false } return true } // waitForPodDNSUpdate waits until the DNS can resolve the current hostname without any error. // // Note 1 : This is required as the K8s CoreDNS runs with a default cache time of 30s for // all responses. When the first Management Server comes up, it tries to resolve all the // hostnames mentioned in the config and fails as the pods are not up yet. All these // negative responses end up getting cached in the CoreDNS server and will be invalidated // only when the cache time expires. The issue is explained at // https://github.com/kubernetes/kubernetes/issues/92559. Although a fix that disables the // caching for negative responses is now available at // https://github.com/coredns/coredns/pull/5540, it will only be included in the latest // versions of K8s. // // Note 2 : Most K8s clusters run with multiple CoreDNS replicas. There is a good chance // that an address that resolves at one replica might not resolve at other due to the race // caused by this cache timeout issue. The DNS queries are usually sent to a CoreDNS // ClusterIP Service that in turn forwards the requests to the CoreDNS pods either in a // round-robin or a random fashion. To make sure that this method doesn't return before // the given hostname can be resolved correctly by all the CoreDNS replicas, it waits until // the hostname can be resolved successfully for a continuous period of time. This is a // temporary workaround and the best solution (TODO:) is to implement and use a minimal DNS // Server that can run with the operator and resolve the hostnames of MySQL Cluster nodes // without any delay when requested. The pods can then use it to resolve the other pod // hostnames and the CoreDNS server for everything else. func waitForPodDNSUpdate(ctx context.Context, podHostname string) { podNamespace := os.Getenv("NDB_POD_NAMESPACE") // Use partial fqdn so that the query gets sent to DNS. hostnameToResolve := fmt.Sprintf("%s.%s.%s", podHostname, podHostname[:len(podHostname)-2], podNamespace) podIP := os.Getenv("NDB_POD_IP") // Record the time the last failure occurred lastFailureTime := time.Now() log.Println("Waiting for Pod's hostname to be updated in DNS...") for { if isDnsUpdated(ctx, hostnameToResolve, podIP) { // Consider the hostname resolvable only when all the queries sent // in the last 5 seconds has succeeded. if time.Since(lastFailureTime) > 5*time.Second { log.Println("Done") break } } else { lastFailureTime = time.Now() } // DNS not updated yet - sleep and retry time.Sleep(100 * time.Millisecond) } } // getPodMySQLClusterNodeType returns the type of the MySQL Cluster node the pod is running. func getPodMySQLClusterNodeType(podHostname string) constants.NdbNodeType { // Hostname will be of form <ndbcluster name>-<node-type>-<sfset pod ordinal index> tokens := strings.Split(podHostname, "-") return tokens[len(tokens)-2] } // writeNodeIDToFile deduces the nodeId of the current MySQL Cluster // node, writes it to a file and then returns the nodeId. func writeNodeIDToFile(hostname, ndbConnectString string) (nodeId int, nodeIdPool []int, nodeType mgmapi.NodeTypeEnum) { // All nodeIds are sequentially assigned based on the ordinal indices // of the StatefulSet pods. So deduce the nodeId of the first MySQL // Cluster node of same nodeType (i.e) the node with StatefulSet ordinal // index 0, of same type. Then the nodeId of the current node will be // startNodeId + pod ordinal index. var startNodeIdOfSameNodeType int switch getPodMySQLClusterNodeType(hostname) { case constants.NdbNodeTypeMgmd: // Management nodes have nodeId 1 and 2. startNodeIdOfSameNodeType = 1 nodeType = mgmapi.NodeTypeMGM case constants.NdbNodeTypeNdbmtd: // Data nodes' nodeId continue sequentially // after the Management nodes' nodeIds. numOfManagementNode := len(strings.Split(ndbConnectString, ",")) - 1 startNodeIdOfSameNodeType = numOfManagementNode + 1 nodeType = mgmapi.NodeTypeNDB case constants.NdbNodeTypeMySQLD: nodeType = mgmapi.NodeTypeAPI } // Extract pod ordinal index from hostname. // Hostname will be of form <ndbcluster name>-<node-type>-<sfset pod ordinal index> tokens := strings.Split(hostname, "-") podOrdinalIndex, _ := strconv.ParseInt(tokens[len(tokens)-1], 10, 32) // Calculate nodeId var nodeIdText string if nodeType == mgmapi.NodeTypeAPI { // For MySQL Servers, if connection pool is enabled, successive // nodeIds are assigned to a single MySQL Server ndbConnectionPoolSize, _ := strconv.ParseInt(os.Getenv("NDB_CONNECTION_POOL_SIZE"), 10, 32) startNodeId := constants.NdbNodeTypeAPIStartNodeId + int(ndbConnectionPoolSize)*int(podOrdinalIndex) endNodeId := startNodeId + int(ndbConnectionPoolSize) for ; startNodeId < endNodeId; startNodeId++ { nodeIdPool = append(nodeIdPool, startNodeId) nodeIdText += fmt.Sprintf("%d,", startNodeId) } nodeIdText = nodeIdText[:len(nodeIdText)-1] } else { nodeId = startNodeIdOfSameNodeType + int(podOrdinalIndex) nodeIdText = fmt.Sprintf("%d", nodeId) } // Persist the nodeId into a file to be used by other scripts/commands f, err := os.OpenFile(statefulset.NodeIdFilePath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600) failOnError(err, "Failed to create file %q : %s", statefulset.NodeIdFilePath, err) _, err = f.WriteString(nodeIdText) failOnError(err, "Failed to write nodeId to file : %s", err) return nodeId, nodeIdPool, nodeType } // waitForNodeIdAvailability waits until the given nodeId can be // successfully allocated to the MySQL Cluster node that will be run in this pod. func waitForNodeIdAvailability(nodeId int, nodeType mgmapi.NodeTypeEnum, mgmClient mgmapi.MgmClient) { var err error for { if _, err = mgmClient.TryReserveNodeId(nodeId, nodeType); err == nil { // alloc nodeId succeeded => MySQL Cluster is ready to accept this connection break } // TryReserveNodeId failed if debug.Enabled { log.Printf("TryReserveNodeId failed : %s", err.Error()) } // sleep and retry time.Sleep(100 * time.Millisecond) } } func isSystemRestart(mgmClient mgmapi.MgmClient) bool { clusterStatus, err := mgmClient.GetStatus() failOnError(err, "Failed to get status from management server : %s", err) for _, nodeStatus := range clusterStatus { if nodeStatus.IsDataNode() && nodeStatus.IsConnected { // Another datanode is already connected to the system => Not a System Restart return false } } // No datanodes connected yet return true } func waitForDataNodeFailureHandlingToComplete(nodeId int, podHostname string) (success bool) { // Connect to the MySQL Server - use the StatefulSet's 0th pod hostnameTokens := strings.Split(podHostname, "-") hostnameTokens[len(hostnameTokens)-1] = "0" hostnameTokens[len(hostnameTokens)-2] = constants.NdbNodeTypeMySQLD mysqldHost := fmt.Sprintf("%s.%s", strings.Join(hostnameTokens, "-"), strings.Join(hostnameTokens[:len(hostnameTokens)-1], "-")) operatorPassword := os.Getenv("NDB_OPERATOR_PASSWORD") db, err := mysqlclient.Connect(mysqldHost, mysqlclient.DbNdbInfo, operatorPassword) if err != nil { // MySQL Server unavailable return false } // Check if the existing data nodes have completed handling // the previous failure of this data node by querying the // ndbinfo.restart_info table. When all the STARTED data nodes // have completed handling the node failure, the // node_restart_status of the current node will be set to // "Node failure handling completed". var nodeRestartStatus string for { query := fmt.Sprintf("select node_restart_status from restart_info where node_id='%d'", nodeId) err = db.QueryRow(query).Scan(&nodeRestartStatus) if err != nil { // NdbInfo database unavailable log.Printf("Query on ndbinfo.restart_info table failed : %s", err) return false } if nodeRestartStatus == "Node failure handling complete" { // Failure handling completed return true } // Sleep and retry time.Sleep(500 * time.Millisecond) } } func main() { ctx := context.Background() podHostname := os.Getenv("HOSTNAME") // Wait for the hostname to be updated in the DNS waitForPodDNSUpdate(ctx, podHostname) // Persist the nodeId into a file for the other scripts/commands to use connectstring := os.Getenv("NDB_CONNECTSTRING") nodeId, nodeIdPool, nodeType := writeNodeIDToFile(podHostname, connectstring) if nodeType == mgmapi.NodeTypeMGM { log.Println("Pod initializer succeeded.") return } // Connect to the Management Server var err error var mgmClient mgmapi.MgmClient mgmClient, err = mgmapi.NewMgmClient(connectstring) failOnError(err, "Failed to connect to management server : %s", err) defer mgmClient.Disconnect() if nodeType == mgmapi.NodeTypeNDB && !isSystemRestart(mgmClient) { // Wait for the existing data nodes to finish // handling previous data node failure log.Println("Waiting for other data nodes to complete Node failure handling...") if !waitForDataNodeFailureHandlingToComplete(nodeId, podHostname) { // The method failed as either server or the ndbinfo database is not available. // Fallback to waitForNodeIdAvailability logic. This is used only as a fallback // as the nodeId is freed early in the node failure handling process and due // to that waitForNodeIdAvailability is less reliable than checking ndbinfo // database. log.Println("Falling back to checking if nodeId is available..") waitForNodeIdAvailability(nodeId, nodeType, mgmClient) } log.Println("Done") } if nodeType == mgmapi.NodeTypeAPI { // Wait for all the nodeIds to become available for MySQL nodes for _, id := range nodeIdPool { waitForNodeIdAvailability(id, nodeType, mgmClient) } } }