cluster/cluster.go (354 lines of code) (raw):

package cluster

/*
 * This file contains structs and functions related to describing the layout
 * of a cluster and executing shell commands against it, both locally and
 * remotely over SSH.
 */

import (
	"bytes"
	"fmt"
	"os/exec"
	"strings"

	"github.com/cloudberrydb/gp-common-go-libs/dbconn"
	"github.com/cloudberrydb/gp-common-go-libs/gplog"
	"github.com/cloudberrydb/gp-common-go-libs/operating"
	"github.com/pkg/errors"
)

// Executor is the set of command-execution functions that a Cluster relies
// on.  It is an interface so that the Execute[...]Command functions can be
// replaced with a mock for testing.
type Executor interface {
	ExecuteLocalCommand(commandStr string) (string, error)
	ExecuteClusterCommand(scope Scope, commandList []ShellCommand) *RemoteOutput
}

// This type only exists to allow us to mock Execute[...]Command functions for testing
type GPDBExecutor struct{}

/*
 * A Cluster object stores information about the cluster in three ways:
 * - Segments is basically equivalent to gp_segment_configuration, a plain
 *   list of segment information, and is ordered by content id.
 * - ByContent is a map of content id to the segments with that content id,
 *   with the primary first and the mirror (if any) second.
 * - ByHost is a map of hostname to all of the segments on that host.
 * The maps are only stored for efficient lookup; Segments is the "source of
 * truth" for the cluster.  The maps actually hold pointers to the SegConfigs
 * in Segments, so modifying Segments will modify the maps as well.
 *
 * ContentIDs and Hostnames list each distinct content id and hostname once,
 * in the order in which they first appear in Segments.
 */
type Cluster struct {
	ContentIDs []int
	Hostnames  []string
	Segments   []SegConfig
	ByContent  map[int][]*SegConfig
	ByHost     map[string][]*SegConfig
	Executor
}

// SegConfig describes a single segment.  Its fields correspond to the columns
// selected by GetSegmentConfiguration; Role is "p" for a primary and "m" for
// a mirror, and a ContentID of -1 denotes the coordinator.
type SegConfig struct {
	DbID      int
	ContentID int
	Role      string
	Port      int
	Hostname  string
	DataDir   string
}

/*
 * A "scope" is a value composed of one or more of the below constants that is
 * passed into ShellCommands, RemoteOutputs, and related structs and functions
 * to define the scope of the command execution.  The meaning of each value is
 * as follows:
 *
 * ON_SEGMENTS: Execute one command per segment.
 * ON_HOSTS:    Execute one command per host.
 *
 * INCLUDE_COORDINATOR: Include the coordinator host or segment in the command list.
 * EXCLUDE_COORDINATOR: Exclude the coordinator host or segment from the command list.
 *
 * ON_REMOTE: Execute each command on the specified remote segment/host.
 * ON_LOCAL:  Execute all commands on the coordinator host.
 *
 * INCLUDE_MIRRORS: Include mirror segments and hosts in the command list.
 * EXCLUDE_MIRRORS: Exclude mirror segments and hosts from the command list.
 *
 * A scope is composed of one or more of these values bitwise-OR'd together to
 * obtain a final scope, which has the following bitmask:
 *
 *  /------- INCLUDE_MIRRORS (1) or EXCLUDE_MIRRORS (0)
 *  |/------ INCLUDE_COORDINATOR (1) or EXCLUDE_COORDINATOR (0)
 *  ||/----- ON_LOCAL (1) or ON_REMOTE (0)
 *  |||/---- ON_HOSTS (1) or ON_SEGMENTS (0)
 *  ||||
 *  vvvv
 *  0000
 *
 * For instance, to execute a command on all hosts including the coordinator host,
 * you would pass a function the scope ON_HOSTS | INCLUDE_COORDINATOR.
 *
 * The default scope is 0000, to execute a command on all primary segments,
 * equivalent to ON_SEGMENTS | ON_REMOTE | EXCLUDE_COORDINATOR | EXCLUDE_MIRRORS,
 * though by convention only ON_SEGMENTS need be passed to a function.
 *
 * Technically, the four zero-valued constants are redundant, but are provided
 * so that function callers can specify whatever scope they feel is most clear
 * (e.g. using INCLUDE_COORDINATOR vs. EXCLUDE_COORDINATOR as the basic scopes instead of
 * ON_SEGMENTS vs. ON_HOSTS if every ExecuteClusterCommand call is per-segment
 * and the utility includes the coordinator in commands a good portion of the time.)
 *
 * In version 1.0.10, support for the COORDINATOR scope was added, as GPDB 7 uses
 * "coordinator" in place of "master".  The MASTER scopes are left in place (and
 * identical to the COORDINATOR scopes) for backwards compatibility, but may be
 * deprecated in future.
 */
