def export_records()

in cqlsh-expansion/pylib/cqlshlib/copyutil.py [0:0]


    def export_records(self, ranges):
        """
        Hand every token range to the child processes, then drain their replies until
        the export is finished.

        Each message read from self.inmsg is a (token_range, result) pair, one of:

        - (None, None): a request has finished;
        - (token_range, exception): the range failed. It is resubmitted as long as it has
          attempts left and no rows were received for it yet, otherwise it is given up on;
        - (token_range, (data, num)): a partial result, forwarded to the writer.

        We stop once every range has either finished or been given up on, or as soon as
        the number of live child processes drops: no ACK will ever arrive for the ranges
        a dead process was working on, and at the moment we do not keep track of which
        ranges a process is handling.

        ranges maps each token range to its bookkeeping dict; the 'attempts' and 'rows'
        entries are read here and 'rows' is incremented as partial results come in.
        """
        console = self.shell
        workers = self.processes
        copy_options = self.options.copy
        rate_meter = RateMeter(log_fcn=self.printmsg,
                               update_interval=copy_options['reportfrequency'],
                               log_file=copy_options['ratefile'])
        range_count = len(ranges)
        retry_limit = copy_options['maxattempts']

        self.send_work(ranges, ranges.keys())

        worker_count = len(workers)
        completed = 0
        # Ranges that have neither finished nor been permanently given up on.
        outstanding = range_count
        while outstanding > 0 and self.num_live_processes() == worker_count:
            for token_range, reply in self.inmsg.recv(timeout=0.1):
                if token_range is None and reply is None:
                    # A request has finished.
                    completed += 1
                    outstanding -= 1
                    continue

                if not isinstance(reply, Exception):
                    # Partial result: write it out and account for the rows.
                    rows, row_count = reply
                    self.writer.write(rows, row_count)
                    rate_meter.increment(n=row_count)
                    ranges[token_range]['rows'] += row_count
                    continue

                # An error occurred for this token range. Retry up to the attempt limit,
                # but only if no rows were received yet: once rows have arrived a retry
                # would risk duplicating data. A small risk of duplicates remains even
                # with no rows received, it is just less likely; never retrying on
                # timeouts would instead risk not exporting some rows.
                progress = ranges[token_range]
                may_retry = progress['attempts'] < retry_limit and progress['rows'] == 0
                if may_retry:
                    console.printerr('Error for %s: %s (will try again later attempt %d of %d)'
                                     % (token_range, reply, progress['attempts'], retry_limit))
                    self.send_work(ranges, [token_range])
                else:
                    console.printerr('Error for %s: %s (permanently given up after %d rows and %d attempts)'
                                     % (token_range, reply, progress['rows'], progress['attempts']))
                    outstanding -= 1

        # Report every child that is no longer running.
        if self.num_live_processes() < len(workers):
            for worker in workers:
                if worker.is_alive():
                    continue
                console.printerr('Child process %d died with exit code %d' % (worker.pid, worker.exitcode))

        if completed < range_count:
            console.printerr('Exported %d ranges out of %d total ranges, some records might be missing'
                             % (completed, range_count))

        exported_rows = rate_meter.get_total_records()
        file_count = self.writer.num_files
        elapsed = self.describe_interval(time.time() - self.time_start)
        self.printmsg("\n%d rows exported to %d files in %s." % (exported_rows, file_count, elapsed))