public List close()

in tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java [751:872]


  public List<Event> close() throws IOException, InterruptedException {
    // In case there are buffers to be spilled, schedule spilling
    scheduleSpill(true);
    List<Event> eventList = Lists.newLinkedList();
    isShutdown.set(true);
    spillLock.lock();
    try {
      LOG.info(
          sourceDestNameTrimmed + ": " + "Waiting for all spills to complete : Pending : " + pendingSpillCount.get());
      while (pendingSpillCount.get() != 0 && spillException == null) {
        spillInProgress.await();
      }
    } finally {
      spillLock.unlock();
    }
    if (spillException != null) {
      LOG.error(sourceDestNameTrimmed + ": " + "Error during spill, throwing");
      // Assuming close will be called on the same thread as the write
      cleanup();
      currentBuffer.cleanup();
      currentBuffer = null;
      if (spillException instanceof IOException) {
        throw (IOException) spillException;
      } else {
        throw new IOException(spillException);
      }
    } else {
      LOG.info(sourceDestNameTrimmed + ": " + "All spills complete");
      // Assuming close will be called on the same thread as the write
      cleanup();

      List<Event> events = Lists.newLinkedList();
      if (!pipelinedShuffle) {
        if (skipBuffers) {
          writer.close();
          long rawLen = writer.getRawLength();
          long compLen = writer.getCompressedLength();

          BitSet emptyPartitions = new BitSet();
          if (outputRecordsCounter.getValue() == 0) {
            emptyPartitions.set(0);
          }
          if (reportPartitionStats()) {
            if (outputRecordsCounter.getValue() > 0) {
              sizePerPartition[0] = rawLen;
            }
          }
          cleanupCurrentBuffer();

          if (outputRecordsCounter.getValue() > 0) {
            outputBytesWithOverheadCounter.increment(rawLen);
            fileOutputBytesCounter.increment(compLen + indexFileSizeEstimate);
          }
          eventList.add(generateVMEvent());

          if (!canSendDataOverDME()) {
            TezIndexRecord rec = new TezIndexRecord(0, rawLen, compLen);
            TezSpillRecord sr = new TezSpillRecord(1);
            sr.putIndex(rec, 0);
            finalIndexPath = outputFileHandler.getOutputIndexFileForWrite(indexFileSizeEstimate);
            sr.writeToFile(finalIndexPath, conf, localFs);
          }
          eventList.add(generateDMEvent(false, -1, false, outputContext
                  .getUniqueIdentifier(), emptyPartitions));

          return eventList;
        }

        /*
          1. Final merge enabled
             - When lots of spills are there, mergeAll, generate events and return
             - If there are no existing spills, check for final spill and generate events
          2. Final merge disabled
             - If finalSpill generated data, generate events and return
             - If finalSpill did not generate data, it would automatically populate events
         */
        if (isFinalMergeEnabled) {
          if (numSpills.get() > 0) {
            mergeAll();
          } else {
            finalSpill();
          }
          updateTezCountersAndNotify();
          eventList.add(generateVMEvent());
          eventList.add(generateDMEvent());
        } else {
          // if no data is generated, finalSpill would create VMEvent & add to finalEvents
          SpillResult result = finalSpill();
          if (result != null) {
            updateTezCountersAndNotify();
            // Generate vm event
            finalEvents.add(generateVMEvent());

            // compute empty partitions based on spill result and generate DME
            int spillNum = numSpills.get() - 1;
            SpillCallback callback = new SpillCallback(spillNum);
            callback.computePartitionStats(result);
            BitSet emptyPartitions = getEmptyPartitions(callback.getRecordsPerPartition());
            String pathComponent = generatePathComponent(outputContext.getUniqueIdentifier(), spillNum);
            Event finalEvent = generateDMEvent(true, spillNum,
                true, pathComponent, emptyPartitions);
            finalEvents.add(finalEvent);
          }
          //all events to be sent out are in finalEvents.
          eventList.addAll(finalEvents);
        }
        cleanupCurrentBuffer();
        return eventList;
      }

      //For pipelined case, send out an event in case finalspill generated a spill file.
      if (finalSpill() != null) {
        // VertexManagerEvent is only sent at the end and thus sizePerPartition is used
        // for the sum of all spills.
        mayBeSendEventsForSpill(currentBuffer.recordsPerPartition,
            sizePerPartition, numSpills.get() - 1, true);
      }
      updateTezCountersAndNotify();
      cleanupCurrentBuffer();
      return events;
    }
  }