def inner_run()

in cqlsh-expansion/pylib/cqlshlib/copyutil.py [0:0]


    def inner_run(self):
        """
        Send one batch per worker process to the queue unless we have exceeded the ingest rate.
        In the export case we queue everything and let the worker processes throttle using max_requests,
        here we throttle using the ingest rate in the feeding process because of memory usage concerns.
        When finished we send back to the parent process the total number of rows sent.

        An IOError raised while starting the reader, and any Exception raised while reading or
        sending a chunk, is not propagated: it is reported to the parent process as an
        ImportTaskError on self.outmsg and processing carries on. Once the reader is exhausted a
        FeedingProcessResult(sent, reader) is sent on self.outmsg, after which the method waits
        for a message on self.inmsg before returning.
        """

        def report_error(exc):
            # BaseException.message is empty for exceptions built from several arguments
            # (e.g. IOError(errno, strerror, filename)) and does not exist on Python 3, so
            # fall back to str(exc) instead of reporting a blank error to the parent.
            message = getattr(exc, 'message', None) or str(exc)
            self.outmsg.send(ImportTaskError(exc.__class__.__name__, message))

        self.on_fork()

        reader = self.reader
        try:
            reader.start()
        except IOError as exc:
            report_error(exc)

        channels = self.worker_channels
        max_pending_chunks = self.max_pending_chunks
        sent = 0
        failed_attempts = 0  # consecutive rounds in which every worker channel was full

        while not reader.exhausted:
            # Build a real list: a lazy filter object (Python 3) is always truthy and would
            # defeat the "all workers busy" check below.
            channels_eligible = [c for c in channels if c.num_pending() < max_pending_chunks]
            if not channels_eligible:
                # Randomised back-off: the upper bound of the delay doubles with every
                # consecutive round in which no channel could accept a chunk.
                failed_attempts += 1
                delay = randint(1, pow(2, failed_attempts))
                printdebugmsg("All workers busy, sleeping for %d second(s)" % (delay,))
                time.sleep(delay)
                continue
            elif failed_attempts > 0:
                failed_attempts = 0

            for ch in channels_eligible:
                try:
                    # rows still allowed by the ingest rate, given what the meter has recorded
                    max_rows = self.ingest_rate - self.send_meter.current_record
                    if max_rows <= 0:
                        # over budget: let the meter update (without sleeping) and try the
                        # next channel
                        self.send_meter.maybe_update(sleep=False)
                        continue

                    rows = reader.read_rows(max_rows)
                    if rows:
                        sent += self.send_chunk(ch, rows)
                except Exception as exc:
                    # report the failure to the parent and keep feeding the other channels
                    report_error(exc)

                if reader.exhausted:
                    break

        # send back to the parent process the number of rows sent to the worker processes
        self.outmsg.send(FeedingProcessResult(sent, reader))

        # wait for poison pill (None)
        self.inmsg.recv()