in src/main/java/org/apache/accumulo/testing/ingest/TestIngest.java [197:331]
public static void ingest(AccumuloClient client, FileSystem fs, Opts opts, Configuration conf)
throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException,
TableExistsException {
long stopTime;
byte[][] bytevals = generateValues(opts.dataSize);
byte[] randomValue = new byte[opts.dataSize];
Random random = new Random();
long bytesWritten = 0;
createTable(client, opts);
BatchWriter bw = null;
RFileWriter writer = null;
if (opts.outputFile != null) {
writer = RFile.newWriter().to(opts.outputFile + ".rf").withFileSystem(fs).build();
writer.startDefaultLocalityGroup();
} else {
bw = client.createBatchWriter(opts.tableName);
String principal = ClientProperty.AUTH_PRINCIPAL.getValue(opts.getClientProps());
client.securityOperations().changeUserAuthorizations(principal, AUTHS);
}
Text labBA = new Text(opts.columnVisibility.getExpression());
long startTime = System.currentTimeMillis();
for (int i = 0; i < opts.rows; i++) {
int rowid;
if (opts.stride > 0) {
rowid = ((i % opts.stride) * (opts.rows / opts.stride)) + (i / opts.stride);
} else {
rowid = i;
}
Text row = generateRow(rowid, opts.startRow);
Mutation m = new Mutation(row);
for (int j = 0; j < opts.cols; j++) {
Text colf = new Text(opts.columnFamily);
Text colq = new Text(FastFormat.toZeroPaddedString(j, 7, 10, COL_PREFIX));
if (writer != null) {
Key key = new Key(row, colf, colq, labBA);
if (opts.timestamp >= 0) {
key.setTimestamp(opts.timestamp);
} else {
key.setTimestamp(startTime);
}
key.setDeleted(opts.delete);
bytesWritten += key.getSize();
if (opts.delete) {
writer.append(key, new Value(new byte[0]));
} else {
byte[] value;
if (opts.random != null) {
value = genRandomValue(random, randomValue, opts.random.intValue(),
rowid + opts.startRow, j);
} else {
value = bytevals[j % bytevals.length];
}
Value v = new Value(value);
writer.append(key, v);
bytesWritten += v.getSize();
}
} else {
Key key = new Key(row, colf, colq, labBA);
bytesWritten += key.getSize();
if (opts.delete) {
if (opts.timestamp >= 0)
m.putDelete(colf, colq, opts.columnVisibility, opts.timestamp);
else
m.putDelete(colf, colq, opts.columnVisibility);
} else {
byte[] value;
if (opts.random != null) {
value = genRandomValue(random, randomValue, opts.random.intValue(),
rowid + opts.startRow, j);
} else {
value = bytevals[j % bytevals.length];
}
bytesWritten += value.length;
if (opts.timestamp >= 0) {
m.put(colf, colq, opts.columnVisibility, opts.timestamp, new Value(value, true));
} else {
m.put(colf, colq, opts.columnVisibility, new Value(value, true));
}
}
}
}
if (bw != null)
bw.addMutation(m);
}
if (writer != null) {
writer.close();
} else if (bw != null) {
try {
bw.close();
} catch (MutationsRejectedException e) {
if (e.getSecurityErrorCodes().size() > 0) {
for (Entry<TabletId,Set<SecurityErrorCode>> entry : e.getSecurityErrorCodes()
.entrySet()) {
System.err.println("ERROR : Not authorized to write to : " + entry.getKey() + " due to "
+ entry.getValue());
}
}
if (e.getConstraintViolationSummaries().size() > 0) {
for (ConstraintViolationSummary cvs : e.getConstraintViolationSummaries()) {
System.err.println("ERROR : Constraint violates : " + cvs);
}
}
throw e;
}
}
stopTime = System.currentTimeMillis();
int totalValues = opts.rows * opts.cols;
double elapsed = (stopTime - startTime) / 1000.0;
System.out.printf(
"%,12d records written | %,8d records/sec | %,12d bytes written | %,8d bytes/sec | %6.3f secs %n",
totalValues, (int) (totalValues / elapsed), bytesWritten, (int) (bytesWritten / elapsed),
elapsed);
}