in src/main/java/org/apache/accumulo/testing/scalability/Ingest.java [73:134]
public void client() {
AccumuloClient client = getClient();
String tableName = getTestProperty("TABLE");
// get batch writer configuration
long maxMemory = Long.parseLong(getTestProperty("MAX_MEMORY"));
long maxLatency = Long.parseLong(getTestProperty("MAX_LATENCY"));
int maxWriteThreads = Integer.parseInt(getTestProperty("NUM_THREADS"));
// create batch writer
BatchWriter bw = null;
try {
bw = client.createBatchWriter(tableName, new BatchWriterConfig().setMaxMemory(maxMemory)
.setMaxLatency(maxLatency, TimeUnit.MILLISECONDS).setMaxWriteThreads(maxWriteThreads));
} catch (TableNotFoundException e) {
log.error("Table '" + tableName + "' not found.", e);
System.exit(-1);
}
// configure writing
Random r = new Random();
String ingestInstanceId = UUID.randomUUID().toString();
long numIngestEntries = Long.parseLong(getTestProperty("NUM_ENTRIES"));
long minRow = 0L;
long maxRow = 9223372036854775807L;
int maxColF = 32767;
int maxColQ = 32767;
long count = 0;
long totalBytes = 0;
ColumnVisibility cv = new ColumnVisibility();
// start timer
startTimer();
// write specified number of entries
while (count < numIngestEntries) {
count++;
long rowId = ContinuousIngest.genLong(minRow, maxRow, r);
Mutation m = ContinuousIngest.genMutation(rowId, r.nextInt(maxColF), r.nextInt(maxColQ), cv,
ingestInstanceId.getBytes(UTF_8), count, null, false);
totalBytes += m.numBytes();
try {
bw.addMutation(m);
} catch (MutationsRejectedException e) {
log.error("Mutations rejected.", e);
System.exit(-1);
}
}
// close writer
try {
bw.close();
} catch (MutationsRejectedException e) {
log.error("Could not close BatchWriter due to mutations being rejected.", e);
System.exit(-1);
}
// stop timer
stopTimer(count, totalBytes);
}