type Scope uint8

const (
	ON_SEGMENTS         Scope = 0
	ON_HOSTS            Scope = 1
	EXCLUDE_COORDINATOR Scope = 0
	INCLUDE_COORDINATOR Scope = 1 << 1
	EXCLUDE_MASTER      Scope = 0
	INCLUDE_MASTER      Scope = 1 << 1
	ON_REMOTE           Scope = 0
	ON_LOCAL            Scope = 1 << 2
	EXCLUDE_MIRRORS     Scope = 0
	INCLUDE_MIRRORS     Scope = 1 << 3
)

// scopeIsSegments reports whether the ON_HOSTS bit of scope is clear.
func scopeIsSegments(scope Scope) bool {
	return scope&ON_HOSTS == ON_SEGMENTS
}

// scopeIsHosts reports whether the ON_HOSTS bit of scope is set.
func scopeIsHosts(scope Scope) bool {
	return scope&ON_HOSTS == ON_HOSTS
}

// scopeExcludesCoordinator reports whether the INCLUDE_COORDINATOR bit of
// scope is clear.
func scopeExcludesCoordinator(scope Scope) bool {
	return scope&INCLUDE_COORDINATOR == EXCLUDE_COORDINATOR
}

// scopeIncludesCoordinator reports whether the INCLUDE_COORDINATOR bit of
// scope is set.
func scopeIncludesCoordinator(scope Scope) bool {
	return scope&INCLUDE_COORDINATOR == INCLUDE_COORDINATOR
}

// scopeIsRemote reports whether the ON_LOCAL bit of scope is clear.
func scopeIsRemote(scope Scope) bool {
	return scope&ON_LOCAL == ON_REMOTE
}

// scopeIsLocal reports whether the ON_LOCAL bit of scope is set.
func scopeIsLocal(scope Scope) bool {
	return scope&ON_LOCAL == ON_LOCAL
}

// scopeExcludesMirrors reports whether the INCLUDE_MIRRORS bit of scope is
// clear.
func scopeExcludesMirrors(scope Scope) bool {
	return scope&INCLUDE_MIRRORS == EXCLUDE_MIRRORS
}

// scopeIncludesMirrors reports whether the INCLUDE_MIRRORS bit of scope is
// set.
func scopeIncludesMirrors(scope Scope) bool {
	return scope&INCLUDE_MIRRORS == INCLUDE_MIRRORS
}

/*
 * A ShellCommand stores a command to be executed (in both executable and
 * display form), as well as the results of the command execution and the
 * necessary information to determine how the command will be or was executed.
 *
 * It is assumed that before a caller references Content or Host for a given
 * command, they will check Scope to ensure that that field is meaningful for
 * that command.  GenerateCommandList sets Host to "" for per-segment commands
 * and Content to -2 for per-host commands, just to be safe.
 */
type ShellCommand struct {
	Scope         Scope
	Content       int
	Host          string
	Command       *exec.Cmd
	CommandString string
	Stdout        string
	Stderr        string
	Error         error
	Completed     bool
}

// NewShellCommand builds a not-yet-executed ShellCommand.  command[0] is the
// program to run and the remaining elements are its arguments, so command
// must contain at least one element.  CommandString is the elements of
// command joined with single spaces.
func NewShellCommand(scope Scope, content int, host string, command []string) ShellCommand {
	return ShellCommand{
		Scope:         scope,
		Content:       content,
		Host:          host,
		Command:       exec.Command(command[0], command[1:]...),
		CommandString: strings.Join(command, " "),
	}
}

/*
 * A RemoteOutput is used to make it easier to identify the success or failure
 * of a cluster command and to display the results to the user.
 */
