in src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java [101:230]
  public static void main(String[] args) throws Exception {

    try (ContinuousEnv env = new ContinuousEnv(args)) {

      AccumuloClient client = env.getAccumuloClient();

      final long rowMin = env.getRowMin();
      final long rowMax = env.getRowMax();
      Preconditions.checkState(0 <= rowMin && rowMin <= rowMax,
          "Bad rowMin/rowMax, must conform to: 0 <= rowMin <= rowMax");

      String tableName = env.getAccumuloTableName();
      if (!client.tableOperations().exists(tableName)) {
        throw new TableNotFoundException(null, tableName,
            "Consult the README and create the table before starting ingest.");
      }
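
      // each ingest client gets a unique instance ID; genMutation embeds it in every
      // value written, so verification can attribute entries to a specific client run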
      byte[] ingestInstanceId = UUID.randomUUID().toString().getBytes(UTF_8);
      log.info("Ingest instance ID: {} current time: {}ms", new String(ingestInstanceId, UTF_8),
          System.currentTimeMillis());

      Properties testProps = env.getTestProperties();

      long entriesWritten = 0L;
      long entriesDeleted = 0L;
      final int flushInterval = getFlushEntries(testProps);
      log.info("A flush will occur after every {} entries written", flushInterval);
      final int maxDepth = 25;

      // New entries should always point back to flushed data, so the previous node
      // already exists in Accumulo when the data is verified. To achieve this, insert
      // N points back to the row from insert (N - flushInterval). The array below
      // keeps track of all inserts so those back-references can be generated.
      MutationInfo[][] nodeMap = new MutationInfo[maxDepth][flushInterval];

      long lastFlushTime = System.currentTimeMillis();
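
      // each entry's column family and qualifier are random values below these bounds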
      final int maxColF = env.getMaxColF();
      final int maxColQ = env.getMaxColQ();

      final boolean checksum = Boolean
          .parseBoolean(testProps.getProperty(TestProps.CI_INGEST_CHECKSUM));
      final long numEntries = Long
          .parseLong(testProps.getProperty(TestProps.CI_INGEST_CLIENT_ENTRIES));
      log.info("Total entries to be written: {}", numEntries);

      visibilities = parseVisibilities(testProps.getProperty(TestProps.CI_INGEST_VISIBILITIES));

      pauseEnabled = pauseEnabled(testProps);
      pauseMin = Integer.parseInt(testProps.getProperty(TestProps.CI_INGEST_PAUSE_WAIT_MIN));
      pauseMax = Integer.parseInt(testProps.getProperty(TestProps.CI_INGEST_PAUSE_WAIT_MAX));
      Preconditions.checkState(0 < pauseMin && pauseMin <= pauseMax,
          "Bad pause wait min/max, must conform to: 0 < min <= max");

      if (pauseEnabled) {
        lastPauseNs = System.nanoTime();
        pauseWaitSec = getPause(env.getRandom());
        log.info("PAUSING enabled");
        log.info("INGESTING for {}s", pauseWaitSec);
      }
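
      // with pausing enabled, the client ingests for pauseWaitSec seconds at a time;
      // pauseCheck() then sleeps it for a random interval between pauseMin and pauseMax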

      final float deleteProbability = getDeleteProbability(testProps);
      log.info("DELETES will occur with a probability of {}",
          String.format("%.02f", deleteProbability));
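
      // main loop: each pass writes maxDepth * flushInterval linked entries, then
      // either deletes that batch or stitches its chains into one long linked list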
      try (BatchWriter bw = client.createBatchWriter(tableName)) {
        out: while (true) {
          ColumnVisibility cv = getVisibility(env.getRandom());

          // generate a set of nodes, each linking back to a node in the previous set
          for (int depth = 0; depth < maxDepth; depth++) {
            for (int index = 0; index < flushInterval; index++) {
              long rowLong = genLong(rowMin, rowMax, env.getRandom());
              byte[] prevRow = depth == 0 ? null : genRow(nodeMap[depth - 1][index].row);
              int cfInt = env.getRandom().nextInt(maxColF);
              int cqInt = env.getRandom().nextInt(maxColQ);
              nodeMap[depth][index] = new MutationInfo(rowLong, cfInt, cqInt);
              Mutation m = genMutation(rowLong, cfInt, cqInt, cv, ingestInstanceId, entriesWritten,
                  prevRow, checksum);
              entriesWritten++;
              bw.addMutation(m);
            }
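
            // flush before generating the next depth level so that every node a new
            // entry links back to is already durable in Accumulo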
            lastFlushTime = flush(bw, entriesWritten, entriesDeleted, lastFlushTime);
            if (entriesWritten >= numEntries)
              break out;
            pauseCheck(env.getRandom());
          }

          // decide at random whether the batch just written should be deleted
          final boolean delete = env.getRandom().nextFloat() < deleteProbability;

          if (delete) {
            log.info("Deleting last portion of written entries");

            // add delete mutations in the reverse order in which they were written
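            // deleting referrers before referents means that if ingest dies partway
            // through, no surviving node points at data that was already removed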
            for (int depth = nodeMap.length - 1; depth >= 0; depth--) {
              for (int index = nodeMap[depth].length - 1; index >= 0; index--) {
                MutationInfo currentNode = nodeMap[depth][index];
                Mutation m = new Mutation(genRow(currentNode.row));
                m.putDelete(genCol(currentNode.cf), genCol(currentNode.cq));
                entriesDeleted++;
                bw.addMutation(m);
              }
              lastFlushTime = flush(bw, entriesWritten, entriesDeleted, lastFlushTime);
              pauseCheck(env.getRandom());
            }
          } else {
            // no deletes this round: rewrite the first node of each chain to point at
            // the last node of the neighboring chain, stitching the batch into one big
            // linked list so the depth-0 inserts also point back to something
            for (int index = 0; index < flushInterval - 1; index++) {
              MutationInfo firstEntry = nodeMap[0][index];
              MutationInfo lastEntry = nodeMap[maxDepth - 1][index + 1];
              Mutation m = genMutation(firstEntry.row, firstEntry.cf, firstEntry.cq, cv,
                  ingestInstanceId, entriesWritten, genRow(lastEntry.row), checksum);
              entriesWritten++;
              bw.addMutation(m);
            }
            lastFlushTime = flush(bw, entriesWritten, entriesDeleted, lastFlushTime);
          }
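
          // the batch is complete; stop once the requested entry count is reached,
          // otherwise pause if one is due and start the next batch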
          if (entriesWritten >= numEntries)
            break out;
          pauseCheck(env.getRandom());
        }
      }
    }
  }