public TabletServerResourceManager()

in server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java [271:422]


  /**
   * Creates the resource manager for a tablet hosting server. In order, this starts the block
   * caches, checks the configured memory sizes against the JVM maximum, creates the server thread
   * pools and scan executors, sets up file and memory management, creates the conditional update
   * pools and finally schedules the assignment watcher.
   *
   * @param context server context supplying the configuration, metrics settings, caches and the
   *        scheduled executor
   * @param tserver hosting server that supplies the block cache configuration; whether it is a
   *        {@code ScanServer} is passed along when looking up the scan executor configuration
   * @throws IllegalStateException if the block cache manager cannot be created reflectively
   * @throws IllegalArgumentException if the block caches and mutation queue (plus the in-memory
   *         map when native maps are disabled) exceed the JVM's maximum memory
   */
  public TabletServerResourceManager(ServerContext context, TabletHostingServer tserver) {
    this.context = context;
    final AccumuloConfiguration conf = context.getConfiguration();
    final boolean metricsEnabled = context.getMetricsInfo().isMetricsEnabled();
    final long inMemoryMapBytes = conf.getAsBytes(Property.TSERV_MAXMEM);
    final boolean nativeMapEnabled = conf.getBoolean(Property.TSERV_NATIVEMAP_ENABLED);
    if (nativeMapEnabled) {
      NativeMapLoader.load();
    }
    final long mutationQueueBytes = conf.getAsBytes(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX);

    // Bring up the block cache manager before asking it for the individual caches.
    try {
      cacheManager = BlockCacheManagerFactory.getInstance(conf);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Error creating BlockCacheManager", e);
    }
    cacheManager.start(tserver.getBlockCacheConfiguration(conf));

    _iCache = cacheManager.getBlockCache(CacheType.INDEX);
    _dCache = cacheManager.getBlockCache(CacheType.DATA);
    _sCache = cacheManager.getBlockCache(CacheType.SUMMARY);

    // A cache that is absent contributes nothing to the heap budget.
    final long dataCacheBytes = _dCache != null ? _dCache.getMaxHeapSize() : 0;
    final long indexCacheBytes = _iCache != null ? _iCache.getMaxHeapSize() : 0;
    final long summaryCacheBytes = _sCache != null ? _sCache.getMaxHeapSize() : 0;
    final long blockCacheBytes = dataCacheBytes + indexCacheBytes + summaryCacheBytes;

    final Runtime jvm = Runtime.getRuntime();
    final long jvmMaxBytes = jvm.maxMemory();
    if (nativeMapEnabled) {
      // With native maps the in-memory map size is left out of the sum, but the block caches and
      // the mutation queue are still checked against the JVM maximum.
      if (blockCacheBytes + mutationQueueBytes > jvmMaxBytes) {
        throw new IllegalArgumentException(String.format(
            "Block cache sizes %,d and mutation queue size %,d is too large for this JVM"
                + " configuration %,d",
            blockCacheBytes, mutationQueueBytes, jvmMaxBytes));
      }
    } else if (inMemoryMapBytes + blockCacheBytes + mutationQueueBytes > jvmMaxBytes) {
      throw new IllegalArgumentException(String.format(
          "Maximum tablet server"
              + " map memory %,d block cache sizes %,d and mutation queue size %,d is"
              + " too large for this JVM configuration %,d",
          inMemoryMapBytes, blockCacheBytes, mutationQueueBytes, jvmMaxBytes));
    }
    jvm.gc();

    if (!nativeMapEnabled) {
      // memory in use = totalMemory - freeMemory
      // max available memory = maxMemory - memory in use
      final long ceilingBytes = jvm.maxMemory();
      final long usedBytes = jvm.totalMemory() - jvm.freeMemory();
      if (inMemoryMapBytes > ceilingBytes - usedBytes) {
        log.warn("In-memory map may not fit into local memory space.");
      }
    }

    // Resolved once here; every pool below is created through this ThreadPools instance.
    final var pools = ThreadPools.getServerThreadPools();

    minorCompactionThreadPool =
        pools.createExecutorService(conf, Property.TSERV_MINC_MAXCONCURRENT, metricsEnabled);
    modifyThreadPoolSizesAtRuntime(
        () -> context.getConfiguration().getCount(Property.TSERV_MINC_MAXCONCURRENT),
        TSERVER_MINOR_COMPACTOR_POOL.poolName, minorCompactionThreadPool);

    defaultMigrationPool = pools.getPoolBuilder(METADATA_TABLET_MIGRATION_POOL).numCoreThreads(0)
        .numMaxThreads(1).withTimeOut(60L, SECONDS).build();

    migrationPool =
        pools.createExecutorService(conf, Property.TSERV_MIGRATE_MAXCONCURRENT, metricsEnabled);
    modifyThreadPoolSizesAtRuntime(
        () -> context.getConfiguration().getCount(Property.TSERV_MIGRATE_MAXCONCURRENT),
        TSERVER_TABLET_MIGRATION_POOL.poolName, migrationPool);

    // It is unclear whether concurrent assignments can run safely. Even if they can, there is
    // probably little to gain at startup: the individual tablet servers already run assignments
    // concurrently with each other, so having each one also run concurrent assignments would
    // only put more load on the metadata table.
    assignmentPool = pools.createExecutorService(conf, Property.TSERV_ASSIGNMENT_MAXCONCURRENT,
        metricsEnabled);
    modifyThreadPoolSizesAtRuntime(
        () -> context.getConfiguration().getCount(Property.TSERV_ASSIGNMENT_MAXCONCURRENT),
        TABLET_ASSIGNMENT_POOL.poolName, assignmentPool);

    assignMetaDataPool = pools.getPoolBuilder(METADATA_TABLET_ASSIGNMENT_POOL).numCoreThreads(0)
        .numMaxThreads(1).withTimeOut(60, SECONDS).build();

    activeAssignments = new ConcurrentHashMap<>();

    summaryRetrievalPool = pools.createExecutorService(conf,
        Property.TSERV_SUMMARY_RETRIEVAL_THREADS, metricsEnabled);
    modifyThreadPoolSizesAtRuntime(
        () -> context.getConfiguration().getCount(Property.TSERV_SUMMARY_RETRIEVAL_THREADS),
        TSERVER_SUMMARY_FILE_RETRIEVER_POOL.poolName, summaryRetrievalPool);

    summaryRemotePool = pools.createExecutorService(conf, Property.TSERV_SUMMARY_REMOTE_THREADS,
        metricsEnabled);
    modifyThreadPoolSizesAtRuntime(
        () -> context.getConfiguration().getCount(Property.TSERV_SUMMARY_REMOTE_THREADS),
        TSERVER_SUMMARY_REMOTE_POOL.poolName, summaryRemotePool);

    summaryPartitionPool = pools.createExecutorService(conf,
        Property.TSERV_SUMMARY_PARTITION_THREADS, metricsEnabled);
    modifyThreadPoolSizesAtRuntime(
        () -> context.getConfiguration().getCount(Property.TSERV_SUMMARY_PARTITION_THREADS),
        TSERVER_SUMMARY_PARTITION_POOL.poolName, summaryPartitionPool);

    // Scan executors: the first pass creates each executor and records its queue by executor
    // name; the second pass looks that queue up again to build the matching ScanExecutorImpl.
    final Collection<ScanExecutorConfig> executorConfigs =
        conf.getScanExecutors(tserver instanceof ScanServer);
    final Map<String,Queue<Runnable>> queuesByExecutor = new HashMap<>();
    scanExecutors = executorConfigs.stream().collect(toUnmodifiableMap(ec -> ec.name,
        ec -> createPriorityExecutor(ec, queuesByExecutor, metricsEnabled)));
    scanExecutorChoices = executorConfigs.stream().collect(toUnmodifiableMap(ec -> ec.name,
        ec -> new ScanExecutorImpl(ec, queuesByExecutor.get(ec.name))));

    final int openFileLimit = conf.getCount(Property.TSERV_SCAN_MAX_OPENFILES);

    // The file length cache holds 1000 entries per allowed open file, capped at 100,000.
    final long fileLenCacheEntries = Math.min(openFileLimit * 1000L, 100_000);
    fileLenCache = context.getCaches().createNewBuilder(CacheName.TSRM_FILE_LENGTHS, true)
        .maximumSize(fileLenCacheEntries).build();

    fileManager = new FileManager(context, openFileLimit, fileLenCache);

    memoryManager = new LargestFirstMemoryManager();
    memoryManager.init(context);
    memMgmt = new MemoryManagementFramework();
    memMgmt.startThreads();

    // One conditional update pool per data level, each resizable at runtime.
    var rootUpdatePool = pools.createExecutorService(conf,
        Property.TSERV_CONDITIONAL_UPDATE_THREADS_ROOT, metricsEnabled);
    modifyThreadPoolSizesAtRuntime(
        () -> context.getConfiguration().getCount(Property.TSERV_CONDITIONAL_UPDATE_THREADS_ROOT),
        TSERVER_CONDITIONAL_UPDATE_ROOT_POOL.poolName, rootUpdatePool);

    var metaUpdatePool = pools.createExecutorService(conf,
        Property.TSERV_CONDITIONAL_UPDATE_THREADS_META, metricsEnabled);
    modifyThreadPoolSizesAtRuntime(
        () -> context.getConfiguration().getCount(Property.TSERV_CONDITIONAL_UPDATE_THREADS_META),
        TSERVER_CONDITIONAL_UPDATE_META_POOL.poolName, metaUpdatePool);

    var userUpdatePool = pools.createExecutorService(conf,
        Property.TSERV_CONDITIONAL_UPDATE_THREADS_USER, metricsEnabled);
    modifyThreadPoolSizesAtRuntime(
        () -> context.getConfiguration().getCount(Property.TSERV_CONDITIONAL_UPDATE_THREADS_USER),
        TSERVER_CONDITIONAL_UPDATE_USER_POOL.poolName, userUpdatePool);

    conditionalMutationExecutors = Map.of(Ample.DataLevel.ROOT, rootUpdatePool,
        Ample.DataLevel.METADATA, metaUpdatePool, Ample.DataLevel.USER, userUpdatePool);

    // A single map serves both metadata and normal assignments because the keyspace (extent) is
    // guaranteed to be unique. The task is scheduled once here and reschedules itself afterwards.
    var watcherFuture = context.getScheduledExecutor()
        .schedule(new AssignmentWatcher(context, activeAssignments), 5000, TimeUnit.MILLISECONDS);
    ThreadPools.watchCriticalScheduledTask(watcherFuture);
  }