type RemoteOutput struct {
	Scope          Scope
	NumErrors      int
	Commands       []ShellCommand
	FailedCommands []*ShellCommand
}

// NewRemoteOutput wraps the given executed commands in a RemoteOutput.
// FailedCommands holds a pointer into commands for every entry whose Error is
// non-nil; NumErrors is stored exactly as passed in.
func NewRemoteOutput(scope Scope, numErrors int, commands []ShellCommand) *RemoteOutput {
	// numErrors is only a capacity hint: appending (rather than indexing into
	// a pre-sized slice) means a caller-supplied count that disagrees with the
	// commands can neither run off the end of the slice nor leave nil entries
	// behind for CheckClusterError to dereference.
	failedCommands := make([]*ShellCommand, 0, numErrors)
	for i := range commands {
		if commands[i].Error != nil {
			failedCommands = append(failedCommands, &commands[i])
		}
	}
	return &RemoteOutput{
		Scope:          scope,
		NumErrors:      numErrors,
		Commands:       commands,
		FailedCommands: failedCommands,
	}
}

/*
 * Base cluster functions
 */

// NewCluster builds a Cluster around segConfigs.  The slice is stored as-is
// (not copied) in Segments, the lookup maps point into it, and the Executor
// is set to a GPDBExecutor.
func NewCluster(segConfigs []SegConfig) *Cluster {
	cluster := Cluster{
		Segments:  segConfigs,
		ByContent: make(map[int][]*SegConfig),
		ByHost:    make(map[string][]*SegConfig),
		Executor:  &GPDBExecutor{},
	}
	for i := range cluster.Segments {
		segment := &cluster.Segments[i]

		cluster.ByContent[segment.ContentID] = append(cluster.ByContent[segment.ContentID], segment)
		segmentList := cluster.ByContent[segment.ContentID]
		if len(segmentList) == 1 {
			// Only add each content id once; otherwise a primary/mirror pair
			// would make GenerateCommandList emit two per-segment commands
			// for the same content.
			cluster.ContentIDs = append(cluster.ContentIDs, segment.ContentID)
		}
		if len(segmentList) == 2 && segmentList[0].Role == "m" {
			/*
			 * GetSegmentConfiguration always returns primaries before mirrors,
			 * but we can't guarantee the []SegConfig passed in was created by
			 * GetSegmentConfiguration, so if the mirror is first, swap them.
			 */
			segmentList[0], segmentList[1] = segmentList[1], segmentList[0]
		}

		cluster.ByHost[segment.Hostname] = append(cluster.ByHost[segment.Hostname], segment)
		if len(cluster.ByHost[segment.Hostname]) == 1 { // Only add each hostname once
			cluster.Hostnames = append(cluster.Hostnames, segment.Hostname)
		}
	}
	return &cluster
}

/*
 * Because cluster commands can be executed either per-segment or per-host, the
 * "generator" argument to this function can accept one of two types:
 * - func(int) []string, which takes a content id, for per-segment commands
 * - func(string) []string, which takes a hostname, for per-host commands
 * The function uses a type switch to identify the right one, and panics if
 * an invalid function type is passed in via programmer error.
 * This method makes it easier for the user to pass in whichever function fits
 * the kind of command they're generating, as opposed to having to pass in both
 * content and hostname regardless of scope or using some sort of helper struct.
 */
func (cluster *Cluster) GenerateCommandList(scope Scope, generator interface{}) []ShellCommand {
	commands := []ShellCommand{}
	switch generateCommand := generator.(type) {
	case func(content int) []string:
		for _, content := range cluster.ContentIDs {
			if content == -1 && scopeExcludesCoordinator(scope) {
				continue
			}
			commands = append(commands, NewShellCommand(scope, content, "", generateCommand(content)))
		}
	case func(host string) []string:
		// These lookups do not depend on the host being examined, so do them
		// once rather than on every iteration.
		coordinatorHost := cluster.GetHostForContent(-1, "p")
		standbyHost := cluster.GetHostForContent(-1, "m")
		for _, host := range cluster.Hostnames {
			hostHasOneContent := len(cluster.GetContentsForHost(host)) == 1
			if host == coordinatorHost && scopeExcludesCoordinator(scope) && hostHasOneContent {
				// Only exclude the coordinator host if there are no local segments
				continue
			}
			if host == standbyHost && scopeExcludesMirrors(scope) && hostHasOneContent {
				// Only exclude the standby coordinator host if there are no segments there
				continue
			}
			commands = append(commands, NewShellCommand(scope, -2, host, generateCommand(host)))
		}
	default:
		gplog.Fatal(nil, "Generator function passed to GenerateCommandList had an invalid function header.")
	}
	return commands
}

