private void flush()

in server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java [293:446]


  /**
   * Prepares, write-ahead-logs and commits every mutation currently queued in the given update
   * session, then resets the session's queue.
   *
   * <p>
   * The work is done in three traced phases:
   * <ol>
   * <li><b>prep</b> - each tablet with queued mutations is asked to prepare them for commit.
   * Tablets that report themselves closed are recorded in {@code us.failures}; constraint
   * violations are recorded in {@code us.violations}; the remaining mutations are collected for
   * commit and, unless the resolved durability is {@link Durability#NONE}, for logging.</li>
   * <li><b>wal</b> - the collected mutations are handed to the write-ahead logger. An
   * {@link IOException} or {@code FSError} causes the write to be retried until it succeeds.</li>
   * <li><b>commit</b> - each commit session is committed and the session's successful-commit
   * counter is updated for the current tablet.</li>
   * </ol>
   *
   * <p>
   * If the prep phase fails, every commit session collected so far is aborted and the failure is
   * rethrown wrapped in a {@link RuntimeException}; in that case the queued mutations are left in
   * place because the cleanup in the {@code finally} block only guards the wal and commit phases.
   *
   * @param us the update session whose queued mutations should be flushed
   * @throws RuntimeException if preparing the mutations fails, or if logging them fails with
   *         anything other than an {@link IOException} or {@code FSError}
   */
  private void flush(UpdateSession us) {

    int mutationCount = 0;
    Map<CommitSession,List<Mutation>> sendables = new HashMap<>();
    Map<CommitSession,TabletMutations> loggables = new HashMap<>();
    Throwable error = null;

    long pt1 = System.currentTimeMillis();

    boolean containsMetadataTablet = false;
    for (Tablet tablet : us.queuedMutations.keySet()) {
      if (tablet.getExtent().isMeta()) {
        containsMetadataTablet = true;
        // a single metadata tablet settles the question; no need to keep scanning
        break;
      }
    }

    // Sessions that touch a metadata tablet (or have nothing queued) do not wait here.
    // NOTE(review): presumably so metadata updates are never held up - confirm.
    if (!containsMetadataTablet && !us.queuedMutations.isEmpty()) {
      server.resourceManager.waitUntilCommitsAreEnabled();
    }

    int preppedMutations = 0;
    int sendableMutations = 0;

    Span span = TraceUtil.startSpan(this.getClass(), "flush::prep");
    try (Scope scope = span.makeCurrent()) {
      for (Entry<Tablet,? extends List<Mutation>> entry : us.queuedMutations.entrySet()) {

        Tablet tablet = entry.getKey();
        Durability durability =
            DurabilityImpl.resolveDurabilty(us.durability, tablet.getDurability());
        List<Mutation> mutations = entry.getValue();
        if (!mutations.isEmpty()) {
          preppedMutations += mutations.size();
          try {
            server.updateMetrics.addMutationArraySize(mutations.size());

            PreparedMutations prepared = tablet.prepareMutationsForCommit(us.cenv, mutations);

            if (prepared.tabletClosed()) {
              // the tablet can no longer accept writes; forget it as the current tablet and
              // record it as a failure along with its successful-commit count so far
              if (us.currentTablet == tablet) {
                us.currentTablet = null;
              }
              us.failures.put(tablet.getExtent(), us.successfulCommits.get(tablet));
            } else {
              if (!prepared.getNonViolators().isEmpty()) {
                List<Mutation> validMutations = prepared.getNonViolators();
                CommitSession session = prepared.getCommitSession();
                // mutations with durability NONE are committed but never logged
                if (durability != Durability.NONE) {
                  loggables.put(session, new TabletMutations(session, validMutations, durability));
                }
                sendables.put(session, validMutations);
                sendableMutations += validMutations.size();
              }

              if (!prepared.getViolations().isEmpty()) {
                us.violations.add(prepared.getViolations());
                server.updateMetrics.addConstraintViolations(1);
              }
              // Use the size of the original mutation list, regardless of how many mutations
              // did not violate constraints.
              mutationCount += mutations.size();

            }
          } catch (Exception t) {
            // remember the failure and stop preparing; sessions prepared so far are aborted
            // after the span is closed
            error = t;
            log.error("Unexpected error preparing for commit", error);
            TraceUtil.setException(span, t, false);
            break;
          }
        }
      }
    } catch (Exception e) {
      TraceUtil.setException(span, e, true);
      throw e;
    } finally {
      span.end();
    }

    long pt2 = System.currentTimeMillis();
    us.prepareTimes.addStat(pt2 - pt1);
    updateAvgPrepTime(pt2 - pt1, preppedMutations);

    if (error != null) {
      sendables.forEach((commitSession, value) -> commitSession.abortCommit());
      throw new RuntimeException(error);
    }
    try {
      Span span2 = TraceUtil.startSpan(this.getClass(), "flush::wal");
      try (Scope scope = span2.makeCurrent()) {
        // keep retrying the log write until it succeeds or fails with an unexpected exception
        while (true) {
          try {
            long t1 = System.currentTimeMillis();

            server.logger.logManyTablets(loggables);

            long t2 = System.currentTimeMillis();
            us.walogTimes.addStat(t2 - t1);
            updateWalogWriteTime((t2 - t1));
            break;
          } catch (IOException | FSError ex) {
            // include the cause so the reason for each retry is visible in the log
            log.warn("logging mutations failed, retrying", ex);
          } catch (Exception t) {
            log.error("Unknown exception logging mutations, counts"
                + " for mutations in flight not decremented!", t);
            throw new RuntimeException(t);
          }
        }
      } catch (Exception e) {
        TraceUtil.setException(span2, e, true);
        log.error("Error logging mutations sent from {}", TServerUtils.clientAddress.get(), e);
        throw e;
      } finally {
        span2.end();
      }

      Span span3 = TraceUtil.startSpan(this.getClass(), "flush::commit");
      try (Scope scope = span3.makeCurrent()) {
        long t1 = System.currentTimeMillis();
        sendables.forEach((commitSession, mutations) -> {
          commitSession.commit(mutations);
          KeyExtent extent = commitSession.getExtent();

          // compare extents by value rather than by reference so the check does not depend on
          // both sides handing back the very same KeyExtent instance
          if (us.currentTablet != null && extent.equals(us.currentTablet.getExtent())) {
            // because constraint violations may filter out some
            // mutations, for proper accounting with the client code,
            // need to increment the count based on the original
            // number of mutations from the client NOT the filtered number
            us.successfulCommits.increment(us.currentTablet,
                us.queuedMutations.get(us.currentTablet).size());
          }
        });
        long t2 = System.currentTimeMillis();

        // flush time spans the whole operation (measured from the start of prep)
        us.flushTime += (t2 - pt1);
        us.commitTimes.addStat(t2 - t1);

        updateAvgCommitTime(t2 - t1, sendableMutations);
      } catch (Exception e) {
        TraceUtil.setException(span3, e, true);
        log.error("Error committing mutations sent from {}", TServerUtils.clientAddress.get(), e);
        throw e;
      } finally {
        span3.end();
      }
    } finally {
      // whether or not logging/committing succeeded, drop everything that was queued and
      // start the current tablet (if any) off with a fresh, empty list
      us.queuedMutations.clear();
      if (us.currentTablet != null) {
        us.queuedMutations.put(us.currentTablet, new ArrayList<>());
      }
      server.updateTotalQueuedMutationSize(-us.queuedMutationSize);
      us.queuedMutationSize = 0;
    }
    us.totalUpdates += mutationCount;
  }