public void cleanup()

in tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java [854:968]


  public void cleanup() throws InterruptedException {
    LOG.info("Final Counters for " + taskSpec.getTaskAttemptID() + ": " + getCounters().toShortString());
    setTaskDone();
    if (eventRouterThread != null) {
      eventRouterThread.interrupt();
      LOG.info("Joining on EventRouter");
      try {
        eventRouterThread.join();
      } catch (InterruptedException e) {
        LOG.info("Ignoring interrupt while waiting for the router thread to die");
        Thread.currentThread().interrupt();
      }
      eventRouterThread = null;
    }

    // Close the unclosed IPO
    /**
     * Cleanup IPO that are not closed.  In case, regular close() has happened in IPO, they
     * would not be available in the IPOs to be cleaned. So this is safe.
     *
     * e.g whenever input gets closed() in normal way, it automatically removes it from
     * initializedInputs map.
     *
     * In case any exception happens in processor close or IO close, it wouldn't be removed from
     * the initialized IO data structures and here is the chance to close them and release
     * resources.
     *
     */
    if (LOG.isDebugEnabled()) {
      LOG.debug("Processor closed={}", processorClosed);
      LOG.debug("Num of inputs to be closed={}", initializedInputs.size());
      LOG.debug("Num of outputs to be closed={}", initializedOutputs.size());
    }

    // Close the remaining inited Inputs.
    Iterator<Map.Entry<String, LogicalInput>> inputIterator = initializedInputs.entrySet().iterator();
    while (inputIterator.hasNext()) {
      Map.Entry<String, LogicalInput> entry = inputIterator.next();
      String srcVertexName = entry.getKey();
      inputIterator.remove();
      try {
        ((InputFrameworkInterface)entry.getValue()).close();
        maybeResetInterruptStatus();
      } catch (InterruptedException ie) {
        //reset the status
        LOG.info("Resetting interrupt status for input with srcVertexName={}",
            srcVertexName);
        Thread.currentThread().interrupt();
      } catch (Throwable e) {
        LOG.warn(
            "Ignoring exception when closing input {}(cleanup). Exception class={}, message={}",
            srcVertexName, e.getClass().getName(), e.getMessage());
      } finally {
        LOG.info("Closed input for vertex={}, sourceVertex={}, interruptedStatus={}", processor
            .getContext().getTaskVertexName(), srcVertexName, Thread.currentThread().isInterrupted());
      }
    }

    // Close the remaining inited Outputs.
    Iterator<Map.Entry<String, LogicalOutput>> outputIterator = initializedOutputs.entrySet().iterator();
    while (outputIterator.hasNext()) {
      Map.Entry<String, LogicalOutput> entry = outputIterator.next();
      String destVertexName = entry.getKey();
      outputIterator.remove();
      try {
        ((OutputFrameworkInterface) entry.getValue()).close();
        maybeResetInterruptStatus();
      } catch (InterruptedException ie) {
        //reset the status
        LOG.info("Resetting interrupt status for output with destVertexName={}",
            destVertexName);
        Thread.currentThread().interrupt();
      } catch (Throwable e) {
        LOG.warn(
            "Ignoring exception when closing output {}(cleanup). Exception class={}, message={}",
            destVertexName, e.getClass().getName(), e.getMessage());
      } finally {
        LOG.info("Closed input for vertex={}, sourceVertex={}, interruptedStatus={}", processor
            .getContext().getTaskVertexName(), destVertexName, Thread.currentThread().isInterrupted());
      }
    }

    if (LOG.isDebugEnabled()) {
      printThreads();
    }

    // Close processor
    if (!processorClosed && processor != null) {
      try {
        processorClosed = true;
        processor.close();
        LOG.info("Closed processor for vertex={}, index={}, interruptedStatus={}",
            processor
            .getContext().getTaskVertexName(),
            processor.getContext().getTaskVertexIndex(),
            Thread.currentThread().isInterrupted());
        maybeResetInterruptStatus();
      } catch (InterruptedException ie) {
        //reset the status
        LOG.info("Resetting interrupt for processor");
        Thread.currentThread().interrupt();
      } catch (Throwable e) {
        LOG.warn("Ignoring Exception when closing processor(cleanup). Exception class={}, message={}",
            e.getClass().getName(), e.getMessage());
      }
    }

    try {
      closeContexts();
      // Cleanup references which may be held by misbehaved tasks.
      cleanupStructures();
    } catch (IOException e) {
      LOG.info("Error while cleaning up contexts ", e);
    }
  }