in automation/tinc/main/ext/qautils/gppylib/programs/clsRecoverSegment.py [0:0]
def run(self):
    """Run segment recovery (or a related action) based on the parsed options.

    Steps, in order:
      1. Validate -B (parallel degree), create the worker pool and master
         environment, and reject conflicting "where to recover" options.
      2. Load the system configuration and verify that every segment the
         catalog shows in change-logging mode reports a role/mode/data state
         consistent with the catalog and is ready for recovery.
      3. Do exactly one of:
         - SAN failback;
         - write the spare-data-directory file;
         - write a sample recovery config file;
         - rebalance segments;
         - build mirrors.

    Returns:
        0 on success (used as the process exit code).

    Raises:
        ProgramArgumentValidationException: invalid -B value, conflicting
            recovery options, or an unusable recover-host list.
        ExceptionNoStackTraceNeeded: file-replication mirroring is not
            configured for this instance.
        UserAbortedException: the user declined an interactive prompt.
        Exception: catalog/segment state mismatch, a segment not ready for
            recovery, or rebalance preconditions not met.
    """
    if self.__options.parallelDegree < 1 or self.__options.parallelDegree > 64:
        raise ProgramArgumentValidationException("Invalid parallelDegree provided with -B argument: %d" % self.__options.parallelDegree)

    self.__pool = base.WorkerPool(self.__options.parallelDegree)
    gpEnv = GpMasterEnvironment(self.__options.masterDataDirectory, True)

    # verify "where to recover" options
    self.__checkExclusiveRecoveryOptions()

    faultProberInterface.getFaultProber().initializeProber(gpEnv.getMasterPort())
    confProvider = configInterface.getConfigurationProvider().initializeProvider(gpEnv.getMasterPort())
    gpArray = confProvider.loadSystemConfig(useUtilityMode=False)

    # Make sure gpArray and segments are in agreement on current state of system.
    self.__verifySegmentStatesMatchCatalog(gpArray)

    # check that we actually have mirrors
    if gpArray.getFaultStrategy() == gparray.FAULT_STRATEGY_SAN:
        self.SanFailback(gpArray, gpEnv)
        return 0
    elif gpArray.getFaultStrategy() != gparray.FAULT_STRATEGY_FILE_REPLICATION:
        raise ExceptionNoStackTraceNeeded(
            'GPDB Mirroring replication is not configured for this Greenplum Database instance.')

    # We have phys-rep/filerep mirrors.
    if self.__options.outputSpareDataDirectoryFile is not None:
        self.__outputSpareDataDirectoryFile(gpEnv, gpArray, self.__options.outputSpareDataDirectoryFile)
        return 0

    if self.__options.newRecoverHosts is not None:
        self.__options.newRecoverHosts = self.__parseNewRecoverHosts(self.__options.newRecoverHosts)

    # If it's a rebalance operation, make sure we are in an acceptable state to do that.
    # Acceptable state is:
    #  - No segments down
    #  - No segments in change tracking or unsynchronized state
    if self.__options.rebalanceSegments:
        if gpArray.get_invalid_segdbs():
            raise Exception("Down segments still exist. All segments must be up to rebalance.")
        if len(gpArray.get_synchronized_segdbs()) != len(gpArray.getSegDbList()):
            raise Exception("Some segments are not yet synchronized. All segments must be synchronized to rebalance.")

    # Snapshot the host list before getRecoveryActionsBasedOnOptions() runs.
    # Comparing it with the host list afterwards tells us which hosts were
    # newly added to the system (those need packages synced).
    existing_hosts = set(gpArray.getHostList())

    # figure out what needs to be done
    mirrorBuilder = self.getRecoveryActionsBasedOnOptions(gpEnv, gpArray)

    if self.__options.outputSampleConfigFile is not None:
        # just output config file and done
        self.outputToFile(mirrorBuilder, gpArray, self.__options.outputSampleConfigFile)
        self.logger.info('Configuration file output to %s successfully.' % self.__options.outputSampleConfigFile)
    elif self.__options.rebalanceSegments:
        assert isinstance(mirrorBuilder, GpSegmentRebalanceOperation)

        # Make sure we have work to do
        if not gpArray.get_unbalanced_segdbs():
            self.logger.info("No segments are running in their non-preferred role and need to be rebalanced.")
        else:
            self.displayRecovery(mirrorBuilder, gpArray)

            if self.__options.interactive:
                self.logger.warning("This operation will cancel queries that are currently executing.")
                self.logger.warning("Connections to the database however will not be interrupted.")
                if not userinput.ask_yesno(None, "\nContinue with segment rebalance procedure", 'N'):
                    raise UserAbortedException()

            mirrorBuilder.rebalance()

            self.logger.info("******************************************************************")
            self.logger.info("The rebalance operation has completed successfully.")
            self.logger.info("There is a resynchronization running in the background to bring all")
            self.logger.info("segments in sync.")
            self.logger.info("")
            self.logger.info("Use gpstate -s to check the resynchronization progress.")
            self.logger.info("******************************************************************")
    elif not mirrorBuilder.getMirrorsToBuild():
        self.logger.info('No segments to recover')
    else:
        mirrorBuilder.checkForPortAndDirectoryConflicts(gpArray)

        self.displayRecovery(mirrorBuilder, gpArray)
        self.__displayRecoveryWarnings(mirrorBuilder)

        if self.__options.interactive:
            if not userinput.ask_yesno(None, "\nContinue with segment recovery procedure", 'N'):
                raise UserAbortedException()

        # sync packages to any hosts that are new to the system
        current_hosts = set(gpArray.getHostList())
        new_hosts = current_hosts - existing_hosts
        if new_hosts:
            self.syncPackages(new_hosts)

        mirrorBuilder.buildMirrors("recover", gpEnv, gpArray)

        confProvider.sendPgElogFromMaster("Recovery of %d segment(s) has been started." %
                                          len(mirrorBuilder.getMirrorsToBuild()), True)

        self.logger.info("******************************************************************")
        self.logger.info("Updating segments for resynchronization is completed.")
        self.logger.info("For segments updated successfully, resynchronization will continue in the background.")
        self.logger.info("")
        self.logger.info("Use gpstate -s to check the resynchronization progress.")
        self.logger.info("******************************************************************")

    return 0  # success -- exit code 0!

