in cqlsh-expansion/pylib/cqlshlib/copyutil.py
def import_records(self):
    """
    Receive results from the worker processes until every record sent by the
    feeder has been accounted for, aborting early if the error threshold is
    exceeded, a child process dies, or no new records arrive within the child
    timeout. When reading from stdin rather than from files, forward the rows
    to the feeder first.
    """
    if not self.fname:
        self.send_stdin_rows()
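
    # watchdog state: track when records last arrived so a stalled import
    # can be aborted after 'childtimeout' seconds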
    child_timeout = self.options.copy['childtimeout']
    last_recv_num_records = 0
    last_recv_time = time.time()
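
    # main receive loop: run until every record the feeder reports as sent
    # has come back from the workers (feeding_result is None while the feeder
    # is still reading its sources)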
    while self.feeding_result is None or self.receive_meter.total_records < self.feeding_result.sent:
        self.receive_results()

        if self.feeding_result is not None:
            if self.receive_meter.total_records != last_recv_num_records:
                last_recv_num_records = self.receive_meter.total_records
                last_recv_time = time.time()
            elif (time.time() - last_recv_time) > child_timeout:
                self.shell.printerr("No records inserted in {} seconds, aborting".format(child_timeout))
                break
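
        # bail out if too many rows have failed or a child process died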
        if self.error_handler.max_exceeded() or not self.all_processes_running():
            break
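
    # the receive loop has ended: report failures, then shut the workers down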
    if self.error_handler.num_rows_failed:
        self.shell.printerr("Failed to process %d rows; failed rows written to %s" %
                            (self.error_handler.num_rows_failed,
                             self.error_handler.err_file))
    if not self.all_processes_running():
        self.shell.printerr("{} child process(es) died unexpectedly, aborting"
                            .format(self.num_processes - self.num_live_processes()))
    else:
        if self.error_handler.max_exceeded():
            self.processes[-1].terminate()  # kill the feeder
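
        # send each live worker a poison pill (None) so it drains its queue
        # and exits cleanly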
        for i, _ in enumerate(self.processes):
            if self.processes[i].is_alive():
                self.outmsg.channels[i].send(None)

    # allow time for worker processes to exit cleanly
    attempts = 50  # 100 milliseconds per attempt, so 5 seconds total
    while attempts > 0 and self.num_live_processes() > 0:
        time.sleep(0.1)
        attempts -= 1
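
    # final summary; feeding_result may be None if the feeder never finished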
    self.printmsg("\n%d rows imported from %d files in %s (%d skipped)." %
                  (self.receive_meter.get_total_records(),
                   self.feeding_result.num_sources if self.feeding_result else 0,
                   self.describe_interval(time.time() - self.time_start),
                   self.feeding_result.skip_rows if self.feeding_result else 0))
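
The shutdown above combines two common multiprocessing patterns: a poison pill
(sending None on each worker's channel) followed by a bounded wait for the
children to exit. A minimal standalone sketch of the same idea, assuming plain
multiprocessing.Pipe channels and a hypothetical worker function (the real
worker processes and channel objects in copyutil.py carry more machinery):

import multiprocessing
import time

def worker(channel):
    # drain the channel until the poison pill (None) arrives
    while True:
        item = channel.recv()
        if item is None:
            break
        # ... process item here ...

if __name__ == '__main__':
    channels, procs = [], []
    for _ in range(2):
        parent_conn, child_conn = multiprocessing.Pipe()
        proc = multiprocessing.Process(target=worker, args=(child_conn,))
        proc.start()
        channels.append(parent_conn)
        procs.append(proc)

    for conn in channels:
        conn.send(None)  # poison pill, like outmsg.channels[i].send(None) above

    # bounded wait, mirroring the 'attempts' loop: 50 * 0.1s = 5 seconds total
    attempts = 50
    while attempts > 0 and any(p.is_alive() for p in procs):
        time.sleep(0.1)
        attempts -= 1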