in accord-core/src/main/java/accord/impl/InMemoryCommandStore.java [724:854]
/**
 * Folds {@code map} over the commands of this store that intersect {@code keysOrRanges} (restricted to
 * {@code slice}) and pass the supplied filters, threading an accumulator through every invocation.
 *
 * <p>The work is done in two phases:
 * <ol>
 *   <li>Per-key commands are visited through {@code commandStore.mapReduceForKey}; depending on
 *       {@code testTimestamp} either the {@code byId()} or the {@code byExecuteAt()} timeseries of each key is
 *       queried. If the accumulator equals {@code terminalValue} once this phase completes, it is returned
 *       immediately.</li>
 *   <li>Range commands ({@code commandStore.rangeCommands}) and, only when {@code minStatus == null} and
 *       {@code testDep == ANY_DEPS}, historical range commands ({@code commandStore.historicalRangeCommands})
 *       are filtered and grouped by intersecting range in a map ordered by {@code Range::compare}; {@code map}
 *       is then applied to every grouped entry. This phase does not test {@code terminalValue}.</li>
 * </ol>
 *
 * @param keysOrRanges  the keys or ranges to query
 * @param slice         the ranges used to restrict {@code keysOrRanges}
 * @param testKind      filter applied to the kind ({@code txnId.rw()}) of each candidate
 * @param testTimestamp which timestamp of a candidate is compared against {@code timestamp}, and in which direction
 * @param timestamp     the bound candidates are compared against
 * @param testDep       dependency filter; {@code ANY_DEPS} disables dependency filtering
 * @param depId         the transaction id looked up in a candidate's partial deps when {@code testDep != ANY_DEPS}
 * @param minStatus     lowest accepted status, or {@code null} for no lower bound
 * @param maxStatus     highest accepted status, or {@code null} for no upper bound
 * @param map           function invoked for each match with the key or range, txnId, executeAt and accumulator
 * @param accumulate    initial accumulator value
 * @param terminalValue value that short-circuits the method after the per-key phase
 * @return the final accumulator value
 */
