in core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java [495:637]
// Adds the requested split points to a table and/or updates the mergeability of splits that
// mapSplitsToTablets reports as already existing.
//
// The work is done in rounds. Each round maps the outstanding splits onto the table's tablets,
// submits one background task per tablet that starts a TABLE_SPLIT fate operation, and a second
// stage per tablet that waits for that operation and removes its splits from the outstanding set
// when the operation reports SPLIT_SUCCESS_MSG. Splits that were not removed are retried in the
// next round, so the method returns only once every requested split has been handled.
//
// tableName - name of an existing table; validated with EXISTING_TABLE_NAME
// splits - split rows mapped to the mergeability to assign; the map is copied, not modified
//
// Throws TableNotFoundException, AccumuloSecurityException, AccumuloException (or the unchecked
// TableOfflineException) when a background task fails; the background exception is re-wrapped so
// that the stack trace shows the calling thread's code path.
public void putSplits(String tableName, SortedMap<Text,TabletMergeability> splits)
    throws AccumuloException, TableNotFoundException, AccumuloSecurityException {

  EXISTING_TABLE_NAME.validate(tableName);

  TableId tableId = context.getTableId(tableName);

  // TODO should there be a server side check for this?
  context.requireNotOffline(tableId, tableName);

  ClientTabletCache tabLocator = context.getTabletLocationCache(tableId);

  // Background tasks remove entries from this map as they succeed, so it is wrapped in a
  // synchronized view. The caller's map is copied and never modified.
  SortedMap<Text,TabletMergeability> splitsTodo =
      Collections.synchronizedSortedMap(new TreeMap<>(splits));

  // Placeholder argument used when a tablet has no end row or no previous end row.
  final ByteBuffer EMPTY = ByteBuffer.allocate(0);

  ExecutorService startExecutor =
      context.threadPools().getPoolBuilder(SPLIT_START_POOL).numCoreThreads(16).build();
  ExecutorService waitExecutor =
      context.threadPools().getPoolBuilder(SPLIT_WAIT_POOL).numCoreThreads(16).build();

  try {
    while (!splitsTodo.isEmpty()) {

      // Tablet boundaries may have changed since the previous round, so look them up again.
      tabLocator.invalidateCache();

      var splitsToTablets = mapSplitsToTablets(tableName, tableId, tabLocator, splitsTodo);
      Map<KeyExtent,List<Pair<Text,TabletMergeability>>> tabletSplits = splitsToTablets.newSplits;
      Map<KeyExtent,TabletMergeability> existingSplits = splitsToTablets.existingSplits;

      List<CompletableFuture<Void>> futures = new ArrayList<>();

      // Handle existing updates
      if (!existingSplits.isEmpty()) {
        futures.add(CompletableFuture.supplyAsync(() -> {
          try {
            var tSplits = existingSplits.entrySet().stream().collect(Collectors.toMap(
                e -> e.getKey().toThrift(), e -> TabletMergeabilityUtil.toThrift(e.getValue())));
            return ThriftClientTypes.MANAGER.executeTableCommand(context,
                client -> client.updateTabletMergeability(TraceUtil.traceInfo(),
                    context.rpcCreds(), tableName, tSplits));
          } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) {
            // This exception type is used because it makes it easier in the foreground thread to
            // do exception analysis when using CompletableFuture.
            throw new CompletionException(e);
          }
        }, startExecutor).thenApplyAsync(updated -> {
          // Remove the successfully updated tablets from the list, failures will be retried
          updated.forEach(tke -> splitsTodo.remove(KeyExtent.fromThrift(tke).endRow()));
          return null;
        }, waitExecutor));
      }

      // begin the fate operation for each tablet without waiting for the operation to complete
      for (Entry<KeyExtent,List<Pair<Text,TabletMergeability>>> splitsForTablet : tabletSplits
          .entrySet()) {
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
          var extent = splitsForTablet.getKey();

          // Fate arguments: table id, tablet end row, tablet previous end row, then one encoded
          // entry per split to add to that tablet.
          List<ByteBuffer> args = new ArrayList<>();
          args.add(ByteBuffer.wrap(extent.tableId().canonical().getBytes(UTF_8)));
          args.add(extent.endRow() == null ? EMPTY : TextUtil.getByteBuffer(extent.endRow()));
          args.add(
              extent.prevEndRow() == null ? EMPTY : TextUtil.getByteBuffer(extent.prevEndRow()));
          splitsForTablet.getValue()
              .forEach(split -> args.add(TabletMergeabilityUtil.encodeAsBuffer(split)));

          try {
            return handleFateOperation(() -> {
              TFateInstanceType t =
                  FateInstanceType.fromNamespaceOrTableName(tableName).toThrift();
              TFateId opid = beginFateOperation(t);
              executeFateOperation(opid, TFateOperation.TABLE_SPLIT, args, Map.of(), false);
              return new Pair<>(opid, splitsForTablet.getValue());
            }, tableName);
          } catch (TableExistsException | NamespaceExistsException | NamespaceNotFoundException
              | AccumuloSecurityException | TableNotFoundException | AccumuloException e) {
            // This exception type is used because it makes it easier in the foreground thread to
            // do exception analysis when using CompletableFuture.
            throw new CompletionException(e);
          }
          // wait for the fate operation to complete in a separate thread pool
        }, startExecutor).thenApplyAsync(pair -> {
          final TFateId opid = pair.getFirst();
          final List<Pair<Text,TabletMergeability>> completedSplits = pair.getSecond();

          try {
            String status = handleFateOperation(() -> waitForFateOperation(opid), tableName);

            // Only a reported success removes the splits; anything else leaves them in
            // splitsTodo so that the next round retries them.
            if (SPLIT_SUCCESS_MSG.equals(status)) {
              completedSplits.stream().map(Pair::getFirst).forEach(splitsTodo::remove);
            }
          } catch (TableExistsException | NamespaceExistsException | NamespaceNotFoundException
              | AccumuloSecurityException | TableNotFoundException | AccumuloException e) {
            // This exception type is used because it makes it easier in the foreground thread to
            // do exception analysis when using CompletableFuture.
            throw new CompletionException(e);
          } finally {
            // always finish table op, even when exception
            if (opid != null) {
              try {
                finishFateOperation(opid);
              } catch (Exception e) {
                log.warn("Exception thrown while finishing fate table operation", e);
              }
            }
          }
          return null;
        }, waitExecutor);
        futures.add(future);
      }

      try {
        futures.forEach(CompletableFuture::join);
      } catch (CompletionException ee) {
        // Let the other operations of this round settle before the failure is propagated. The
        // executors are shut down once this method exits, and a start task that completed after
        // that point would have its wait stage rejected, leaving its fate operation unfinished.
        for (CompletableFuture<Void> pending : futures) {
          try {
            pending.join();
          } catch (CompletionException secondary) {
            // Only the first failure is reported to the caller.
            log.debug("Additional failure in background split operation", secondary);
          }
        }

        Throwable excep = ee.getCause();
        // Below all exceptions are wrapped and rethrown. This is done so that the user knows
        // what code path got them here. If the wrapping was not done, the user would only
        // have the stack trace for the background thread.
        if (excep instanceof TableNotFoundException) {
          TableNotFoundException tnfe = (TableNotFoundException) excep;
          throw new TableNotFoundException(tableId.canonical(), tableName,
              "Table not found by background thread", tnfe);
        } else if (excep instanceof TableOfflineException) {
          log.debug("TableOfflineException occurred in background thread. Throwing new exception",
              excep);
          throw new TableOfflineException(tableId, tableName);
        } else if (excep instanceof AccumuloSecurityException) {
          // base == background accumulo security exception
          AccumuloSecurityException base = (AccumuloSecurityException) excep;
          throw new AccumuloSecurityException(base.getUser(), base.asThriftException().getCode(),
              base.getTableInfo(), excep);
        } else if (excep instanceof AccumuloServerException) {
          throw new AccumuloServerException((AccumuloServerException) excep);
        } else {
          throw new AccumuloException(excep);
        }
      }
    }
  } finally {
    // Release the pools on every exit path, not only after a fully successful run.
    startExecutor.shutdown();
    waitExecutor.shutdown();
  }
}