in runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BlockManagerWorker.java [106:152]
private BlockManagerWorker(@Parameter(JobConf.ExecutorId.class) final String executorId,
                           @Parameter(JobConf.IORequestHandleThreadsTotal.class) final int numThreads,
                           final MemoryStore memoryStore,
                           final SerializedMemoryStore serializedMemoryStore,
                           final LocalFileStore localFileStore,
                           final RemoteFileStore remoteFileStore,
                           final PersistentConnectionToMasterMap persistentConnectionToMasterMap,
                           final ByteTransfer byteTransfer,
                           final SerializerManager serializerManager,
                           final BlockTransferThrottler blockTransferThrottler) {
  // Wires up the worker: keeps the supplied collaborators, creates the background thread pool
  // and the per-block read bookkeeping map, and builds the short-lived block location cache.

  // Identity of this executor and the collaborators supplied by the caller.
  this.executorId = executorId;
  this.persistentConnectionToMasterMap = persistentConnectionToMasterMap;
  this.byteTransfer = byteTransfer;
  this.serializerManager = serializerManager;
  this.blockTransferThrottler = blockTransferThrottler;

  // The block stores supplied by the caller.
  this.memoryStore = memoryStore;
  this.serializedMemoryStore = serializedMemoryStore;
  this.localFileStore = localFileStore;
  this.remoteFileStore = remoteFileStore;

  // State owned by this instance: a pool of exactly numThreads threads and an initially empty map.
  this.backgroundExecutorService = Executors.newFixedThreadPool(numThreads);
  this.blockToRemainingRead = new ConcurrentHashMap<>();

  // Loader invoked on a cache miss: it sends a RequestBlockLocation message for the given
  // block id wildcard to the master and hands back the future of the reply.
  final CacheLoader<String, CompletableFuture<ControlMessage.Message>> locationLoader =
      new CacheLoader<String, CompletableFuture<ControlMessage.Message>>() {
        @Override
        public CompletableFuture<ControlMessage.Message> load(final String blockIdWildcard) {
          // (IMPORTANT): While the block is IN_PROGRESS, this request effectively blocks the
          // TaskExecutor thread. That property is relied upon to make the receiver task of a
          // 'push' edge wait inside an Executor until its input data becomes available.
          return persistentConnectionToMasterMap
              .getMessageSender(MessageEnvironment.BLOCK_MANAGER_MASTER_MESSAGE_LISTENER_ID)
              .request(ControlMessage.Message.newBuilder()
                  .setId(RuntimeIdManager.generateMessageId())
                  .setListenerId(MessageEnvironment.BLOCK_MANAGER_MASTER_MESSAGE_LISTENER_ID)
                  .setType(ControlMessage.MessageType.RequestBlockLocation)
                  .setRequestBlockLocationMsg(ControlMessage.RequestBlockLocationMsg.newBuilder()
                      .setExecutorId(executorId)
                      .setBlockIdWildcard(blockIdWildcard)
                      .build())
                  .build());
        }
      };

  // Entries expire 2 seconds after being written, which might be enough for "concurrent pending"
  // fetch requests to share one location reply. No other eviction policy (e.g., a maximum size)
  // is configured, so the cache is otherwise unbounded.
  this.blockLocationResponseCache = CacheBuilder.newBuilder()
      .expireAfterWrite(2, TimeUnit.SECONDS)
      .build(locationLoader);
}