def split_into_batches()

in cqlsh-expansion/pylib/cqlshlib/copyutil.py


    def split_into_batches(self, chunk, conv, tm):
        """
        Batch rows by ring position or replica.
        If there are at least min_batch_size rows for a ring position then split these rows into
        groups of max_batch_size and send a batch for each group, using all replicas for this ring position.
        Otherwise, we are forced to batch by replica, and here unfortunately we can only choose one replica to
        guarantee common replicas across partition keys. We are typically able
        to batch by ring position for small clusters or when VNODES are not used. For large clusters with VNODES
        it may not be possible, in this case it helps to increase the CHUNK SIZE but up to a limit, otherwise
        we may choke the cluster.
        """

        rows_by_ring_pos = defaultdict(list)
        errors = defaultdict(list)

        min_batch_size = self.min_batch_size
        max_batch_size = self.max_batch_size
        ring = tm.ring

        get_row_partition_key_values = conv.get_row_partition_key_values_fcn()
        pk_to_token_value = tm.pk_to_token_value
        get_ring_pos = tm.get_ring_pos
        make_batch = self.make_batch

        # First pass: bucket each row by the ring position that owns its partition key;
        # rows that fail to parse are collected per error message.
        for row in chunk['rows']:
            try:
                pk = get_row_partition_key_values(row)
                rows_by_ring_pos[get_ring_pos(ring, pk_to_token_value(pk))].append(row)
            except Exception as e:
                errors[str(e)].append(row)

        if errors:
            for msg, rows in errors.items():
                self.report_error(ParseError(msg), chunk, rows)

        replicas = tm.replicas
        filter_replicas = tm.filter_replicas
        rows_by_replica = defaultdict(list)
        for ring_pos, rows in rows_by_ring_pos.items():
            if len(rows) > min_batch_size:
                # Enough rows for this ring position: batch by ring position, so any
                # replica of that position can serve the batch.
                for i in range(0, len(rows), max_batch_size):
                    yield filter_replicas(replicas[ring_pos]), make_batch(chunk['id'], rows[i:i + max_batch_size])
            else:
                # Select only the first valid replica to guarantee more overlap or none at all;
                # wrap it in a tuple because dict keys must be hashable.
                rows_by_replica[tuple(filter_replicas(replicas[ring_pos])[:1])].extend(rows)

        # Now send the remaining batches, grouped by replica
        for batch_replicas, rows in rows_by_replica.items():
            for i in range(0, len(rows), max_batch_size):
                yield list(batch_replicas), make_batch(chunk['id'], rows[i:i + max_batch_size])
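
To make the two-tier strategy easier to follow outside the class, here is a minimal
standalone sketch of the same splitting logic. It is not the cqlsh API: the ring, the
replica table, the token function, and the batch sizes are all invented for illustration,
and bisect_right stands in for tm.get_ring_pos.

    from bisect import bisect_right
    from collections import defaultdict


    def split_rows(rows, token_of, ring, replicas_by_pos,
                   min_batch_size=10, max_batch_size=20):
        """Yield (replica_list, row_batch) pairs using the same two-tier strategy."""
        rows_by_ring_pos = defaultdict(list)
        for row in rows:
            # The owning ring position is the first ring token greater than the
            # row's token, wrapping around past the last token.
            pos = bisect_right(ring, token_of(row)) % len(ring)
            rows_by_ring_pos[pos].append(row)

        rows_by_replica = defaultdict(list)
        for pos, grouped in rows_by_ring_pos.items():
            if len(grouped) > min_batch_size:
                # Plenty of rows: batch by ring position, all replicas are usable.
                for i in range(0, len(grouped), max_batch_size):
                    yield replicas_by_pos[pos], grouped[i:i + max_batch_size]
            else:
                # Too few rows: fall back to the first replica only (a tuple,
                # since dict keys must be hashable).
                rows_by_replica[tuple(replicas_by_pos[pos][:1])].extend(grouped)

        for replica, grouped in rows_by_replica.items():
            for i in range(0, len(grouped), max_batch_size):
                yield list(replica), grouped[i:i + max_batch_size]


    # Toy usage: three ring positions, a fake hash, fifty rows.
    ring = [100, 200, 300]
    replicas_by_pos = {0: ['10.0.0.1', '10.0.0.2'],
                       1: ['10.0.0.2', '10.0.0.3'],
                       2: ['10.0.0.3', '10.0.0.1']}
    rows = [{'pk': k} for k in range(50)]
    for replica_list, batch in split_rows(rows, lambda r: (r['pk'] * 7919) % 300,
                                          ring, replicas_by_pos):
        print(replica_list, len(batch))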