in tools/stress/src/org/apache/cassandra/stress/StressProfile.java [515:653]
/**
 * Lazily builds, prepares and caches the write statement for this profile's table, then returns a new
 * {@link SchemaInsert} operation bound to it.
 * <p>
 * The first call (double-checked null test of {@code insertStatement} around {@code synchronized (this)})
 * loads the table metadata and generates either
 * <ul>
 *   <li>{@code UPDATE ks."table" SET <non-key columns> WHERE <key columns> [condition]} when the table has
 *   at least one regular column, or</li>
 *   <li>{@code INSERT INTO ks."table" (<key columns>) values(?, ...)} for key-only tables.</li>
 * </ul>
 * It also consumes the profile's {@code insert:} options ({@code partitions}, {@code select},
 * {@code row-population}, {@code batchtype}, {@code condition}) and sanity-checks the resulting batch sizes.
 * Later calls reuse the cached statement, distributions and batch type.
 * <p>
 * NOTE(review): the unsynchronized first read of {@code insertStatement} is only safe if that field is
 * declared {@code volatile}; confirm at its declaration.
 *
 * @param timer       timer handed to the returned operation
 * @param generator   partition generator; its min/max row counts feed the size warnings
 * @param seedManager seed manager handed to the returned operation
 * @param settings    stress settings; its insert options are passed to {@code select(...)} ahead of the
 *                    profile's values, and its batch type (when set) overrides the profile's {@code batchtype}
 * @return a new insert operation using the cached prepared statement
 * @throws IllegalArgumentException if the profile's {@code insert:} section contains unrecognised options
 *                                  (also terminates the JVM via {@code System.exit(1)} if LOGGED batches
 *                                  could exceed 65535 rows)
 */
