in cqlsh-expansion/pylib/cqlshlib/copyutil.py [0:0]
def split_into_batches(self, chunk, conv, tm):
    """
    Batch rows by ring position or replica.

    If there are more than min_batch_size rows for a ring position then split these rows into
    groups of max_batch_size and send a batch for each group, using all replicas for this ring
    position. Otherwise, we are forced to batch by replica, and here unfortunately we can only
    choose one replica to guarantee common replicas across partition keys. We are typically able
    to batch by ring position for small clusters or when VNODES are not used. For large clusters
    with VNODES it may not be possible, in this case it helps to increase the CHUNK SIZE but up
    to a limit, otherwise we may choke the cluster.

    Rows for which the partition key or ring position cannot be computed are left out of every
    batch; they are grouped by error message and handed to self.report_error wrapped in a
    ParseError.

    :param chunk: mapping providing the rows to import under 'rows' and the chunk id under 'id'
    :param conv: object whose get_row_partition_key_values_fcn() returns a callable that
                 extracts the partition key values from a row
    :param tm: object providing ring, replicas, pk_to_token_value, get_ring_pos and
               filter_replicas
    :return: a generator of (replicas, batch) pairs, where batch is the value returned by
             self.make_batch(chunk id, rows)
    """
    rows_by_ring_pos = defaultdict(list)
    errors = defaultdict(list)

    min_batch_size = self.min_batch_size
    max_batch_size = self.max_batch_size
    ring = tm.ring

    # bind the per-row callables to locals, they are invoked once for every row
    get_row_partition_key_values = conv.get_row_partition_key_values_fcn()
    pk_to_token_value = tm.pk_to_token_value
    get_ring_pos = tm.get_ring_pos
    make_batch = self.make_batch

    for row in chunk['rows']:
        try:
            pk = get_row_partition_key_values(row)
            rows_by_ring_pos[get_ring_pos(ring, pk_to_token_value(pk))].append(row)
        except Exception as e:
            # 'message' is deprecated (PEP 352): it is empty for exceptions built with several
            # arguments and missing altogether on Python 3, so fall back to str(e)
            errors[getattr(e, 'message', '') or str(e)].append(row)

    # one report per distinct error message, carrying all the rows that failed with it
    for msg, failed_rows in errors.items():
        self.report_error(ParseError(msg), chunk, failed_rows)

    replicas = tm.replicas
    filter_replicas = tm.filter_replicas
    rows_by_replica = defaultdict(list)
    for ring_pos, rows in rows_by_ring_pos.items():
        if len(rows) > min_batch_size:
            for i in range(0, len(rows), max_batch_size):
                yield filter_replicas(replicas[ring_pos]), make_batch(chunk['id'], rows[i:i + max_batch_size])
        else:
            # select only the first valid replica to guarantee more overlap or none at all;
            # the tuple() makes sure the dictionary key is hashable even for a list of replicas
            rows_by_replica[tuple(filter_replicas(replicas[ring_pos])[:1])].extend(rows)

    # Now send the batches by replica
    for batch_replicas, rows in rows_by_replica.items():
        for i in range(0, len(rows), max_batch_size):
            yield batch_replicas, make_batch(chunk['id'], rows[i:i + max_batch_size])