in src/main/java/org/apache/accumulo/testing/ingest/VerifyIngest.java [71:237]
private static void verifyIngest(AccumuloClient client, Opts opts)
throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
byte[][] bytevals = TestIngest.generateValues(opts.dataSize);
Authorizations labelAuths = new Authorizations("L1", "L2", "G1", "GROUP2");
String principal = ClientProperty.AUTH_PRINCIPAL.getValue(opts.getClientProps());
client.securityOperations().changeUserAuthorizations(principal, labelAuths);
int expectedRow = opts.startRow;
int expectedCol = 0;
int recsRead = 0;
long bytesRead = 0;
long t1 = System.currentTimeMillis();
byte[] randomValue = new byte[opts.dataSize];
Random random = new Random();
Key endKey = new Key(new Text("row_" + String.format("%010d", opts.rows + opts.startRow)));
int errors = 0;
while (expectedRow < (opts.rows + opts.startRow)) {
if (opts.useGet) {
Text rowKey = new Text("row_" + String.format("%010d", expectedRow + opts.startRow));
Text colf = new Text(opts.columnFamily);
Text colq = new Text("col_" + String.format("%07d", expectedCol));
Scanner scanner = client.createScanner("test_ingest", labelAuths);
scanner.setBatchSize(1);
Key startKey = new Key(rowKey, colf, colq);
Range range = new Range(startKey, startKey.followingKey(PartialKey.ROW_COLFAM_COLQUAL));
scanner.setRange(range);
byte[] val = null;
Iterator<Entry<Key,Value>> iter = scanner.iterator();
if (iter.hasNext()) {
val = iter.next().getValue().get();
}
byte[] ev;
if (opts.random != null) {
ev = TestIngest.genRandomValue(random, randomValue, opts.random, expectedRow,
expectedCol);
} else {
ev = bytevals[expectedCol % bytevals.length];
}
if (val == null) {
log.error("Did not find " + rowKey + " " + colf + " " + colq);
errors++;
} else {
recsRead++;
bytesRead += val.length;
Value value = new Value(val);
if (value.compareTo(ev) != 0) {
log.error("unexpected value (" + rowKey + " " + colf + " " + colq + " : saw " + value
+ " expected " + new Value(ev));
errors++;
}
}
expectedCol++;
if (expectedCol >= opts.cols) {
expectedCol = 0;
expectedRow++;
}
} else {
Key startKey = new Key(new Text("row_" + String.format("%010d", expectedRow)));
Scanner scanner = client.createScanner(opts.tableName, labelAuths);
scanner.setRange(new Range(startKey, endKey));
for (int j = 0; j < opts.cols; j++) {
scanner.fetchColumn(new Text(opts.columnFamily),
new Text("col_" + String.format("%07d", j)));
}
int recsReadBefore = recsRead;
for (Entry<Key,Value> entry : scanner) {
recsRead++;
bytesRead += entry.getKey().getLength();
bytesRead += entry.getValue().getSize();
int rowNum = getRow(entry.getKey());
int colNum = getCol(entry.getKey());
if (rowNum != expectedRow) {
log.error("rowNum != expectedRow " + rowNum + " != " + expectedRow);
errors++;
expectedRow = rowNum;
}
if (colNum != expectedCol) {
log.error(
"colNum != expectedCol " + colNum + " != " + expectedCol + " rowNum : " + rowNum);
errors++;
}
if (expectedRow >= (opts.rows + opts.startRow)) {
log.error(
"expectedRow (" + expectedRow + ") >= (ingestArgs.rows + ingestArgs.startRow) ("
+ (opts.rows + opts.startRow) + "), get batch returned data passed end key");
errors++;
break;
}
byte[] value;
if (opts.random != null) {
value =
TestIngest.genRandomValue(random, randomValue, opts.random, expectedRow, colNum);
} else {
value = bytevals[colNum % bytevals.length];
}
if (entry.getValue().compareTo(value) != 0) {
log.error("unexpected value, rowNum : " + rowNum + " colNum : " + colNum);
log.error(" saw = " + new String(entry.getValue().get()) + " expected = "
+ new String(value));
errors++;
}
if (opts.timestamp >= 0 && entry.getKey().getTimestamp() != opts.timestamp) {
log.error("unexpected timestamp " + entry.getKey().getTimestamp() + ", rowNum : "
+ rowNum + " colNum : " + colNum);
errors++;
}
expectedCol++;
if (expectedCol >= opts.cols) {
expectedCol = 0;
expectedRow++;
}
}
if (recsRead == recsReadBefore) {
log.warn("Scan returned nothing, breaking...");
break;
}
}
}
long t2 = System.currentTimeMillis();
if (errors > 0) {
throw new AccumuloException("saw " + errors + " errors ");
}
if (expectedRow != (opts.rows + opts.startRow)) {
throw new AccumuloException("Did not read expected number of rows. Saw "
+ (expectedRow - opts.startRow) + " expected " + opts.rows);
} else {
System.out.printf(
"%,12d records read | %,8d records/sec | %,12d bytes read | %,8d bytes/sec | %6.3f secs %n",
recsRead, (int) ((recsRead) / ((t2 - t1) / 1000.0)), bytesRead,
(int) (bytesRead / ((t2 - t1) / 1000.0)), (t2 - t1) / 1000.0);
}
}