def __checkExclusiveRecoveryOptions(self):
    """Raise if more than one mutually exclusive "where to recover" option is set.

    Raises:
        ProgramArgumentValidationException: two or more of the options were given.
    """
    options = self.__options
    givenFlags = [
        options.newRecoverHosts is not None,
        options.spareDataDirectoryFile is not None,
        options.recoveryConfigFile is not None,
        options.outputSpareDataDirectoryFile is not None,
        bool(options.rebalanceSegments),
    ]
    if sum(givenFlags) > 1:
        raise ProgramArgumentValidationException(
            "Only one of -i, -p, -s, -r, and -S may be specified")

def __verifySegmentStatesMatchCatalog(self, gpArray):
    """Query change-logging segments for their status and check it against the catalog.

    Only segments for which isSegmentQD() is false and
    isSegmentModeInChangeLogging() is true are queried. The queries run in
    parallel on self.__pool.

    A reply that cannot be parsed is logged as a warning and that segment is
    skipped (best effort).

    Raises:
        Exception: the segment's reported mode or data state disagrees with
            the catalog, or its state is not one that permits recovery.
    """
    statusCmds = {}
    for seg in gpArray.getSegDbList():
        if seg.isSegmentQD() or not seg.isSegmentModeInChangeLogging():
            continue
        cmd = gp.SendFilerepTransitionStatusMessage(name="Get segment status information",
                                                    msg=gp.SEGMENT_STATUS_GET_STATUS,
                                                    dataDir=seg.getSegmentDataDirectory(),
                                                    port=seg.getSegmentPort(),
                                                    ctxt=gp.REMOTE,
                                                    remoteHost=seg.getSegmentHostName())
        statusCmds[seg.getSegmentDbId()] = cmd
        self.__pool.addCommand(cmd)
    self.__pool.join()

    # We can not check to see if the command was successful or not, because
    # gp_primarymirror always returns a non-zero result (that is how it was
    # designed). The status lines are therefore parsed from stderr instead.
    dbsMap = gpArray.getSegDbMap()
    for dbid in statusCmds:
        cmd = statusCmds[dbid]
        stderrText = None
        try:
            # Read the results once. If get_results() itself fails, the
            # warning below must not call it again and crash the handler.
            stderrText = str(cmd.get_results().stderr)
            lines = stderrText.split("\n")
            mode = lines[0].split(": ")[1].strip()
            segmentState = lines[1].split(": ")[1].strip()
            dataState = lines[2].split(": ")[1].strip()
        except Exception:
            self.logger.warning("Problem getting Segment state dbid = %s, results = %s." % (str(dbid), stderrText))
            continue

        db = dbsMap[dbid]
        if gparray.ROLE_TO_MODE_MAP[db.getSegmentRole()] != mode:
            raise Exception("Inconsistency in catalog and segment Role/Mode. Catalog Role = %s. Segment Mode = %s." % (db.getSegmentRole(), mode))
        if gparray.MODE_TO_DATA_STATE_MAP[db.getSegmentMode()] != dataState:
            raise Exception("Inconsistency in catalog and segment Mode/DataState. Catalog Mode = %s. Segment DataState = %s." % (db.getSegmentMode(), dataState))
        if segmentState not in (gparray.SEGMENT_STATE_READY, gparray.SEGMENT_STATE_CHANGE_TRACKING_DISABLED):
            if segmentState in (gparray.SEGMENT_STATE_INITIALIZATION, gparray.SEGMENT_STATE_IN_CHANGE_TRACKING_TRANSITION):
                raise Exception("Segment is not ready for recovery dbid = %s, segmentState = %s. Retry recovery in a few moments" % (str(db.getSegmentDbId()), segmentState))
            raise Exception("Segment is in unexpected state. dbid = %s, segmentState = %s." % (str(db.getSegmentDbId()), segmentState))

def __parseNewRecoverHosts(self, hostsArg):
    """Turn a comma-separated host string into a list of unique host names.

    Each entry is stripped. Empty entries (from stray or trailing commas)
    are dropped. First-occurrence order is preserved.

    Raises:
        ProgramArgumentValidationException: hostsArg cannot be split, or it
            contains no host names at all.
    """
    try:
        candidates = [h.strip() for h in hostsArg.split(',')]
    except Exception as ex:
        raise ProgramArgumentValidationException(
            "Invalid value for recover hosts: %s" % ex)

    uniqueHosts = []
    for host in candidates:
        if host and host not in uniqueHosts:
            uniqueHosts.append(host)

    if not uniqueHosts:
        raise ProgramArgumentValidationException(
            "Invalid value for recover hosts: no host names given")
    return uniqueHosts