in src/main/java/org/apache/accumulo/testing/randomwalk/conditional/Transfer.java [66:137]
public void visit(State state, RandWalkEnv env, Properties props) throws Exception {
String table = state.getString("tableName");
Random rand = state.getRandom();
AccumuloClient client = env.getAccumuloClient();
int numAccts = state.getInteger("numAccts");
// note: non integer exponents are slow
ZipfDistribution zdiBanks = new ZipfDistribution(state.getInteger("numBanks"), 1);
String bank = Utils.getBank(zdiBanks.inverseCumulativeProbability(rand.nextDouble()));
ZipfDistribution zdiAccts = new ZipfDistribution(numAccts, 1);
String acct1 = Utils.getAccount(zdiAccts.inverseCumulativeProbability(rand.nextDouble()));
String acct2 = Utils.getAccount(zdiAccts.inverseCumulativeProbability(rand.nextDouble()));
while (acct2.equals(acct1)) {
// intentionally not using zipf distribution to pick on retry
acct2 = Utils.getAccount(rand.nextInt(numAccts));
}
// TODO document how data should be read when using ConditionalWriter
try (Scanner scanner = new IsolatedScanner(client.createScanner(table, Authorizations.EMPTY))) {
scanner.setRange(new Range(bank));
scanner.fetchColumnFamily(new Text(acct1));
scanner.fetchColumnFamily(new Text(acct2));
Account a1 = new Account();
Account a2 = new Account();
Account a;
for (Entry<Key,Value> entry : scanner) {
String cf = entry.getKey().getColumnFamilyData().toString();
String cq = entry.getKey().getColumnQualifierData().toString();
if (cf.equals(acct1))
a = a1;
else if (cf.equals(acct2))
a = a2;
else
throw new Exception("Unexpected column fam: " + cf);
if (cq.equals("bal"))
a.setBal(entry.getValue().toString());
else if (cq.equals("seq"))
a.setSeq(entry.getValue().toString());
else
throw new Exception("Unexpected column qual: " + cq);
}
int amt = rand.nextInt(50);
log.debug(
"transfer req " + bank + " " + amt + " " + acct1 + " " + a1 + " " + acct2 + " " + a2);
if (a1.bal >= amt) {
ConditionalMutation cm = new ConditionalMutation(bank,
new Condition(acct1, "seq").setValue(Utils.getSeq(a1.seq)),
new Condition(acct2, "seq").setValue(Utils.getSeq(a2.seq)));
cm.put(acct1, "bal", (a1.bal - amt) + "");
cm.put(acct2, "bal", (a2.bal + amt) + "");
cm.put(acct1, "seq", Utils.getSeq(a1.seq + 1));
cm.put(acct2, "seq", Utils.getSeq(a2.seq + 1));
ConditionalWriter cw = (ConditionalWriter) state.get("cw");
Status status = cw.write(cm).getStatus();
while (status == Status.UNKNOWN) {
log.debug("retrying transfer " + status);
status = cw.write(cm).getStatus();
}
log.debug("transfer result " + bank + " " + status + " " + a1 + " " + a2);
}
}
}