in src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java [281:409]
protected static void doIngest(AccumuloClient client, RandomGeneratorFactory randomFactory,
BatchWriterFactory batchWriterFactory, String tableName, Properties testProps, int maxColF,
int maxColQ, long numEntries, boolean checksum, Random random)
throws TableNotFoundException, MutationsRejectedException, InterruptedException {
if (!client.tableOperations().exists(tableName)) {
throw new TableNotFoundException(null, tableName,
"Consult the README and create the table before starting ingest.");
}
byte[] ingestInstanceId = UUID.randomUUID().toString().getBytes(UTF_8);
log.info("Ingest instance ID: {} current time: {}ms", new String(ingestInstanceId, UTF_8),
System.currentTimeMillis());
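// the instance id is passed to genMutation below and is presumably embedded in each
// value so verification can attribute entries to this ingest run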
long entriesWritten = 0L;
long entriesDeleted = 0L;
final int flushInterval = getFlushEntries(testProps);
log.info("A flush will occur after every {} entries written", flushInterval);
final int maxDepth = 25;
// Always point back to flushed data, so that the previous node in a linked list
// already exists in Accumulo when the data is verified. To do this, make insert N
// point back to the row from insert (N - flushInterval). The array below keeps
// track of all inserts so those links can be created (see the sketch after this
// method).
MutationInfo[][] nodeMap = new MutationInfo[maxDepth][flushInterval];
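// nodeMap[depth][index] records the row/cf/cq of the index-th insert at the given
// depth, so later inserts and deletes can reference it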
long lastFlushTime = System.currentTimeMillis();
log.info("Total entries to be written: {}", numEntries);
visibilities = parseVisibilities(testProps.getProperty(TestProps.CI_INGEST_VISIBILITIES));
pauseEnabled = pauseEnabled(testProps);
pauseMin = Integer.parseInt(testProps.getProperty(TestProps.CI_INGEST_PAUSE_WAIT_MIN));
pauseMax = Integer.parseInt(testProps.getProperty(TestProps.CI_INGEST_PAUSE_WAIT_MAX));
Preconditions.checkState(0 < pauseMin && pauseMin <= pauseMax,
"Bad pause wait min/max, must conform to: 0 < min <= max");
if (pauseEnabled) {
lastPauseNs = System.nanoTime();
pauseWaitSec = getPause(random);
log.info("PAUSING enabled");
log.info("INGESTING for {}s", pauseWaitSec);
}
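// pauseCheck below is expected to sleep once the current ingest window of
// pauseWaitSec seconds has elapsed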
final float deleteProbability = getDeleteProbability(testProps);
log.info("DELETES will occur with a probability of {}",
String.format("%.02f", deleteProbability));
try (BatchWriter bw = batchWriterFactory.create(tableName)) {
zipfianEnabled =
Boolean.parseBoolean(testProps.getProperty("test.ci.ingest.zipfian.enabled"));
if (zipfianEnabled) {
minSize = Integer.parseInt(testProps.getProperty("test.ci.ingest.zipfian.min.size"));
maxSize = Integer.parseInt(testProps.getProperty("test.ci.ingest.zipfian.max.size"));
exponent = Double.parseDouble(testProps.getProperty("test.ci.ingest.zipfian.exponent"));
rnd = new RandomDataGenerator();
log.info("Zipfian distribution enabled with min size: {}, max size: {}, exponent: {}",
minSize, maxSize, exponent);
}
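// the zipfian parameters are only parsed and logged here; they are presumably
// consumed where values are generated elsewhere in this class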
out: while (true) {
ColumnVisibility cv = getVisibility(random);
// generate a set of nodes that links back to the previous set of nodes
for (int depth = 0; depth < maxDepth; depth++) {
// use the same random row generator for every entry in a flush interval
LongSupplier randomRowGenerator = randomFactory.get();
for (int index = 0; index < flushInterval; index++) {
long rowLong = randomRowGenerator.getAsLong();
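// depth-0 nodes have no previous row yet; they are linked up in the non-delete
// branch below, once deeper nodes exist to point at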
byte[] prevRow = depth == 0 ? null : genRow(nodeMap[depth - 1][index].row);
int cfInt = random.nextInt(maxColF);
int cqInt = random.nextInt(maxColQ);
nodeMap[depth][index] = new MutationInfo(rowLong, cfInt, cqInt);
Mutation m = genMutation(rowLong, cfInt, cqInt, cv, ingestInstanceId, entriesWritten,
prevRow, checksum);
entriesWritten++;
bw.addMutation(m);
}
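// flush so the nodes just written are durable before the next depth links back to them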
lastFlushTime = flush(bw, entriesWritten, entriesDeleted, lastFlushTime);
if (entriesWritten >= numEntries)
break out;
pauseCheck(random);
}
// random chance that the entries will be deleted
final boolean delete = random.nextFloat() < deleteProbability;
// if the previously written entries are scheduled to be deleted
if (delete) {
log.info("Deleting last portion of written entries");
// add delete mutations in the reverse order in which they were written
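// deleting newest-first removes each node before the node it points back to, so a
// concurrent verifier should never find a link to an already-deleted row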
for (int depth = nodeMap.length - 1; depth >= 0; depth--) {
for (int index = nodeMap[depth].length - 1; index >= 0; index--) {
MutationInfo currentNode = nodeMap[depth][index];
Mutation m = new Mutation(genRow(currentNode.row));
m.putDelete(genCol(currentNode.cf), genCol(currentNode.cq));
entriesDeleted++;
bw.addMutation(m);
}
lastFlushTime = flush(bw, entriesWritten, entriesDeleted, lastFlushTime);
pauseCheck(random);
}
} else {
// link everything into one big linked list by pointing each first (depth-0) insert,
// which currently has no previous row, at the deepest insert of the adjacent index
for (int index = 0; index < flushInterval - 1; index++) {
MutationInfo firstEntry = nodeMap[0][index];
MutationInfo lastEntry = nodeMap[maxDepth - 1][index + 1];
Mutation m = genMutation(firstEntry.row, firstEntry.cf, firstEntry.cq, cv,
ingestInstanceId, entriesWritten, genRow(lastEntry.row), checksum);
entriesWritten++;
bw.addMutation(m);
}
lastFlushTime = flush(bw, entriesWritten, entriesDeleted, lastFlushTime);
}
if (entriesWritten >= numEntries)
break out;
pauseCheck(random);
}
}
}
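
// A minimal sketch, not called by the test, illustrating the link structure built by
// doIngest using plain longs: given rows[depth][index] holding the generated row of
// each node, links[depth][index] holds the row that node points back to. The method
// name, the rows array, and the -1 sentinel are assumptions for illustration only.
private static long[][] sketchLinks(long[][] rows) {
  int maxDepth = rows.length;
  int flushInterval = rows[0].length;
  long[][] links = new long[maxDepth][flushInterval];
  for (int depth = 0; depth < maxDepth; depth++) {
    for (int index = 0; index < flushInterval; index++) {
      // each node points at the node one depth up, i.e. flushInterval inserts
      // earlier; depth-0 nodes start unlinked (-1)
      links[depth][index] = depth == 0 ? -1 : rows[depth - 1][index];
    }
  }
  for (int index = 0; index < flushInterval - 1; index++) {
    // close the loop: each depth-0 node points at the deepest node of the next index
    links[0][index] = rows[maxDepth - 1][index + 1];
  }
  return links;
}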