public <T> T mapReduce(Seekables<?, ?> keysOrRanges, Ranges slice, TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp, TestDep testDep, @Nullable TxnId depId, @Nullable Status minStatus, @Nullable Status maxStatus, CommandFunction<T, T> map, T accumulate, T terminalValue)
{
    // Phase 1: commands indexed per key.
    T result = commandStore.mapReduceForKey(this, keysOrRanges, slice, (forKey, prev) -> {
        // Select both the timeseries to scan and the direction of the comparison in a single pass.
        CommandTimeseries<?> series;
        CommandTimeseries.TestTimestamp direction;
        switch (testTimestamp)
        {
            default: throw new AssertionError();
            case STARTED_AFTER:
                series = forKey.byId();
                direction = CommandTimeseries.TestTimestamp.AFTER;
                break;
            case STARTED_BEFORE:
                series = forKey.byId();
                direction = CommandTimeseries.TestTimestamp.BEFORE;
                break;
            case EXECUTES_AFTER:
                series = forKey.byExecuteAt();
                direction = CommandTimeseries.TestTimestamp.AFTER;
                break;
            case MAY_EXECUTE_BEFORE:
                series = forKey.byExecuteAt();
                direction = CommandTimeseries.TestTimestamp.BEFORE;
                break;
        }
        return series.mapReduce(testKind, direction, timestamp, testDep, depId, minStatus, maxStatus, map, prev, terminalValue);
    }, accumulate, terminalValue);

    if (result.equals(terminalValue))
        return result;

    // Phase 2: range commands.
    // TODO (find lib, efficiency): this is super inefficient, need to store Command in something queryable
    Seekables<?, ?> sliced = keysOrRanges.slice(slice, Minimal);
    Map<Range, List<Map.Entry<TxnId, Timestamp>>> collect = new TreeMap<>(Range::compare);

    commandStore.rangeCommands.forEach((txnId, rangeCommand) -> {
        Command command = rangeCommand.command.value();
        Invariants.nonNull(command);

        // Timestamp filter: work out whether the command falls on the wrong side of the bound.
        boolean outOfBounds;
        switch (testTimestamp)
        {
            default: throw new AssertionError();
            case STARTED_AFTER:
                outOfBounds = command.txnId().compareTo(timestamp) < 0;
                break;
            case STARTED_BEFORE:
                outOfBounds = command.txnId().compareTo(timestamp) > 0;
                break;
            case EXECUTES_AFTER:
                outOfBounds = command.executeAt().compareTo(timestamp) < 0;
                break;
            case MAY_EXECUTE_BEFORE:
                // Without a decided executeAt, fall back to the txnId for the comparison.
                Timestamp effective = command.known().executeAt.hasDecidedExecuteAt() ? command.executeAt() : command.txnId();
                outOfBounds = effective.compareTo(timestamp) > 0;
                break;
        }
        if (outOfBounds)
            return;

        // Status window filter; a null bound leaves that side open.
        if ((minStatus != null && command.status().compareTo(minStatus) < 0)
            || (maxStatus != null && command.status().compareTo(maxStatus) > 0))
            return;

        if (!testKind.test(command.txnId().rw()))
            return;

        if (testDep != ANY_DEPS)
        {
            if (!command.known().deps.hasProposedOrDecidedDeps())
                return;

            // WITH requires depId to be present in the deps; any other (non-ANY_DEPS) value requires it absent.
            boolean containsDep = command.partialDeps().contains(depId);
            if ((testDep == WITH) != containsDep)
                return;
        }

        if (!rangeCommand.ranges.intersects(sliced))
            return;

        Routables.foldl(rangeCommand.ranges, sliced, (covered, grouped, index) -> {
            // TODO (easy, efficiency): pass command as a parameter to Fold
            List<Map.Entry<TxnId, Timestamp>> entries = grouped.computeIfAbsent(covered, ignore -> new ArrayList<>());
            // Only append when the most recent entry for this range is not already this transaction.
            boolean alreadyLast = !entries.isEmpty() && entries.get(entries.size() - 1).getKey().equals(command.txnId());
            if (!alreadyLast)
                entries.add(new AbstractMap.SimpleImmutableEntry<>(command.txnId(), command.executeAt()));
            return grouped;
        }, collect);
    });

    // Historical range commands are only consulted when neither a minimum status nor a dependency test applies.
    if (minStatus == null && testDep == ANY_DEPS)
    {
        commandStore.historicalRangeCommands.forEach((txnId, ranges) -> {
            // Only the txnId is available here, so it stands in for every flavour of timestamp test.
            boolean wantAfter;
            switch (testTimestamp)
            {
                default: throw new AssertionError();
                case STARTED_AFTER:
                case EXECUTES_AFTER:
                    wantAfter = true;
                    break;
                case STARTED_BEFORE:
                case MAY_EXECUTE_BEFORE:
                    wantAfter = false;
                    break;
            }
            int order = txnId.compareTo(timestamp);
            if (wantAfter ? order < 0 : order > 0)
                return;

            if (!testKind.test(txnId.rw()))
                return;

            if (!ranges.intersects(sliced))
                return;

            Routables.foldl(ranges, sliced, (covered, grouped, index) -> {
                // TODO (easy, efficiency): pass command as a parameter to Fold
                List<Map.Entry<TxnId, Timestamp>> entries = grouped.computeIfAbsent(covered, ignore -> new ArrayList<>());
                boolean alreadyLast = !entries.isEmpty() && entries.get(entries.size() - 1).getKey().equals(txnId);
                if (!alreadyLast)
                    entries.add(new AbstractMap.SimpleImmutableEntry<>(txnId, txnId)); // txnId doubles as executeAt
                return grouped;
            }, collect);
        });
    }

    // Apply the map function to everything collected, range by range in Range::compare order.
    for (Map.Entry<Range, List<Map.Entry<TxnId, Timestamp>>> byRange : collect.entrySet())
    {
        Range range = byRange.getKey();
        for (Map.Entry<TxnId, Timestamp> hit : byRange.getValue())
            result = map.apply(range, hit.getKey(), hit.getValue(), result);
    }
    return result;
}