def import_records()

in cqlsh-expansion/pylib/cqlshlib/copyutil.py [0:0]


    def import_records(self):
        """
        Collect results from the child processes until the import completes or has to be
        abandoned, then ask the remaining children to stop and print a summary line.

        Results keep being received for as long as no feeding result is available yet, or fewer
        records have been received than the feeding result reports as sent. The loop is left
        early when:
          - a feeding result is available but the received record count has not changed for
            more than the 'childtimeout' copy option (seconds),
          - the error handler reports that its maximum was exceeded, or
          - not all processes are running any more.
        """
        if not self.fname:
            # no file name was given, so rows are sent via send_stdin_rows()
            self.send_stdin_rows()

        stall_limit = self.options.copy['childtimeout']
        records_seen = 0
        progress_time = time.time()

        while True:
            feed = self.feeding_result
            if feed is not None and self.receive_meter.total_records >= feed.sent:
                break  # everything reported as sent has been received

            self.receive_results()

            # the stall check only applies once a feeding result is available
            if self.feeding_result is not None:
                records_now = self.receive_meter.total_records
                if records_now != records_seen:
                    # the count moved: remember it and restart the stall clock
                    records_seen, progress_time = records_now, time.time()
                elif (time.time() - progress_time) > stall_limit:
                    self.shell.printerr("No records inserted in {} seconds, aborting".format(stall_limit))
                    break

            if self.error_handler.max_exceeded():
                break
            if not self.all_processes_running():
                break

        failed_rows = self.error_handler.num_rows_failed
        if failed_rows:
            self.shell.printerr("Failed to process %d rows; failed rows written to %s" %
                                (failed_rows, self.error_handler.err_file))

        if self.all_processes_running():
            if self.error_handler.max_exceeded():
                self.processes[-1].terminate()  # kill the feeder

            # send None to every process that is still alive
            # (presumably the "stop" signal for the child - confirm against the child loop)
            for index, process in enumerate(self.processes):
                if process.is_alive():
                    self.outmsg.channels[index].send(None)
        else:
            dead_count = self.num_processes - self.num_live_processes()
            self.shell.printerr("{} child process(es) died unexpectedly, aborting".format(dead_count))

        # allow time for worker processes to exit cleanly:
        # at most 50 checks, 100 milliseconds apart, so 5 seconds total
        for _ in range(50):
            if self.num_live_processes() <= 0:
                break
            time.sleep(0.1)

        total_received = self.receive_meter.get_total_records()
        feed = self.feeding_result
        source_count = feed.num_sources if feed else 0
        elapsed = self.describe_interval(time.time() - self.time_start)
        skipped = feed.skip_rows if feed else 0
        self.printmsg("\n%d rows imported from %d files in %s (%d skipped)." %
                      (total_received, source_count, elapsed, skipped))