in VMBackup/main/guestsnapshotter.py [0:0]
def snapshotall_parallel(self, paras, freezer, thaw_done, g_fsfreeze_on):
    """Snapshot every blob in ``paras.blobs`` using one child process per blob.

    One ``mp.Process`` running ``self.snapshot`` is created per blob. All
    workers are started and joined, the filesystem is thawed (when freezing is
    on and no thaw has happened yet), and then the per-worker results are
    read back from four ``mp.Queue`` objects.

    Args:
        paras: object exposing ``blobs`` (an iterable of blob URI strings, or
            ``None``), ``wellKnownSettingFlags`` and ``backup_metadata``; the
            latter two are forwarded untouched to ``self.snapshot``.
        freezer: object whose ``thaw_safe()`` returns
            ``(thaw_result, unable_to_sleep)``.
        thaw_done: whether the caller has already thawed the filesystem.
        g_fsfreeze_on: truthy when filesystem freeze is in use.

    Returns:
        Always an 8-tuple, on every path including ``paras.blobs is None``
        and the exception path:
        ``(snapshot_result, blob_snapshot_info_array, all_failed,
        exceptOccurred, is_inconsistent, thaw_done_local, unable_to_sleep,
        all_snapshots_failed)``.

    Exceptions raised inside the method are logged and reported through
    ``exceptOccurred`` rather than propagated.
    """
    self.logger.log("doing snapshotall now in parallel...")
    snapshot_result = SnapshotResult()
    blob_snapshot_info_array = []
    all_failed = True
    exceptOccurred = False
    is_inconsistent = False
    thaw_done_local = thaw_done
    unable_to_sleep = False
    all_snapshots_failed = False
    set_next_backup_to_seq = False
    try:
        self.logger.log("before start of multiprocessing queues..")
        queue_creation_starttime = datetime.datetime.now()
        global_logger = mp.Queue()
        global_error_logger = mp.Queue()
        snapshot_result_error = mp.Queue()
        snapshot_info_indexer_queue = mp.Queue()
        blobs = paras.blobs
        if blobs is None:
            self.logger.log("the blobs are None")
        else:
            mp_jobs = []
            self.logger.log('****** 5. Snaphotting (Guest-parallel) Started')
            for blob_index, blob in enumerate(blobs):
                # Strip the query string so only the bare blob URI is logged and stored.
                blobUri = blob.split("?")[0]
                self.logger.log("index: " + str(blob_index) + " blobUri: " + str(blobUri))
                # Placeholder entry; it is overwritten from the worker's report
                # once the snapshot_info_indexer_queue is drained below.
                blob_snapshot_info_array.append(HostSnapshotObjects.BlobSnapshotInfo(False, blobUri, None, 500))
                try:
                    mp_jobs.append(mp.Process(
                        target=self.snapshot,
                        args=(blob, blob_index, paras.wellKnownSettingFlags, paras.backup_metadata,
                              snapshot_result_error, snapshot_info_indexer_queue,
                              global_logger, global_error_logger)))
                except Exception:
                    self.logger.log("multiprocess queue creation failed")
                    all_snapshots_failed = True
                    raise Exception("Exception while creating multiprocess queue")

            for job_number, job in enumerate(mp_jobs):
                job.start()
                if job_number == 0:
                    # Time from queue creation until the first worker has been
                    # started; a slow start switches the next backup to the
                    # sequential code path.
                    timediff = datetime.datetime.now() - queue_creation_starttime
                    if timediff.seconds >= 10:
                        self.logger.log("mp queue creation took more than 10 secs. Setting next backup to sequential")
                        set_next_backup_to_seq = True

            # NOTE(review): the workers are joined before their queues are
            # drained. The multiprocessing docs warn this can deadlock when a
            # child still has buffered queue items; left as is because
            # reordering would delay the thaw below.
            for job in mp_jobs:
                job.join()
            self.logger.log('****** 6. Snaphotting (Guest-parallel) Completed')

            thaw_result = None
            # "== False" is deliberate: a thaw_done of None must not trigger a thaw.
            if g_fsfreeze_on and thaw_done_local == False:
                time_before_thaw = datetime.datetime.now()
                thaw_result, unable_to_sleep = freezer.thaw_safe()
                time_after_thaw = datetime.datetime.now()
                HandlerUtil.HandlerUtility.add_to_telemetery_data("ThawTime", str(time_after_thaw - time_before_thaw))
                thaw_done_local = True
                if set_next_backup_to_seq:
                    self.logger.log("Setting to sequential snapshot")
                    self.hutil.set_value_to_configfile('seqsnapshot', '1')
                self.logger.log('T:S thaw result ' + str(thaw_result))
                if (thaw_result is not None and len(thaw_result.errors) > 0
                        and (snapshot_result is None or len(snapshot_result.errors) == 0)):
                    # Thaw reported errors: mark the backup inconsistent and
                    # skip reading the per-worker results.
                    is_inconsistent = True
                    snapshot_result.errors.append(thaw_result.errors)

            if not is_inconsistent:
                self.logger.log('end of snapshot process')
                # One item per worker is read from each queue.
                # NOTE(review): get() has no timeout, so a worker that exited
                # without reporting would block here.
                worker_logs = [global_logger.get() for _ in mp_jobs]
                self.logger.log(str(worker_logs))
                worker_error_logs = [global_error_logger.get() for _ in mp_jobs]
                self.logger.log(str(worker_error_logs), False, 'Error')
                if not snapshot_result_error.empty():
                    results = [snapshot_result_error.get() for _ in mp_jobs]
                    for result in results:
                        if result.errorcode != CommonVariables.success:
                            snapshot_result.errors.append(result)
                if not snapshot_info_indexer_queue.empty():
                    snapshot_info_indexers = [snapshot_info_indexer_queue.get() for _ in mp_jobs]
                    for snapshot_info_indexer in snapshot_info_indexers:
                        # Update the blob_snapshot_info_array element from the
                        # snapshot_info_indexer object reported by the worker.
                        blob_info = blob_snapshot_info_array[snapshot_info_indexer.index]
                        self.get_snapshot_info(snapshot_info_indexer, blob_info)
                        if blob_info.isSuccessful == True:
                            all_failed = False
                        self.logger.log("index: " + str(snapshot_info_indexer.index) + " blobSnapshotUri: " + str(blob_info.snapshotUri))
                    all_snapshots_failed = all_failed
                    self.logger.log("Setting all_snapshots_failed to " + str(all_snapshots_failed))
    except Exception as e:
        errorMsg = " Unable to perform parallel snapshot with error: %s, stack trace: %s" % (str(e), traceback.format_exc())
        self.logger.log(errorMsg)
        exceptOccurred = True
    # Single exit point so every path returns the same 8-tuple shape.
    return (snapshot_result, blob_snapshot_info_array, all_failed, exceptOccurred,
            is_inconsistent, thaw_done_local, unable_to_sleep, all_snapshots_failed)