// ConstructSSHCommand returns the argument list needed to run cmd.  If
// useLocal is true the command is run through "bash -c"; otherwise it is run
// through ssh as the current user on the given host.
func ConstructSSHCommand(useLocal bool, host string, cmd string) []string {
	if useLocal {
		return []string{"bash", "-c", cmd}
	}
	currentUser, err := operating.System.CurrentUser()
	// Without a user there is no valid ssh target; report the lookup failure
	// instead of dereferencing an unusable result.
	gplog.FatalOnError(err)
	return []string{"ssh", "-o", "StrictHostKeyChecking=no", fmt.Sprintf("%s@%s", currentUser.Username, host), cmd}
}

/*
 * This function essentially wraps GenerateCommandList such that commands to be
 * executed on other hosts are sent through SSH and local commands use Bash.
 *
 * The "generator" argument can be one of two types:
 * - func(int) string, which takes a content id, for per-segment commands
 * - func(string) string, which takes a hostname, for per-host commands
 * As with GenerateCommandList, any other type is treated as a programmer
 * error and is fatal.
 */
func (cluster *Cluster) GenerateSSHCommandList(scope Scope, generator interface{}) []ShellCommand {
	var commands []ShellCommand
	// A command runs locally if it targets the coordinator's host or if the
	// scope asks for everything to run on the coordinator.
	localHost := cluster.GetHostForContent(-1)
	switch generateCommand := generator.(type) {
	case func(content int) string:
		commands = cluster.GenerateCommandList(scope, func(content int) []string {
			segmentHost := cluster.GetHostForContent(content)
			useLocal := segmentHost == localHost || scopeIsLocal(scope)
			cmd := generateCommand(content)
			return ConstructSSHCommand(useLocal, segmentHost, cmd)
		})
	case func(host string) string:
		commands = cluster.GenerateCommandList(scope, func(host string) []string {
			useLocal := host == localHost || scopeIsLocal(scope)
			cmd := generateCommand(host)
			return ConstructSSHCommand(useLocal, host, cmd)
		})
	default:
		// Returning an empty list here would make the caller execute nothing
		// and report success, so fail loudly like GenerateCommandList does.
		gplog.Fatal(nil, "Generator function passed to GenerateSSHCommandList had an invalid function header.")
	}
	return commands
}

// ExecuteLocalCommand runs commandStr through "bash -c" and returns its
// combined stdout and stderr along with any execution error.
func (executor *GPDBExecutor) ExecuteLocalCommand(commandStr string) (string, error) {
	output, err := exec.Command("bash", "-c", commandStr).CombinedOutput()
	return string(output), err
}

/*
 * This function just executes all of the commands passed to it in parallel; it
 * doesn't care about the scope of the command except to pass that on to the
 * RemoteOutput after execution.
 *
 * The results of each command (Stdout, Stderr, Error, Completed) are written
 * back into commandList in place.
 * TODO: Add batching to prevent bottlenecks when executing in a huge cluster.
 */
func (executor *GPDBExecutor) ExecuteClusterCommand(scope Scope, commandList []ShellCommand) *RemoteOutput {
	length := len(commandList)
	finished := make(chan int)
	numErrors := 0
	for i := range commandList {
		// One goroutine per command; each one only touches its own index of
		// commandList and then reports that index on the channel.
		go func(index int) {
			command := commandList[index]
			var stderr bytes.Buffer
			cmd := command.Command
			cmd.Stderr = &stderr
			out, err := cmd.Output()
			command.Stdout = string(out)
			command.Stderr = stderr.String()
			command.Error = err
			command.Completed = true
			commandList[index] = command
			finished <- index
		}(i)
	}
	// Wait for exactly as many completions as commands were started, counting
	// failures as they arrive.
	for i := 0; i < length; i++ {
		index := <-finished
		if commandList[index].Error != nil {
			numErrors++
		}
	}
	return NewRemoteOutput(scope, numErrors, commandList)
}

