public void putSplits()

in core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java [495:637]


  /**
   * Applies the requested splits to a table, retrying until every requested entry has been
   * handled.
   *
   * <p>
   * On each pass the tablet location cache is invalidated and the outstanding entries are
   * partitioned by {@code mapSplitsToTablets} into entries for existing splits and entries for
   * new splits. Existing entries are sent to the manager in a single
   * {@code updateTabletMergeability} call. New entries are submitted as one {@code TABLE_SPLIT}
   * fate operation per tablet. Fate operations are started on one thread pool and waited on in a
   * second pool, so starting new operations is not blocked by waiting on earlier ones. Entries
   * are removed from the outstanding set only once reported successful; anything left over is
   * retried on the next pass.
   *
   * <p>
   * Both thread pools are shut down before this method returns, whether it returns normally or
   * throws.
   *
   * @param tableName name of an existing, online table
   * @param splits split rows mapped to the {@link TabletMergeability} to apply to each
   * @throws AccumuloException if a background task fails for a reason not covered by the other
   *         exception types
   * @throws TableNotFoundException if the table is not found, either up front or by a background
   *         task
   * @throws AccumuloSecurityException if a background task fails with a security error
   */
  public void putSplits(String tableName, SortedMap<Text,TabletMergeability> splits)
      throws AccumuloException, TableNotFoundException, AccumuloSecurityException {

    EXISTING_TABLE_NAME.validate(tableName);

    TableId tableId = context.getTableId(tableName);

    // TODO should there be a server side check for this?
    context.requireNotOffline(tableId, tableName);

    ClientTabletCache tabLocator = context.getTabletLocationCache(tableId);

    // Background tasks remove entries from this map as they succeed, so it must be thread safe.
    SortedMap<Text,TabletMergeability> splitsTodo =
        Collections.synchronizedSortedMap(new TreeMap<>(splits));

    // Stand-in fate argument used when a tablet has no end row or no previous end row.
    final ByteBuffer EMPTY = ByteBuffer.allocate(0);

    ExecutorService startExecutor =
        context.threadPools().getPoolBuilder(SPLIT_START_POOL).numCoreThreads(16).build();
    ExecutorService waitExecutor =
        context.threadPools().getPoolBuilder(SPLIT_WAIT_POOL).numCoreThreads(16).build();

    try {
      while (!splitsTodo.isEmpty()) {

        tabLocator.invalidateCache();

        var splitsToTablets = mapSplitsToTablets(tableName, tableId, tabLocator, splitsTodo);
        Map<KeyExtent,List<Pair<Text,TabletMergeability>>> tabletSplits =
            splitsToTablets.newSplits;
        Map<KeyExtent,TabletMergeability> existingSplits = splitsToTablets.existingSplits;

        List<CompletableFuture<Void>> futures = new ArrayList<>();

        // Handle existing updates
        if (!existingSplits.isEmpty()) {
          futures.add(CompletableFuture.supplyAsync(() -> {
            try {
              var tSplits = existingSplits.entrySet().stream()
                  .collect(Collectors.toMap(e -> e.getKey().toThrift(),
                      e -> TabletMergeabilityUtil.toThrift(e.getValue())));
              return ThriftClientTypes.MANAGER.executeTableCommand(context,
                  client -> client.updateTabletMergeability(TraceUtil.traceInfo(),
                      context.rpcCreds(), tableName, tSplits));
            } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) {
              // This exception type is used because it makes it easier in the foreground
              // thread to do exception analysis when using CompletableFuture.
              throw new CompletionException(e);
            }
          }, startExecutor).thenApplyAsync(updated -> {
            // Remove the successfully updated tablets from the list, failures will be retried
            updated.forEach(tke -> splitsTodo.remove(KeyExtent.fromThrift(tke).endRow()));
            return null;
          }, waitExecutor));
        }

        // begin the fate operation for each tablet without waiting for the operation to complete
        for (Entry<KeyExtent,List<Pair<Text,TabletMergeability>>> splitsForTablet : tabletSplits
            .entrySet()) {
          CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            var extent = splitsForTablet.getKey();

            // Fate arguments: table id, tablet end row, tablet prev end row, then one encoded
            // entry per split requested for this tablet.
            List<ByteBuffer> args = new ArrayList<>();
            args.add(ByteBuffer.wrap(extent.tableId().canonical().getBytes(UTF_8)));
            args.add(extent.endRow() == null ? EMPTY : TextUtil.getByteBuffer(extent.endRow()));
            args.add(
                extent.prevEndRow() == null ? EMPTY : TextUtil.getByteBuffer(extent.prevEndRow()));

            splitsForTablet.getValue()
                .forEach(split -> args.add(TabletMergeabilityUtil.encodeAsBuffer(split)));

            try {
              return handleFateOperation(() -> {
                TFateInstanceType t =
                    FateInstanceType.fromNamespaceOrTableName(tableName).toThrift();
                TFateId opid = beginFateOperation(t);
                executeFateOperation(opid, TFateOperation.TABLE_SPLIT, args, Map.of(), false);
                return new Pair<>(opid, splitsForTablet.getValue());
              }, tableName);
            } catch (TableExistsException | NamespaceExistsException | NamespaceNotFoundException
                | AccumuloSecurityException | TableNotFoundException | AccumuloException e) {
              // This exception type is used because it makes it easier in the foreground
              // thread to do exception analysis when using CompletableFuture.
              throw new CompletionException(e);
            }
            // wait for the fate operation to complete in a separate thread pool
          }, startExecutor).thenApplyAsync(pair -> {
            final TFateId opid = pair.getFirst();
            final List<Pair<Text,TabletMergeability>> completedSplits = pair.getSecond();

            try {
              String status = handleFateOperation(() -> waitForFateOperation(opid), tableName);

              // Only splits from operations reporting success are removed; the rest stay in
              // splitsTodo and are picked up by the next pass of the outer loop.
              if (SPLIT_SUCCESS_MSG.equals(status)) {
                completedSplits.stream().map(Pair::getFirst).forEach(splitsTodo::remove);
              }
            } catch (TableExistsException | NamespaceExistsException | NamespaceNotFoundException
                | AccumuloSecurityException | TableNotFoundException | AccumuloException e) {
              // This exception type is used because it makes it easier in the foreground
              // thread to do exception analysis when using CompletableFuture.
              throw new CompletionException(e);
            } finally {
              // always finish table op, even when exception
              if (opid != null) {
                try {
                  finishFateOperation(opid);
                } catch (Exception e) {
                  log.warn("Exception thrown while finishing fate table operation", e);
                }
              }
            }
            return null;
          }, waitExecutor);
          futures.add(future);
        }

        try {
          futures.forEach(CompletableFuture::join);
        } catch (CompletionException ee) {
          Throwable excep = ee.getCause();
          // Below all exceptions are wrapped and rethrown. This is done so that the user knows
          // what code path got them here. If the wrapping was not done, the user would only
          // have the stack trace for the background thread.
          if (excep instanceof TableNotFoundException) {
            TableNotFoundException tnfe = (TableNotFoundException) excep;
            throw new TableNotFoundException(tableId.canonical(), tableName,
                "Table not found by background thread", tnfe);
          } else if (excep instanceof TableOfflineException) {
            log.debug(
                "TableOfflineException occurred in background thread. Throwing new exception",
                excep);
            throw new TableOfflineException(tableId, tableName);
          } else if (excep instanceof AccumuloSecurityException) {
            // base == background accumulo security exception
            AccumuloSecurityException base = (AccumuloSecurityException) excep;
            throw new AccumuloSecurityException(base.getUser(), base.asThriftException().getCode(),
                base.getTableInfo(), excep);
          } else if (excep instanceof AccumuloServerException) {
            throw new AccumuloServerException((AccumuloServerException) excep);
          } else {
            throw new AccumuloException(excep);
          }
        }

      }
    } finally {
      // Shut the pools down on every exit path. Previously an exception rethrown from the loop
      // skipped the shutdown calls and leaked both pools. shutdown() (not shutdownNow()) is
      // used so tasks that were already submitted still get to run their own
      // finishFateOperation cleanup.
      startExecutor.shutdown();
      waitExecutor.shutdown();
    }
  }