in src/main/java/org/apache/accumulo/testing/continuous/ContinuousInputFormat.java [103:199]
public RecordReader<Key,Value> createRecordReader(InputSplit inputSplit,
TaskAttemptContext taskAttemptContext) {
return new RecordReader<Key,Value>() {
long numNodes;
long nodeCount;
private Random random;
private byte[] uuid;
long minRow;
long maxRow;
int maxFam;
int maxQual;
List<ColumnVisibility> visibilities;
boolean checksum;
Key prevKey;
Key currKey;
Value currValue;
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext job) {
numNodes = job.getConfiguration().getLong(PROP_MAP_NODES, 1000000);
uuid = job.getConfiguration().get(PROP_UUID).getBytes(StandardCharsets.UTF_8);
minRow = job.getConfiguration().getLong(PROP_ROW_MIN, 0);
maxRow = job.getConfiguration().getLong(PROP_ROW_MAX, Long.MAX_VALUE);
maxFam = job.getConfiguration().getInt(PROP_FAM_MAX, Short.MAX_VALUE);
maxQual = job.getConfiguration().getInt(PROP_QUAL_MAX, Short.MAX_VALUE);
checksum = job.getConfiguration().getBoolean(PROP_CHECKSUM, false);
visibilities = ContinuousIngest.parseVisibilities(job.getConfiguration().get(PROP_VIS));
random = new Random(new SecureRandom().nextLong());
nodeCount = 0;
}
private Key genKey(CRC32 cksum) {
byte[] row = genRow(genLong(minRow, maxRow, random));
byte[] fam = genCol(random.nextInt(maxFam));
byte[] qual = genCol(random.nextInt(maxQual));
byte[] cv = visibilities.get(random.nextInt(visibilities.size())).flatten();
if (cksum != null) {
cksum.update(row);
cksum.update(fam);
cksum.update(qual);
cksum.update(cv);
}
return new Key(row, fam, qual, cv);
}
private byte[] createValue(byte[] ingestInstanceId, byte[] prevRow, Checksum cksum) {
return ContinuousIngest.createValue(ingestInstanceId, nodeCount, prevRow, cksum);
}
@Override
public boolean nextKeyValue() {
if (nodeCount < numNodes) {
CRC32 cksum = checksum ? new CRC32() : null;
prevKey = currKey;
byte[] prevRow = prevKey != null ? prevKey.getRowData().toArray() : null;
currKey = genKey(cksum);
currValue = new Value(createValue(uuid, prevRow, cksum));
nodeCount++;
return true;
} else {
return false;
}
}
@Override
public Key getCurrentKey() {
return currKey;
}
@Override
public Value getCurrentValue() {
return currValue;
}
@Override
public float getProgress() {
return nodeCount * 1.0f / numNodes;
}
@Override
public void close() throws IOException {
}
};
}