public void onOutputContext()

in runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BlockManagerWorker.java [369:417]


  public void onOutputContext(final ByteOutputContext outputContext) throws InvalidProtocolBufferException {
    final ControlMessage.BlockTransferContextDescriptor descriptor =
      ControlMessage.BlockTransferContextDescriptor.PARSER.parseFrom(outputContext.getContextDescriptor());
    final DataStoreProperty.Value blockStore = convertBlockStore(descriptor.getBlockStore());
    final String blockId = descriptor.getBlockId();
    final KeyRange keyRange = SerializationUtils.deserialize(descriptor.getKeyRange().toByteArray());

    backgroundExecutorService.submit(new Runnable() {
      @Override
      public void run() {
        try {
          final Optional<Block> optionalBlock = getBlockStore(blockStore).readBlock(blockId);
          if (optionalBlock.isPresent()) {
            if (DataStoreProperty.Value.LOCAL_FILE_STORE.equals(blockStore)
              || DataStoreProperty.Value.GLUSTER_FILE_STORE.equals(blockStore)) {
              final List<FileArea> fileAreas = ((FileBlock) optionalBlock.get()).asFileAreas(keyRange);
              for (final FileArea fileArea : fileAreas) {
                try (ByteOutputContext.ByteOutputStream os = outputContext.newOutputStream()) {
                  os.writeFileArea(fileArea);
                }
              }
            } else {
              final Iterable<SerializedPartition> partitions = optionalBlock.get().readSerializedPartitions(keyRange);
              for (final SerializedPartition partition : partitions) {
                try (ByteOutputContext.ByteOutputStream os = outputContext.newOutputStream()) {
                  if (optionalBlock.get().getClass() == SerializedMemoryBlock.class) {
                    os.writeSerializedPartitionBuffer(partition, false);
                  } else {
                    // For NonSerializedMemoryBlock, the serialized partition to be sent is transient and needs
                    // to be released right after the data transfer.
                    os.writeSerializedPartitionBuffer(partition, true);
                  }
                }
              }
            }
            handleDataPersistence(blockStore, blockId);
            outputContext.close();

          } else {
            // We don't have the block here...
            throw new RuntimeException(String.format("Block %s not found in local BlockManagerWorker", blockId));
          }
        } catch (final IOException | BlockFetchException e) {
          LOG.error("Closing a block request exceptionally", e);
          outputContext.onChannelError(e);
        }
      }
    });
  }