private BlockManagerWorker()

in runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BlockManagerWorker.java [106:152]


  private BlockManagerWorker(@Parameter(JobConf.ExecutorId.class) final String executorId,
                             @Parameter(JobConf.IORequestHandleThreadsTotal.class) final int numThreads,
                             final MemoryStore memoryStore,
                             final SerializedMemoryStore serializedMemoryStore,
                             final LocalFileStore localFileStore,
                             final RemoteFileStore remoteFileStore,
                             final PersistentConnectionToMasterMap persistentConnectionToMasterMap,
                             final ByteTransfer byteTransfer,
                             final SerializerManager serializerManager,
                             final BlockTransferThrottler blockTransferThrottler) {
    this.executorId = executorId;
    this.memoryStore = memoryStore;
    this.serializedMemoryStore = serializedMemoryStore;
    this.localFileStore = localFileStore;
    this.remoteFileStore = remoteFileStore;
    this.persistentConnectionToMasterMap = persistentConnectionToMasterMap;
    this.byteTransfer = byteTransfer;
    this.backgroundExecutorService = Executors.newFixedThreadPool(numThreads);
    this.blockToRemainingRead = new ConcurrentHashMap<>();
    this.serializerManager = serializerManager;
    this.blockLocationResponseCache = CacheBuilder.newBuilder()
      // 2 seconds might be enough for "concurrent pending" fetch requests to reuse the same location
      .expireAfterWrite(2, TimeUnit.SECONDS)
      // No other eviction policy such as maximum cache size (i.e., this cache is unbounded)
      .build(new CacheLoader<String, CompletableFuture<ControlMessage.Message>>() {
        @Override
        public CompletableFuture<ControlMessage.Message> load(final String blockIdWildcard) {
          // Ask Master for the location.
          // (IMPORTANT): This 'request' effectively blocks the TaskExecutor thread if the block is IN_PROGRESS.
          // We use this property to make the receiver task of a 'push' edge to wait in an Executor for its input data
          // to become available.
          return persistentConnectionToMasterMap
            .getMessageSender(MessageEnvironment.BLOCK_MANAGER_MASTER_MESSAGE_LISTENER_ID).request(
              ControlMessage.Message.newBuilder()
                .setId(RuntimeIdManager.generateMessageId())
                .setListenerId(MessageEnvironment.BLOCK_MANAGER_MASTER_MESSAGE_LISTENER_ID)
                .setType(ControlMessage.MessageType.RequestBlockLocation)
                .setRequestBlockLocationMsg(
                  ControlMessage.RequestBlockLocationMsg.newBuilder()
                    .setExecutorId(executorId)
                    .setBlockIdWildcard(blockIdWildcard)
                    .build())
                .build());
        }
      });
    this.blockTransferThrottler = blockTransferThrottler;
  }