# def run()
#
# in automation/tinc/main/ext/qautils/gppylib/programs/clsRecoverSegment.py [0:0]


    def run(self):
        """Top-level driver for segment recovery (gprecoverseg).

        Validates command-line options, verifies that the catalog and the
        live segments agree on segment state, then performs one of:
        outputting a sample/spare-directory config file, rebalancing
        segments back to their preferred roles, or rebuilding down mirrors.

        Returns 0 on success.  Raises ProgramArgumentValidationException
        for bad options, ExceptionNoStackTraceNeeded when mirroring is not
        configured, UserAbortedException on interactive decline, and plain
        Exception for catalog/segment state inconsistencies.
        """
        # -B parallel degree bounds the worker pool; cap matches gp utilities.
        if self.__options.parallelDegree < 1 or self.__options.parallelDegree > 64:
            raise ProgramArgumentValidationException("Invalid parallelDegree provided with -B argument: %d" % self.__options.parallelDegree)

        self.__pool = base.WorkerPool(self.__options.parallelDegree)
        gpEnv = GpMasterEnvironment(self.__options.masterDataDirectory, True)

        # Verify "where to recover" options: -i, -p, -s, -r and -S are
        # mutually exclusive, so at most one may have been supplied.
        optionCnt = 0
        if self.__options.newRecoverHosts is not None:
            optionCnt += 1
        if self.__options.spareDataDirectoryFile is not None:
            optionCnt += 1
        if self.__options.recoveryConfigFile is not None:
            optionCnt += 1
        if self.__options.outputSpareDataDirectoryFile is not None:
            optionCnt += 1
        if self.__options.rebalanceSegments:
            optionCnt += 1
        if optionCnt > 1:
            raise ProgramArgumentValidationException(
                    "Only one of -i, -p, -s, -r, and -S may be specified")

        faultProberInterface.getFaultProber().initializeProber(gpEnv.getMasterPort())

        confProvider = configInterface.getConfigurationProvider().initializeProvider(gpEnv.getMasterPort())
        gpArray = confProvider.loadSystemConfig(useUtilityMode=False)

        # Make sure gpArray and segments are in agreement on current state
        # of system: ask every non-master segment that is in change-logging
        # mode to report its status.
        segmentList = gpArray.getSegDbList()
        getVersionCmds = {}
        for seg in segmentList:
            if seg.isSegmentQD():
                continue
            if not seg.isSegmentModeInChangeLogging():
                continue
            cmd = gp.SendFilerepTransitionStatusMessage( name = "Get segment status information"
                                                         , msg = gp.SEGMENT_STATUS_GET_STATUS
                                                         , dataDir = seg.getSegmentDataDirectory()
                                                         , port = seg.getSegmentPort()
                                                         , ctxt = gp.REMOTE
                                                         , remoteHost = seg.getSegmentHostName()
                                                        )
            getVersionCmds[seg.getSegmentDbId()] = cmd
            self.__pool.addCommand(cmd)
        self.__pool.join()

        # We can not check to see if the command was successful or not, because gp_primarymirror always returns a non-zero result.
        # That is just the way gp_primarymirror was designed.

        dbsMap = gpArray.getSegDbMap()
        for dbid, cmd in getVersionCmds.items():
            mode         = None
            segmentState = None
            dataState    = None
            try:
                # gp_primarymirror writes its report to stderr as three
                # "label: value" lines: mode, segment state, data state.
                lines = str(cmd.get_results().stderr).split("\n")
                mode         = lines[0].split(": ")[1].strip()
                segmentState = lines[1].split(": ")[1].strip()
                dataState    = lines[2].split(": ")[1].strip()
            except Exception:
                # Best-effort: a segment whose output can't be parsed is
                # logged and skipped (it may be down and awaiting recovery).
                self.logger.warning("Problem getting Segment state dbid = %s, results = %s." % (str(dbid), str(cmd.get_results().stderr)))
                continue

            db = dbsMap[dbid]
            # Catalog and live segment must agree on role/mode/data state
            # before we attempt any recovery action.
            if gparray.ROLE_TO_MODE_MAP[db.getSegmentRole()] != mode:
                raise Exception("Inconsistency in catalog and segment Role/Mode. Catalog Role = %s. Segment Mode = %s." % (db.getSegmentRole(), mode))
            if gparray.MODE_TO_DATA_STATE_MAP[db.getSegmentMode()] != dataState:
                raise Exception("Inconsistency in catalog and segment Mode/DataState. Catalog Mode = %s. Segment DataState = %s." % (db.getSegmentMode(), dataState))
            if segmentState != gparray.SEGMENT_STATE_READY and segmentState != gparray.SEGMENT_STATE_CHANGE_TRACKING_DISABLED:
                if segmentState == gparray.SEGMENT_STATE_INITIALIZATION or segmentState == gparray.SEGMENT_STATE_IN_CHANGE_TRACKING_TRANSITION:
                    raise Exception("Segment is not ready for recovery dbid = %s, segmentState = %s. Retry recovery in a few moments" % (str(db.getSegmentDbId()), segmentState))
                else:
                    raise Exception("Segment is in unexpected state. dbid = %s, segmentState = %s." % (str(db.getSegmentDbId()), segmentState))

        # check that we actually have mirrors
        if gpArray.getFaultStrategy() == gparray.FAULT_STRATEGY_SAN:
            self.SanFailback(gpArray, gpEnv)
            return 0
        elif gpArray.getFaultStrategy() != gparray.FAULT_STRATEGY_FILE_REPLICATION:
            raise ExceptionNoStackTraceNeeded(
                    'GPDB Mirroring replication is not configured for this Greenplum Database instance.')

        # We have phys-rep/filerep mirrors.

        if self.__options.outputSpareDataDirectoryFile is not None:
            self.__outputSpareDataDirectoryFile(gpEnv, gpArray, self.__options.outputSpareDataDirectoryFile)
            return 0

        if self.__options.newRecoverHosts is not None:
            try:
                # De-duplicate the comma-separated -p host list, keeping the
                # order in which each host first appears.
                uniqueHosts = []
                for h in self.__options.newRecoverHosts.split(','):
                    host = h.strip()
                    if host not in uniqueHosts:
                        uniqueHosts.append(host)
                self.__options.newRecoverHosts = uniqueHosts
            except Exception as ex:
                raise ProgramArgumentValidationException(
                    "Invalid value for recover hosts: %s" % ex)

        # If it's a rebalance operation, make sure we are in an acceptable state to do that
        # Acceptable state is:
        #    - No segments down
        #    - No segments in change tracking or unsynchronized state
        if self.__options.rebalanceSegments:
            if len(gpArray.get_invalid_segdbs()) > 0:
                raise Exception("Down segments still exist.  All segments must be up to rebalance.")
            if len(gpArray.get_synchronized_segdbs()) != len(gpArray.getSegDbList()):
                raise Exception("Some segments are not yet synchronized.  All segments must be synchronized to rebalance.")

        # retain list of hosts that were existing in the system prior to getRecoverActions...
        # this will be needed for later calculations that determine whether
        # new hosts were added into the system
        existing_hosts = set(gpArray.getHostList())

        # figure out what needs to be done
        mirrorBuilder = self.getRecoveryActionsBasedOnOptions(gpEnv, gpArray)

        if self.__options.outputSampleConfigFile is not None:
            # just output config file and done
            self.outputToFile(mirrorBuilder, gpArray, self.__options.outputSampleConfigFile)
            self.logger.info('Configuration file output to %s successfully.' % self.__options.outputSampleConfigFile)
        elif self.__options.rebalanceSegments:
            assert(isinstance(mirrorBuilder,GpSegmentRebalanceOperation))

            # Make sure we have work to do
            if len(gpArray.get_unbalanced_segdbs()) == 0:
                self.logger.info("No segments are running in their non-preferred role and need to be rebalanced.")
            else:
                self.displayRecovery(mirrorBuilder, gpArray)

                if self.__options.interactive:
                    self.logger.warn("This operation will cancel queries that are currently executing.")
                    self.logger.warn("Connections to the database however will not be interrupted.")
                    if not userinput.ask_yesno(None, "\nContinue with segment rebalance procedure", 'N'):
                        raise UserAbortedException()

                mirrorBuilder.rebalance()

                self.logger.info("******************************************************************")
                self.logger.info("The rebalance operation has completed successfully.")
                self.logger.info("There is a resynchronization running in the background to bring all")
                self.logger.info("segments in sync.")
                self.logger.info("")
                self.logger.info("Use gpstate -s to check the resynchronization progress.")
                self.logger.info("******************************************************************")

        elif len(mirrorBuilder.getMirrorsToBuild()) == 0:
            self.logger.info('No segments to recover')
        else:
            mirrorBuilder.checkForPortAndDirectoryConflicts(gpArray)

            self.displayRecovery(mirrorBuilder, gpArray)
            self.__displayRecoveryWarnings(mirrorBuilder)

            if self.__options.interactive:
                if not userinput.ask_yesno(None, "\nContinue with segment recovery procedure", 'N'):
                    raise UserAbortedException()

            # sync packages to any hosts that were added by the recovery plan
            current_hosts = set(gpArray.getHostList())
            new_hosts = current_hosts - existing_hosts
            if new_hosts:
                self.syncPackages(new_hosts)

            mirrorBuilder.buildMirrors("recover", gpEnv, gpArray )

            confProvider.sendPgElogFromMaster("Recovery of %d segment(s) has been started." % \
                    len(mirrorBuilder.getMirrorsToBuild()), True)

            self.logger.info("******************************************************************")
            self.logger.info("Updating segments for resynchronization is completed.")
            self.logger.info("For segments updated successfully, resynchronization will continue in the background.")
            self.logger.info("")
            self.logger.info("Use  gpstate -s  to check the resynchronization progress.")
            self.logger.info("******************************************************************")

        return 0 # success -- exit code 0!