/*
 * GenerateAndExecuteCommand and CheckClusterError are generic wrapper functions
 * to simplify execution of...
 * 1. shell commands directly on remote hosts via ssh.
 *    - e.g. running an ls on all hosts
 * 2. shell commands on coordinator to push to remote hosts.
 *    - e.g. running multiple scps on coordinator to push a file to all segments
 */
func (cluster *Cluster) GenerateAndExecuteCommand(verboseMsg string, scope Scope, generator interface{}) *RemoteOutput {
	gplog.Verbose(verboseMsg)
	commandList := cluster.GenerateSSHCommandList(scope, generator)
	return cluster.ExecuteClusterCommand(scope, commandList)
}

// CheckClusterError does nothing if remoteOutput recorded no errors.
// Otherwise it logs each failed command at verbose level, using messageFunc
// (a func(int) string for per-segment commands or a func(string) string for
// per-host commands) to produce the leading part of each message, and then
// reports finalErrMsg: as a non-fatal error if a single true value is passed
// for noFatal, and through LogFatalClusterError otherwise.
func (cluster *Cluster) CheckClusterError(remoteOutput *RemoteOutput, finalErrMsg string, messageFunc interface{}, noFatal ...bool) {
	if remoteOutput.NumErrors == 0 {
		return
	}
	for _, failedCommand := range remoteOutput.FailedCommands {
		errStr := fmt.Sprintf("with error %s: %s", failedCommand.Error, failedCommand.Stderr)
		switch getMessage := messageFunc.(type) {
		case func(content int) string:
			content := failedCommand.Content
			host := cluster.GetHostForContent(content)
			gplog.Verbose("%s on segment %d on host %s %s", getMessage(content), content, host, errStr)
		case func(host string) string:
			host := failedCommand.Host
			gplog.Verbose("%s on host %s %s", getMessage(host), host, errStr)
		}
		gplog.Verbose("Command was: %s", failedCommand.CommandString)
	}
	if len(noFatal) == 1 && noFatal[0] {
		gplog.Error(finalErrMsg)
	} else {
		LogFatalClusterError(finalErrMsg, remoteOutput.Scope, remoteOutput.NumErrors)
	}
}

// LogFatalClusterError reports a fatal error built from errMessage, the
// number of failures, whether the scope was per-host or per-segment and
// local or remote, and the path of the log file.
func LogFatalClusterError(errMessage string, scope Scope, numErrors int) {
	str := " on"
	if scopeIsLocal(scope) {
		str += " coordinator for" // No good way to toggle "coordinator" vs. "master" here based on version, so default to "coordinator"
	}
	errMessage += str
	segMsg := "segment"
	if scopeIsHosts(scope) {
		segMsg = "host"
	}
	if numErrors != 1 {
		segMsg += "s"
	}
	gplog.Fatal(errors.Errorf("%s %d %s. See %s for a complete list of errors.", errMessage, numErrors, segMsg, gplog.GetLogFilePath()), "")
}

/*
 * Due to how NewCluster sets up ByContent, each content key points to a pair
 * of segments with the primary first and mirror second.  As most users of
 * Cluster are only going to care about primaries, by default each of the
 * Get[Foo]ForContent functions below returns the primary value by default,
 * and an optional parameter can be passed to specify which value is desired.
 *
 * getSegmentByRole returns nil if the requested segment does not exist, i.e.
 * if the list is empty or if the mirror is requested and there is none.
 */
func getSegmentByRole(segmentList []*SegConfig, role ...string) *SegConfig {
	if len(role) == 1 && role[0] == "m" {
		if len(segmentList) < 2 {
			return nil
		}
		return segmentList[1]
	}
	if len(segmentList) == 0 {
		// Unknown content id: let the callers return their "not found" value
		// rather than panicking on an out-of-range index.
		return nil
	}
	return segmentList[0]
}

