in server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java [271:422]
/**
 * Builds the resource manager for a tablet hosting server. In order, the constructor: starts the
 * block cache manager and looks up the index, data, and summary caches; checks the configured
 * memory sizes against {@link Runtime#maxMemory()}; creates the minor compaction, migration,
 * assignment, summary, scan, and conditional update executors; creates the file manager and
 * memory manager; and schedules the assignment watcher.
 *
 * @param context server context used to read configuration and metrics settings, to build caches,
 *        and to obtain the scheduled executor
 * @param tserver the hosting server; it supplies the block cache configuration, and whether it is
 *        a {@code ScanServer} selects which scan executor configuration is read
 * @throws IllegalStateException if the block cache manager cannot be created reflectively
 * @throws IllegalArgumentException if the block cache sizes plus the total mutation queue maximum
 *         (plus the in-memory map maximum when native maps are disabled) exceed
 *         {@link Runtime#maxMemory()}
 */
public TabletServerResourceManager(ServerContext context, TabletHostingServer tserver) {
this.context = context;
final AccumuloConfiguration acuConf = context.getConfiguration();
final boolean enableMetrics = context.getMetricsInfo().isMetricsEnabled();
long maxMemory = acuConf.getAsBytes(Property.TSERV_MAXMEM);
boolean usingNativeMap = acuConf.getBoolean(Property.TSERV_NATIVEMAP_ENABLED);
// Invoke the native map loader before anything else when native maps are enabled.
if (usingNativeMap) {
NativeMapLoader.load();
}
long totalQueueSize = acuConf.getAsBytes(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX);
try {
cacheManager = BlockCacheManagerFactory.getInstance(acuConf);
} catch (ReflectiveOperationException e) {
throw new IllegalStateException("Error creating BlockCacheManager", e);
}
// The manager must be started before the individual caches are requested from it.
cacheManager.start(tserver.getBlockCacheConfiguration(acuConf));
_iCache = cacheManager.getBlockCache(CacheType.INDEX);
_dCache = cacheManager.getBlockCache(CacheType.DATA);
_sCache = cacheManager.getBlockCache(CacheType.SUMMARY);
// A cache that is not available (null) contributes nothing to the memory check below.
long dCacheSize = _dCache == null ? 0 : _dCache.getMaxHeapSize();
long iCacheSize = _iCache == null ? 0 : _iCache.getMaxHeapSize();
long sCacheSize = _sCache == null ? 0 : _sCache.getMaxHeapSize();
Runtime runtime = Runtime.getRuntime();
// Fail fast when the configured sizes cannot fit within Runtime.maxMemory(). With native maps the
// in-memory map maximum (maxMemory) is left out of the sum.
// NOTE(review): presumably because native maps are allocated outside the Java heap - confirm.
if (usingNativeMap) {
// Still check block cache sizes when using native maps.
if (dCacheSize + iCacheSize + sCacheSize + totalQueueSize > runtime.maxMemory()) {
throw new IllegalArgumentException(String.format(
"Block cache sizes %,d and mutation queue size %,d is too large for this JVM"
+ " configuration %,d",
dCacheSize + iCacheSize + sCacheSize, totalQueueSize, runtime.maxMemory()));
}
} else if (maxMemory + dCacheSize + iCacheSize + sCacheSize + totalQueueSize
> runtime.maxMemory()) {
throw new IllegalArgumentException(String.format(
"Maximum tablet server"
+ " map memory %,d block cache sizes %,d and mutation queue size %,d is"
+ " too large for this JVM configuration %,d",
maxMemory, dCacheSize + iCacheSize + sCacheSize, totalQueueSize, runtime.maxMemory()));
}
// Ask the JVM to collect garbage before sampling heap usage (presumably so the estimate below
// is not inflated by garbage left over from startup).
runtime.gc();
// totalMemory - freeMemory = memory in use
// maxMemory - memory in use = max available memory
// This second check only warns: it compares the in-memory map maximum against the heap that is
// available right now, and is skipped when native maps are in use.
if (!usingNativeMap
&& maxMemory > runtime.maxMemory() - (runtime.totalMemory() - runtime.freeMemory())) {
log.warn("In-memory map may not fit into local memory space.");
}
// Pattern used for most pools below: create the executor from its configuration property, then
// pass a supplier that re-reads that property to modifyThreadPoolSizesAtRuntime (defined outside
// this constructor; presumably it resizes the pool when the configured value changes).
minorCompactionThreadPool = ThreadPools.getServerThreadPools().createExecutorService(acuConf,
Property.TSERV_MINC_MAXCONCURRENT, enableMetrics);
modifyThreadPoolSizesAtRuntime(
() -> context.getConfiguration().getCount(Property.TSERV_MINC_MAXCONCURRENT),
TSERVER_MINOR_COMPACTOR_POOL.poolName, minorCompactionThreadPool);
// Fixed-shape pool (0 core threads, at most 1 thread, 60 second timeout); it is not driven by a
// configuration property and is not registered for runtime resizing.
defaultMigrationPool =
ThreadPools.getServerThreadPools().getPoolBuilder(METADATA_TABLET_MIGRATION_POOL)
.numCoreThreads(0).numMaxThreads(1).withTimeOut(60L, SECONDS).build();
migrationPool = ThreadPools.getServerThreadPools().createExecutorService(acuConf,
Property.TSERV_MIGRATE_MAXCONCURRENT, enableMetrics);
modifyThreadPoolSizesAtRuntime(
() -> context.getConfiguration().getCount(Property.TSERV_MIGRATE_MAXCONCURRENT),
TSERVER_TABLET_MIGRATION_POOL.poolName, migrationPool);
// not sure if concurrent assignments can run safely... even if they could there is probably no
// benefit at startup because
// individual tablet servers are already running assignments concurrently... having each
// individual tablet server run
// concurrent assignments would put more load on the metadata table at startup
assignmentPool = ThreadPools.getServerThreadPools().createExecutorService(acuConf,
Property.TSERV_ASSIGNMENT_MAXCONCURRENT, enableMetrics);
modifyThreadPoolSizesAtRuntime(
() -> context.getConfiguration().getCount(Property.TSERV_ASSIGNMENT_MAXCONCURRENT),
TABLET_ASSIGNMENT_POOL.poolName, assignmentPool);
// Same fixed shape as defaultMigrationPool above: 0 core threads, at most 1 thread, 60 second
// timeout.
assignMetaDataPool =
ThreadPools.getServerThreadPools().getPoolBuilder(METADATA_TABLET_ASSIGNMENT_POOL)
.numCoreThreads(0).numMaxThreads(1).withTimeOut(60, SECONDS).build();
// Created here so it exists before the AssignmentWatcher is scheduled at the end of the
// constructor.
activeAssignments = new ConcurrentHashMap<>();
summaryRetrievalPool = ThreadPools.getServerThreadPools().createExecutorService(acuConf,
Property.TSERV_SUMMARY_RETRIEVAL_THREADS, enableMetrics);
modifyThreadPoolSizesAtRuntime(
() -> context.getConfiguration().getCount(Property.TSERV_SUMMARY_RETRIEVAL_THREADS),
TSERVER_SUMMARY_FILE_RETRIEVER_POOL.poolName, summaryRetrievalPool);
summaryRemotePool = ThreadPools.getServerThreadPools().createExecutorService(acuConf,
Property.TSERV_SUMMARY_REMOTE_THREADS, enableMetrics);
modifyThreadPoolSizesAtRuntime(
() -> context.getConfiguration().getCount(Property.TSERV_SUMMARY_REMOTE_THREADS),
TSERVER_SUMMARY_REMOTE_POOL.poolName, summaryRemotePool);
summaryPartitionPool = ThreadPools.getServerThreadPools().createExecutorService(acuConf,
Property.TSERV_SUMMARY_PARTITION_THREADS, enableMetrics);
modifyThreadPoolSizesAtRuntime(
() -> context.getConfiguration().getCount(Property.TSERV_SUMMARY_PARTITION_THREADS),
TSERVER_SUMMARY_PARTITION_POOL.poolName, summaryPartitionPool);
// Scan executors: one executor and one ScanExecutorImpl per configured scan executor, both keyed
// by the executor name.
boolean isScanServer = (tserver instanceof ScanServer);
Collection<ScanExecutorConfig> scanExecCfg = acuConf.getScanExecutors(isScanServer);
// Order matters: scanExecQueues is handed to createPriorityExecutor in the first collect and read
// by name in the second, so the first collect must finish before the second starts.
// NOTE(review): presumably createPriorityExecutor registers each executor's queue in this map -
// confirm against its definition.
Map<String,Queue<Runnable>> scanExecQueues = new HashMap<>();
scanExecutors = scanExecCfg.stream().collect(toUnmodifiableMap(cfg -> cfg.name,
cfg -> createPriorityExecutor(cfg, scanExecQueues, enableMetrics)));
scanExecutorChoices = scanExecCfg.stream().collect(toUnmodifiableMap(cfg -> cfg.name,
cfg -> new ScanExecutorImpl(cfg, scanExecQueues.get(cfg.name))));
int maxOpenFiles = acuConf.getCount(Property.TSERV_SCAN_MAX_OPENFILES);
// Cache capacity is 1000 entries per allowed open file, capped at 100,000 entries; the 1000L
// literal makes the multiplication happen in long arithmetic.
fileLenCache = context.getCaches().createNewBuilder(CacheName.TSRM_FILE_LENGTHS, true)
.maximumSize(Math.min(maxOpenFiles * 1000L, 100_000)).build();
fileManager = new FileManager(context, maxOpenFiles, fileLenCache);
memoryManager = new LargestFirstMemoryManager();
memoryManager.init(context);
// memoryManager is initialized before the framework's threads are started.
// NOTE(review): presumably those threads use memoryManager - confirm.
memMgmt = new MemoryManagementFramework();
memMgmt.startThreads();
// Conditional update pools: one per data level (root, metadata, user), each sized from its own
// property and registered for runtime resizing like the pools above.
var rootConditionalPool = ThreadPools.getServerThreadPools().createExecutorService(acuConf,
Property.TSERV_CONDITIONAL_UPDATE_THREADS_ROOT, enableMetrics);
modifyThreadPoolSizesAtRuntime(
() -> context.getConfiguration().getCount(Property.TSERV_CONDITIONAL_UPDATE_THREADS_ROOT),
TSERVER_CONDITIONAL_UPDATE_ROOT_POOL.poolName, rootConditionalPool);
var metaConditionalPool = ThreadPools.getServerThreadPools().createExecutorService(acuConf,
Property.TSERV_CONDITIONAL_UPDATE_THREADS_META, enableMetrics);
modifyThreadPoolSizesAtRuntime(
() -> context.getConfiguration().getCount(Property.TSERV_CONDITIONAL_UPDATE_THREADS_META),
TSERVER_CONDITIONAL_UPDATE_META_POOL.poolName, metaConditionalPool);
var userConditionalPool = ThreadPools.getServerThreadPools().createExecutorService(acuConf,
Property.TSERV_CONDITIONAL_UPDATE_THREADS_USER, enableMetrics);
modifyThreadPoolSizesAtRuntime(
() -> context.getConfiguration().getCount(Property.TSERV_CONDITIONAL_UPDATE_THREADS_USER),
TSERVER_CONDITIONAL_UPDATE_USER_POOL.poolName, userConditionalPool);
// Immutable lookup from data level to the pool that serves it.
conditionalMutationExecutors = Map.of(Ample.DataLevel.ROOT, rootConditionalPool,
Ample.DataLevel.METADATA, metaConditionalPool, Ample.DataLevel.USER, userConditionalPool);
// We can use the same map for both metadata and normal assignments since the keyspace (extent)
// is guaranteed to be unique. Schedule the task once, the task will reschedule itself.
// The first run is delayed by 5000 milliseconds.
ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor()
.schedule(new AssignmentWatcher(context, activeAssignments), 5000, TimeUnit.MILLISECONDS));
}