in src/main/java/org/apache/accumulo/testing/randomwalk/conditional/Init.java [43:97]
public void visit(State state, RandWalkEnv env, Properties props) throws Exception {
int numBanks = state.getInteger("numBanks");
int numAccts = state.getInteger("numAccts");
// add some splits to spread ingest out a little
TreeSet<Text> splits = IntStream.range(1, 10).map(i -> (int) (numBanks * .1 * i))
.mapToObj(Utils::getBank).map(Text::new).collect(Collectors.toCollection(TreeSet::new));
env.getAccumuloClient().tableOperations().addSplits(state.getString("tableName"), splits);
log.info("Added splits " + splits);
List<Integer> banks = IntStream.range(0, numBanks).boxed().collect(Collectors.toList());
// shuffle for case when multiple threads are adding banks
Collections.shuffle(banks, state.getRandom());
ConditionalWriter cw = (ConditionalWriter) state.get("cw");
for (int i : banks) {
ConditionalMutation m = new ConditionalMutation(Utils.getBank(i));
int acceptedCount = 0;
for (int j = 0; j < numAccts; j++) {
String cf = Utils.getAccount(j);
m.addCondition(new Condition(cf, "seq"));
m.put(cf, "bal", "100");
m.put(cf, "seq", Utils.getSeq(0));
if (j % 1000 == 0 && j > 0) {
Status status = cw.write(m).getStatus();
while (status == Status.UNKNOWN)
status = cw.write(m).getStatus();
if (status == Status.ACCEPTED)
acceptedCount++;
m = new ConditionalMutation(Utils.getBank(i));
}
}
if (m.getConditions().size() > 0) {
Status status = cw.write(m).getStatus();
while (status == Status.UNKNOWN)
status = cw.write(m).getStatus();
if (status == Status.ACCEPTED)
acceptedCount++;
}
log.trace("Added bank " + Utils.getBank(i) + " " + acceptedCount);
}
log.debug("Added " + numBanks + " banks");
}