in accord-core/src/main/java/accord/local/durability/DurabilityQueue.java [243:310]
/**
 * Begins coordinating durability for {@code exclusiveSyncPoint} and arranges for its outcome to be handled
 * once coordination completes.
 * <p>
 * Before waiting, the coordination is registered as in progress. If there is a request, the attempt is also
 * reported to it. On completion, while holding this object's monitor, the in-progress registration is removed
 * and then exactly one of the following happens:
 * <ul>
 *   <li>If coordination failed with {@link SyncPointErased} or {@link TopologyManager.TopologyRetiredException},
 *   the still-waiting portion of {@code request} (if any) is resubmitted via
 *   {@code node.durability().shards().request(...)}. Then {@code success()} is invoked and this sync point is
 *   not retried.</li>
 *   <li>If a result was produced whose {@code achievedRemote} is at least the required level, the outcome is
 *   logged and {@code success()} is invoked. The required level is {@code All} when there is no request,
 *   otherwise {@code request.remote}.</li>
 *   <li>Otherwise the shortfall is logged, possibly suppressed as described below, and {@code retry} is
 *   called with {@code attempt + 1}.</li>
 * </ul>
 *
 * @param exclusiveSyncPoint the sync point whose durability is being awaited
 * @param request            the request that asked for this durability, or {@code null} if none
 * @param attempt            the attempt number; passed to coordination and incremented on retry
 * @param executor           the executor passed through to {@code coordinateIncluding}
 */
private void start(SyncPoint<Range> exclusiveSyncPoint, @Nullable DurabilityRequest request, int attempt, AgentExecutor executor)
{
    logger.debug("{}: Awaiting durability for {}", exclusiveSyncPoint.syncId, exclusiveSyncPoint.route.toRanges());
    AsyncResult<DurabilityResult> coordinate = coordinateIncluding(node, exclusiveSyncPoint, request != null ? request.including : null, executor, attempt);
    registerInProgress(exclusiveSyncPoint, coordinate);
    if (request != null)
        request.reportAttempt(exclusiveSyncPoint.syncId, node.elapsed(MICROSECONDS), coordinate);

    coordinate.invoke((result, failure) -> {
        synchronized (this)
        {
            unregisterInProgress(exclusiveSyncPoint, coordinate);
            TxnId syncId = exclusiveSyncPoint.syncId;
            Ranges syncRanges = exclusiveSyncPoint.route.toRanges();
            String requestedBy = request == null ? "" : " requested by " + request.requestedBy;

            if (failure != null)
            {
                if (logger.isTraceEnabled())
                    logger.trace("{}: failed awaiting durability for {}{}.", syncId, syncRanges, requestedBy, failure);

                // Retrying this same sync point cannot succeed after these failures.
                boolean unrecoverable = failure instanceof SyncPointErased
                                        || failure instanceof TopologyManager.TopologyRetiredException;
                if (unrecoverable)
                {
                    // If this was requested and the request is still waiting, submit another coordination request.
                    // TODO (required): add back-off and expand this to all unknown exception outcomes
                    if (request != null)
                        node.durability().shards().request(request, request.stillWaiting(exclusiveSyncPoint.route));
                    success();
                    return;
                }
            }

            boolean durableEnough = result != null
                                    && result.achievedRemote.compareTo(request == null ? All : request.remote) >= 0;
            if (durableEnough)
            {
                if (request == null) logger.debug("{}: Successfully achieved durability for {}.", syncId, syncRanges);
                else logger.info("{}: Successfully achieved durability for {}{}.", syncId, syncRanges, requestedBy);
                success();
                return;
            }

            // When a result was produced but fell short, report its own failure; otherwise report the callback's.
            Throwable cause = result == null ? failure : result.failure;
            if (result != null || cause instanceof Exhausted)
            {
                Collection<Node.Id> failedNodes;
                Ranges failedRanges;
                if (result != null)
                {
                    failedNodes = result.excluding;
                    failedRanges = exclusiveSyncPoint.route().toRanges();
                }
                else
                {
                    Exhausted exhausted = (Exhausted) cause;
                    failedNodes = exhausted.failedNodes();
                    failedRanges = exhausted.failedRanges();
                }

                // Suppress the message if every failed node already appears in a warning recorded within the
                // last EXHAUSTED_LOG_INTERVAL_MINUTES. An unknown failed-node set is always logged.
                boolean shouldLog = true;
                if (failedNodes != null)
                {
                    Set<Node.Id> notYetLogged = new HashSet<>(failedNodes);
                    long windowStart = System.nanoTime() - MINUTES.toNanos(EXHAUSTED_LOG_INTERVAL_MINUTES);
                    WARNINGS_LOGGED.tailMap(windowStart).values().forEach(notYetLogged::removeAll);
                    shouldLog = !notYetLogged.isEmpty();
                }

                if (shouldLog)
                {
                    logger.info("{}: Incomplete durability for {}{}. {} were unsuccessful.", syncId, failedRanges, requestedBy, failedNodes == null ? "some nodes" : failedNodes);
                    // Drop records older than the suppression window, then record this warning keyed by nanoTime.
                    WARNINGS_LOGGED.headMap(System.nanoTime() - MINUTES.toNanos(EXHAUSTED_LOG_INTERVAL_MINUTES)).clear();
                    WARNINGS_LOGGED.put(System.nanoTime(), failedNodes == null ? Collections.emptyList() : failedNodes);
                }
            }
            else if (cause instanceof Timeout)
            {
                logger.info("{}: Timeout awaiting durability for {}{}", syncId, syncRanges, requestedBy, cause);
            }
            else if (cause != null)
            {
                logger.info("{}: Failed awaiting durability for {}{}; will retry", syncId, syncRanges, requestedBy, cause);
            }

            retry(exclusiveSyncPoint, request, attempt + 1);
        }
    });
}