in dtest.py [0:0]
def create_cf(session, name, key_type="varchar", speculative_retry=None, read_repair=None, compression=None,
              gc_grace=None, columns=None, validation="UTF8Type", compact_storage=False, compaction_strategy='SizeTieredCompactionStrategy',
              primary_key=None, clustering=None, legacy_compression_class=False):
    """Build and execute a CQL ``CREATE COLUMNFAMILY`` statement, then check the table is readable.

    After the create statement is issued, the function waits for schema
    agreement and runs ``SELECT * FROM <name> LIMIT 1`` against the new table.

    Args:
        session: Object exposing ``execute(...)`` and
            ``cluster.control_connection.wait_for_schema_agreement(...)``.
        name: Table name; interpolated into the CQL text as-is (no quoting).
        key_type: CQL type of the ``key`` column.
        speculative_retry: If not None, emitted as ``speculative_retry='<value>'``.
        read_repair: If not None, used for both ``read_repair_chance`` and
            ``dclocal_read_repair_chance`` (formatted with ``%f``).
        compression: Compressor name without the ``Compressor`` suffix. When
            None, an explicit empty compression map is emitted.
        gc_grace: If not None, emitted as ``gc_grace_seconds`` (formatted with ``%d``).
        columns: Optional mapping of extra column name -> CQL type. When it is
            None or empty, a default ``(key, c varchar, v varchar)`` layout with
            ``PRIMARY KEY(key, c)`` is created instead.
        validation: Not referenced by this function; kept so existing callers
            that pass it continue to work.
        compact_storage: If truthy, appends ``COMPACT STORAGE``.
        compaction_strategy: Compaction class name; the empty string selects
            ``SizeTieredCompactionStrategy``.
        primary_key: Contents of the ``PRIMARY KEY(...)`` clause. Only consulted
            when extra columns are given; otherwise ``key`` alone is the
            primary key.
        clustering: If truthy, contents of ``CLUSTERING ORDER BY (...)``.
        legacy_compression_class: If truthy, the compressor is passed under the
            ``sstable_compression`` option key instead of ``class``.

    Raises:
        Exceptions other than ``cassandra.AlreadyExists`` raised by the create
        call, and any raised by the schema wait or the final SELECT, propagate
        to the caller.
    """
    # An empty strategy name falls back to size-tiered compaction.
    strategy = 'SizeTieredCompactionStrategy' if compaction_strategy == '' else compaction_strategy
    compaction_fragment = "compaction = {'class': '%s', 'enabled': 'true'}" % strategy

    # Each extra column becomes ", <name> <type>" so it can be spliced directly
    # after the key column definition.
    additional_columns = ""
    if columns is not None:
        additional_columns = "".join(
            ", {} {}".format(col_name, col_type) for col_name, col_type in columns.items()
        )

    if additional_columns == "":
        query = "CREATE COLUMNFAMILY %s (key %s, c varchar, v varchar, PRIMARY KEY(key, c)) WITH comment='test cf'" % (name, key_type)
    elif primary_key:
        query = "CREATE COLUMNFAMILY %s (key %s%s, PRIMARY KEY(%s)) WITH comment='test cf'" % (name, key_type, additional_columns, primary_key)
    else:
        query = "CREATE COLUMNFAMILY %s (key %s PRIMARY KEY%s) WITH comment='test cf'" % (name, key_type, additional_columns)

    # Table options, in the order they are appended to the WITH clause.
    options = [compaction_fragment]

    if clustering:
        options.append('CLUSTERING ORDER BY (%s)' % (clustering,))

    if compression is not None:
        compression_key = 'sstable_compression' if legacy_compression_class else 'class'
        options.append("compression = { '%s': '%sCompressor' }" % (compression_key, compression))
    else:
        # If a compression option is omitted, C* will default to lz4
        # compression, so an explicit empty map is sent instead.
        options.append('compression = {}')

    if read_repair is not None:
        options.append('read_repair_chance=%f AND dclocal_read_repair_chance=%f' % (read_repair, read_repair))
    if gc_grace is not None:
        options.append('gc_grace_seconds=%d' % (gc_grace,))
    if speculative_retry is not None:
        options.append("speculative_retry='%s'" % (speculative_retry,))
    if compact_storage:
        options.append('COMPACT STORAGE')

    query = ' AND '.join([query] + options)

    try:
        # NOTE(review): retry_till_success presumably re-invokes session.execute
        # while OperationTimedOut is raised, for up to `timeout` seconds -- confirm
        # against its definition.
        retry_till_success(session.execute, query=query, timeout=120, bypassed_exception=cassandra.OperationTimedOut)
    except cassandra.AlreadyExists:
        logger.warning("AlreadyExists executing create cf query '%s'", query)
    session.cluster.control_connection.wait_for_schema_agreement(wait_time=120)
    # OperationTimedOut from the create is ignored above, so validate that the
    # table was indeed created by reading from it.
    session.execute('SELECT * FROM %s LIMIT 1' % name)