in cqlsh-expansion/pylib/cqlshlib/copyutil.py [0:0]
def inner_run(self):
"""
Send one batch per worker process to the queue unless we have exceeded the ingest rate.
In the export case we queue everything and let the worker processes throttle using max_requests,
here we throttle using the ingest rate in the feeding process because of memory usage concerns.
When finished we send back to the parent process the total number of rows sent.
"""
self.on_fork()
reader = self.reader
try:
reader.start()
except IOError, exc:
self.outmsg.send(ImportTaskError(exc.__class__.__name__, exc.message))
channels = self.worker_channels
max_pending_chunks = self.max_pending_chunks
sent = 0
failed_attempts = 0
while not reader.exhausted:
channels_eligible = filter(lambda c: c.num_pending() < max_pending_chunks, channels)
if not channels_eligible:
failed_attempts += 1
delay = randint(1, pow(2, failed_attempts))
printdebugmsg("All workers busy, sleeping for %d second(s)" % (delay,))
time.sleep(delay)
continue
elif failed_attempts > 0:
failed_attempts = 0
for ch in channels_eligible:
try:
max_rows = self.ingest_rate - self.send_meter.current_record
if max_rows <= 0:
self.send_meter.maybe_update(sleep=False)
continue
rows = reader.read_rows(max_rows)
if rows:
sent += self.send_chunk(ch, rows)
except Exception, exc:
self.outmsg.send(ImportTaskError(exc.__class__.__name__, exc.message))
if reader.exhausted:
break
# send back to the parent process the number of rows sent to the worker processes
self.outmsg.send(FeedingProcessResult(sent, reader))
# wait for poison pill (None)
self.inmsg.recv()