public SchemaInsert getInsert(Timer timer, PartitionGenerator generator, SeedManager seedManager, StressSettings settings)
{
    if (insertStatement == null)
    {
        synchronized (this)
        {
            if (insertStatement == null)
            {
                maybeLoadSchemaInfo(settings);

                Set<com.datastax.driver.core.ColumnMetadata> keyColumns = com.google.common.collect.Sets.newHashSet(tableMetaData.getPrimaryKey());
                Set<com.datastax.driver.core.ColumnMetadata> allColumns = com.google.common.collect.Sets.newHashSet(tableMetaData.getColumns());
                boolean isKeyOnlyTable = (keyColumns.size() == allColumns.size());

                // With compact storage the table may report exactly one extra, unnamed (empty-name) column.
                // If that is the only non-key column, treat the table as key-only and generate an INSERT.
                if (!isKeyOnlyTable && keyColumns.size() == allColumns.size() - 1)
                {
                    Set<com.datastax.driver.core.ColumnMetadata> nonKeyColumns = com.google.common.collect.Sets.difference(allColumns, keyColumns);
                    if (!nonKeyColumns.isEmpty())
                        isKeyOnlyTable = nonKeyColumns.iterator().next().getName().isEmpty();
                }

                if (insert == null)
                    insert = new HashMap<>();
                lowerCase(insert);

                StringBuilder sb = new StringBuilder();
                if (!isKeyOnlyTable)
                {
                    // UPDATE ks."table" SET <assignments> WHERE <key predicates> [condition]
                    sb.append("UPDATE ").append(keyspaceName).append('.').append(quoteIdentifier(tableName)).append(" SET ");
                    StringBuilder pred = new StringBuilder(" WHERE ");
                    boolean firstCol = true;
                    boolean firstPred = true;
                    for (com.datastax.driver.core.ColumnMetadata c : tableMetaData.getColumns())
                    {
                        String name = quoteIdentifier(c.getName());
                        if (keyColumns.contains(c))
                        {
                            if (!firstPred)
                                pred.append(" AND ");
                            firstPred = false;
                            pred.append(name).append(" = ?");
                            continue;
                        }

                        if (!firstCol)
                            sb.append(',');
                        firstCol = false;
                        sb.append(name).append(" = ");
                        switch (c.getType().getName())
                        {
                            case SET:
                            case LIST:
                                // Frozen collections are overwritten whole; non-frozen ones get "col = col + ?".
                                sb.append(c.getType().isFrozen() ? "?" : name + " + ?");
                                break;
                            case COUNTER:
                                // Counters can only be written as increments.
                                sb.append(name).append(" + ?");
                                break;
                            default:
                                sb.append('?');
                                break;
                        }
                    }
                    // Key predicates go after the SET clause.
                    sb.append(pred);

                    // Optional condition from the profile (presumably an LWT "IF ..." clause). It is removed
                    // here so it is not reported as an unrecognised option below.
                    String condition = insert.remove("condition");
                    if (condition != null)
                        sb.append(' ').append(condition);
                }
                else
                {
                    // Key-only table: INSERT INTO ks."table" ("k1", "k2", ...) values(?, ?, ...)
                    sb.append("INSERT INTO ").append(keyspaceName).append('.').append(quoteIdentifier(tableName)).append(" (");
                    StringBuilder values = new StringBuilder();
                    String separator = "";
                    for (com.datastax.driver.core.ColumnMetadata c : tableMetaData.getPrimaryKey())
                    {
                        sb.append(separator).append(quoteIdentifier(c.getName()));
                        values.append(separator).append('?');
                        separator = ", ";
                    }
                    sb.append(") ").append("values(").append(values).append(')');
                }

                partitions = select(settings.insert.batchsize, "partitions", "fixed(1)", insert, OptionDistribution.BUILDER);
                selectchance = select(settings.insert.selectRatio, "select", "fixed(1)/1", insert, OptionRatioDistribution.BUILDER);
                rowPopulation = select(settings.insert.rowPopulationRatio, "row-population", "fixed(1)/1", insert, OptionRatioDistribution.BUILDER);

                // Always consume the profile's batchtype, even when the command line overrides it. Otherwise
                // the leftover key would trip the unrecognised-option check below.
                String profileBatchType = insert.remove("batchtype");
                if (settings.insert.batchType != null)
                    batchType = settings.insert.batchType;
                else if (profileBatchType != null)
                    batchType = BatchStatement.Type.valueOf(profileBatchType);
                else
                    batchType = BatchStatement.Type.LOGGED;

                if (!insert.isEmpty())
                    throw new IllegalArgumentException("Unrecognised insert option(s): " + insert);

                Distribution visits = settings.insert.visits.get();
                // This upper bound is not absolutely accurate if selectchance < 1, but it is close enough to
                // guarantee the vast majority of batches fall within it.
                double maxBatchSize = selectchance.get().max() * partitions.get().maxValue() * generator.maxRowCount * (1d / visits.minValue());

                if (generator.maxRowCount > 100 * 1000 * 1000)
                    System.err.printf("WARNING: You have defined a schema that permits very large partitions (%.0f max rows (>100M))%n", generator.maxRowCount);
                if (batchType == BatchStatement.Type.LOGGED && maxBatchSize > 65535)
                {
                    // Report the same quantity the condition tested.
                    System.err.printf("ERROR: You have defined a workload that generates batches with more than 65k rows (%.0f), but have required the use of LOGGED batches. There is a 65k row limit on a single batch.%n",
                                      maxBatchSize);
                    System.exit(1);
                }
                if (maxBatchSize > 100000)
                    System.err.printf("WARNING: You have defined a schema that permits very large batches (%.0f max rows (>100K)). This may OOM this stress client, or the server.%n",
                                      maxBatchSize);

                JavaDriverClient client = settings.getJavaDriverClient(keyspaceName);
                String query = sb.toString();
                // Assigned only after every other lazily-initialised field above, so it is the publishing write.
                insertStatement = client.prepare(query);
                System.out.println("Insert Statement:");
                System.out.println(" " + query);
            }
        }
    }
    return new SchemaInsert(timer, settings, generator, seedManager, partitions.get(), selectchance.get(), rowPopulation.get(), insertStatement, settings.command.consistencyLevel, batchType);
}