in server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java [293:446]
private void flush(UpdateSession us) {
int mutationCount = 0;
Map<CommitSession,List<Mutation>> sendables = new HashMap<>();
Map<CommitSession,TabletMutations> loggables = new HashMap<>();
Throwable error = null;
long pt1 = System.currentTimeMillis();
boolean containsMetadataTablet = false;
for (Tablet tablet : us.queuedMutations.keySet()) {
if (tablet.getExtent().isMeta()) {
containsMetadataTablet = true;
}
}
if (!containsMetadataTablet && !us.queuedMutations.isEmpty()) {
server.resourceManager.waitUntilCommitsAreEnabled();
}
int preppedMutations = 0;
int sendableMutations = 0;
Span span = TraceUtil.startSpan(this.getClass(), "flush::prep");
try (Scope scope = span.makeCurrent()) {
for (Entry<Tablet,? extends List<Mutation>> entry : us.queuedMutations.entrySet()) {
Tablet tablet = entry.getKey();
Durability durability =
DurabilityImpl.resolveDurabilty(us.durability, tablet.getDurability());
List<Mutation> mutations = entry.getValue();
if (!mutations.isEmpty()) {
preppedMutations += mutations.size();
try {
server.updateMetrics.addMutationArraySize(mutations.size());
PreparedMutations prepared = tablet.prepareMutationsForCommit(us.cenv, mutations);
if (prepared.tabletClosed()) {
if (us.currentTablet == tablet) {
us.currentTablet = null;
}
us.failures.put(tablet.getExtent(), us.successfulCommits.get(tablet));
} else {
if (!prepared.getNonViolators().isEmpty()) {
List<Mutation> validMutations = prepared.getNonViolators();
CommitSession session = prepared.getCommitSession();
if (durability != Durability.NONE) {
loggables.put(session, new TabletMutations(session, validMutations, durability));
}
sendables.put(session, validMutations);
sendableMutations += validMutations.size();
}
if (!prepared.getViolations().isEmpty()) {
us.violations.add(prepared.getViolations());
server.updateMetrics.addConstraintViolations(1);
}
// Use the size of the original mutation list, regardless of how many mutations
// did not violate constraints.
mutationCount += mutations.size();
}
} catch (Exception t) {
error = t;
log.error("Unexpected error preparing for commit", error);
TraceUtil.setException(span, t, false);
break;
}
}
}
} catch (Exception e) {
TraceUtil.setException(span, e, true);
throw e;
} finally {
span.end();
}
long pt2 = System.currentTimeMillis();
us.prepareTimes.addStat(pt2 - pt1);
updateAvgPrepTime(pt2 - pt1, preppedMutations);
if (error != null) {
sendables.forEach((commitSession, value) -> commitSession.abortCommit());
throw new RuntimeException(error);
}
try {
Span span2 = TraceUtil.startSpan(this.getClass(), "flush::wal");
try (Scope scope = span2.makeCurrent()) {
while (true) {
try {
long t1 = System.currentTimeMillis();
server.logger.logManyTablets(loggables);
long t2 = System.currentTimeMillis();
us.walogTimes.addStat(t2 - t1);
updateWalogWriteTime((t2 - t1));
break;
} catch (IOException | FSError ex) {
log.warn("logging mutations failed, retrying");
} catch (Exception t) {
log.error("Unknown exception logging mutations, counts"
+ " for mutations in flight not decremented!", t);
throw new RuntimeException(t);
}
}
} catch (Exception e) {
TraceUtil.setException(span2, e, true);
log.error("Error logging mutations sent from {}", TServerUtils.clientAddress.get(), e);
throw e;
} finally {
span2.end();
}
Span span3 = TraceUtil.startSpan(this.getClass(), "flush::commit");
try (Scope scope = span3.makeCurrent()) {
long t1 = System.currentTimeMillis();
sendables.forEach((commitSession, mutations) -> {
commitSession.commit(mutations);
KeyExtent extent = commitSession.getExtent();
if (us.currentTablet != null && extent == us.currentTablet.getExtent()) {
// because constraint violations may filter out some
// mutations, for proper accounting with the client code,
// need to increment the count based on the original
// number of mutations from the client NOT the filtered number
us.successfulCommits.increment(us.currentTablet,
us.queuedMutations.get(us.currentTablet).size());
}
});
long t2 = System.currentTimeMillis();
us.flushTime += (t2 - pt1);
us.commitTimes.addStat(t2 - t1);
updateAvgCommitTime(t2 - t1, sendableMutations);
} catch (Exception e) {
TraceUtil.setException(span3, e, true);
log.error("Error committing mutations sent from {}", TServerUtils.clientAddress.get(), e);
throw e;
} finally {
span3.end();
}
} finally {
us.queuedMutations.clear();
if (us.currentTablet != null) {
us.queuedMutations.put(us.currentTablet, new ArrayList<>());
}
server.updateTotalQueuedMutationSize(-us.queuedMutationSize);
us.queuedMutationSize = 0;
}
us.totalUpdates += mutationCount;
}