in cqlsh-expansion/pylib/cqlshlib/copyutil.py [0:0]
def export_records(self, ranges):
    """
    Send the export ranges to the child processes and monitor them by collecting
    their results or any errors. We terminate when all the ranges have been
    processed or when a child process has died (in that case we would never get
    an ACK for the ranges it was handling, and at the moment we don't keep track
    of which ranges each process is handling). A standalone sketch of this
    monitor pattern follows the function.
    """
    shell = self.shell
    processes = self.processes
    meter = RateMeter(log_fcn=self.printmsg,
                      update_interval=self.options.copy['reportfrequency'],
                      log_file=self.options.copy['ratefile'])
    total_requests = len(ranges)
    max_attempts = self.options.copy['maxattempts']

    self.send_work(ranges, list(ranges.keys()))

    num_processes = len(processes)
    succeeded = 0
    failed = 0
    while (failed + succeeded) < total_requests and self.num_live_processes() == num_processes:
        for token_range, result in self.inmsg.recv(timeout=0.1):
            if token_range is None and result is None:  # a request has finished
                succeeded += 1
            elif isinstance(result, Exception):  # an error occurred
                # This token_range failed; retry it up to max_attempts times, but
                # only if no rows have been received for it yet, because retrying
                # after partial results would risk duplicating data. A slight risk
                # of duplication remains even when the error arrives before any
                # rows; it is just less likely. Refusing to retry on all timeouts,
                # however, would risk not exporting some rows at all. (The retry
                # decision is sketched in isolation after the function.)
                if ranges[token_range]['attempts'] < max_attempts and ranges[token_range]['rows'] == 0:
                    shell.printerr('Error for %s: %s (will try again later attempt %d of %d)'
                                   % (token_range, result, ranges[token_range]['attempts'], max_attempts))
                    self.send_work(ranges, [token_range])
                else:
                    shell.printerr('Error for %s: %s (permanently given up after %d rows and %d attempts)'
                                   % (token_range, result, ranges[token_range]['rows'],
                                      ranges[token_range]['attempts']))
                    failed += 1
            else:  # partial result received
                data, num = result
                self.writer.write(data, num)
                meter.increment(n=num)
                ranges[token_range]['rows'] += num

    if self.num_live_processes() < len(processes):
        for process in processes:
            if not process.is_alive():
                shell.printerr('Child process %d died with exit code %d' % (process.pid, process.exitcode))

    if succeeded < total_requests:
        shell.printerr('Exported %d ranges out of %d total ranges, some records might be missing'
                       % (succeeded, total_requests))

    self.printmsg("\n%d rows exported to %d files in %s." %
                  (meter.get_total_records(),
                   self.writer.num_files,
                   self.describe_interval(time.time() - self.time_start)))
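
The loop above follows a common parent/worker shape: drain a result channel with a short timeout so the parent can also notice dead children between messages, count ACKs, and stop early once a worker disappears (since its ACKs will never arrive). Below is a minimal, self-contained sketch of that pattern, assuming a plain `multiprocessing.Queue` in place of copyutil's `inmsg` channel; the `worker`/`monitor` names and the `(token, outcome)` message shape are illustrative assumptions, not the module's real API, and the liveness check is folded into the empty-timeout branch rather than the `while` condition.

    import multiprocessing
    import queue

    def worker(tokens, results):
        # Pretend to export each token range, ACKing with (token, None) on
        # success and reporting (token, exception) on failure.
        for token in tokens:
            try:
                # ... export rows for this token range here ...
                results.put((token, None))
            except Exception as exc:
                results.put((token, exc))

    def monitor(processes, results, total):
        done = 0
        while done < total:
            try:
                token, outcome = results.get(timeout=0.1)
            except queue.Empty:
                # Queue is dry: if a child died, no more ACKs will arrive.
                if not all(p.is_alive() for p in processes):
                    break
                continue
            if outcome is None:
                done += 1                      # this range finished cleanly
            else:
                print('range %s failed: %s' % (token, outcome))
                done += 1                      # counted as failed, not retried here
        if done < total:
            for p in processes:
                if not p.is_alive():
                    print('child %d died with exit code %s' % (p.pid, p.exitcode))
            print('%d of %d ranges accounted for; some rows may be missing' % (done, total))

    if __name__ == '__main__':
        results = multiprocessing.Queue()
        tokens = [(0, 100), (100, 200)]
        procs = [multiprocessing.Process(target=worker, args=([t], results)) for t in tokens]
        for p in procs:
            p.start()
        monitor(procs, results, total=len(tokens))
        for p in procs:
            p.join()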
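
The retry policy itself is pure bookkeeping: each entry of `ranges` carries an `attempts` counter and a `rows` counter, and a failed range is requeued only while it has produced no rows and still has attempts left. Here is a tiny sketch of just that decision; the per-range dict shape mirrors the fields `export_records` reads, but the `should_retry` helper is hypothetical, not part of copyutil.py.

    # Hypothetical helper isolating the retry test from export_records.
    def should_retry(range_state, max_attempts):
        # Retry only if no rows arrived yet: once partial rows have been
        # written, a retry could duplicate data already in the output files.
        return range_state['rows'] == 0 and range_state['attempts'] < max_attempts

    ranges = {
        (0, 100):   {'attempts': 1, 'rows': 0},   # failed before any rows: retry
        (100, 200): {'attempts': 1, 'rows': 42},  # partial rows written: give up
        (200, 300): {'attempts': 5, 'rows': 0},   # out of attempts: give up
    }
    for token_range, state in ranges.items():
        verdict = 'retry' if should_retry(state, max_attempts=5) else 'give up'
        print('%s -> %s' % (token_range, verdict))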