// GetDbidForContent returns the dbid of the segment with the given content
// id (the primary unless role is "m"), or -1 if there is no such segment.
func (cluster *Cluster) GetDbidForContent(contentID int, role ...string) int {
	segConfig := getSegmentByRole(cluster.ByContent[contentID], role...)
	if segConfig == nil {
		return -1
	}
	return segConfig.DbID
}

// GetPortForContent returns the port of the segment with the given content
// id (the primary unless role is "m"), or -1 if there is no such segment.
func (cluster *Cluster) GetPortForContent(contentID int, role ...string) int {
	segConfig := getSegmentByRole(cluster.ByContent[contentID], role...)
	if segConfig == nil {
		return -1
	}
	return segConfig.Port
}

// GetHostForContent returns the hostname of the segment with the given
// content id (the primary unless role is "m"), or "" if there is no such
// segment.
func (cluster *Cluster) GetHostForContent(contentID int, role ...string) string {
	segConfig := getSegmentByRole(cluster.ByContent[contentID], role...)
	if segConfig == nil {
		return ""
	}
	return segConfig.Hostname
}

// GetDirForContent returns the data directory of the segment with the given
// content id (the primary unless role is "m"), or "" if there is no such
// segment.
func (cluster *Cluster) GetDirForContent(contentID int, role ...string) string {
	segConfig := getSegmentByRole(cluster.ByContent[contentID], role...)
	if segConfig == nil {
		return ""
	}
	return segConfig.DataDir
}

// GetDbidsForHost returns the dbid of every segment on the given host, in
// ByHost order.  The result is empty for an unknown host.
func (cluster *Cluster) GetDbidsForHost(hostname string) []int {
	segments := cluster.ByHost[hostname]
	dbids := make([]int, len(segments))
	for i, seg := range segments {
		dbids[i] = seg.DbID
	}
	return dbids
}

// GetContentsForHost returns the content id of every segment on the given
// host, in ByHost order.  The result is empty for an unknown host.
func (cluster *Cluster) GetContentsForHost(hostname string) []int {
	segments := cluster.ByHost[hostname]
	contents := make([]int, len(segments))
	for i, seg := range segments {
		contents[i] = seg.ContentID
	}
	return contents
}

// GetPortsForHost returns the port of every segment on the given host, in
// ByHost order.  The result is empty for an unknown host.
func (cluster *Cluster) GetPortsForHost(hostname string) []int {
	segments := cluster.ByHost[hostname]
	ports := make([]int, len(segments))
	for i, seg := range segments {
		ports[i] = seg.Port
	}
	return ports
}

// GetDirsForHost returns the data directory of every segment on the given
// host, in ByHost order.  The result is empty for an unknown host.
func (cluster *Cluster) GetDirsForHost(hostname string) []string {
	segments := cluster.ByHost[hostname]
	dirs := make([]string, len(segments))
	for i, seg := range segments {
		dirs[i] = seg.DataDir
	}
	return dirs
}

/*
 * Helper functions
 */

// GetSegmentConfiguration queries gp_segment_configuration over the given
// connection and returns one SegConfig per row, ordered by content id with
// role descending (so a primary, "p", sorts before its mirror, "m").  Only
// primaries are returned unless a single true value is passed for
// getMirrors.  On a query error the returned slice is nil.
func GetSegmentConfiguration(connection *dbconn.DBConn, getMirrors ...bool) ([]SegConfig, error) {
	includeMirrors := len(getMirrors) == 1 && getMirrors[0]
	whereClause := "WHERE role = 'p'"
	if includeMirrors {
		whereClause = ""
	}
	query := fmt.Sprintf(` SELECT dbid, content as contentid, role, port, hostname, datadir FROM gp_segment_configuration %s ORDER BY content, role DESC;`, whereClause)

	results := make([]SegConfig, 0)
	err := connection.Select(&results, query)
	if err != nil {
		return nil, err
	}
	return results, nil
}

// MustGetSegmentConfiguration is GetSegmentConfiguration with any error
// passed to gplog.FatalOnError instead of being returned.
func MustGetSegmentConfiguration(connection *dbconn.DBConn, getMirrors ...bool) []SegConfig {
	segConfigs, err := GetSegmentConfiguration(connection, len(getMirrors) == 1 && getMirrors[0])
	gplog.FatalOnError(err)
	return segConfigs
}