in tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java [854:968]
public void cleanup() throws InterruptedException {
LOG.info("Final Counters for " + taskSpec.getTaskAttemptID() + ": " + getCounters().toShortString());
setTaskDone();
if (eventRouterThread != null) {
eventRouterThread.interrupt();
LOG.info("Joining on EventRouter");
try {
eventRouterThread.join();
} catch (InterruptedException e) {
LOG.info("Ignoring interrupt while waiting for the router thread to die");
Thread.currentThread().interrupt();
}
eventRouterThread = null;
}
// Close the unclosed IPO
/**
* Cleanup IPO that are not closed. In case, regular close() has happened in IPO, they
* would not be available in the IPOs to be cleaned. So this is safe.
*
* e.g whenever input gets closed() in normal way, it automatically removes it from
* initializedInputs map.
*
* In case any exception happens in processor close or IO close, it wouldn't be removed from
* the initialized IO data structures and here is the chance to close them and release
* resources.
*
*/
if (LOG.isDebugEnabled()) {
LOG.debug("Processor closed={}", processorClosed);
LOG.debug("Num of inputs to be closed={}", initializedInputs.size());
LOG.debug("Num of outputs to be closed={}", initializedOutputs.size());
}
// Close the remaining inited Inputs.
Iterator<Map.Entry<String, LogicalInput>> inputIterator = initializedInputs.entrySet().iterator();
while (inputIterator.hasNext()) {
Map.Entry<String, LogicalInput> entry = inputIterator.next();
String srcVertexName = entry.getKey();
inputIterator.remove();
try {
((InputFrameworkInterface)entry.getValue()).close();
maybeResetInterruptStatus();
} catch (InterruptedException ie) {
//reset the status
LOG.info("Resetting interrupt status for input with srcVertexName={}",
srcVertexName);
Thread.currentThread().interrupt();
} catch (Throwable e) {
LOG.warn(
"Ignoring exception when closing input {}(cleanup). Exception class={}, message={}",
srcVertexName, e.getClass().getName(), e.getMessage());
} finally {
LOG.info("Closed input for vertex={}, sourceVertex={}, interruptedStatus={}", processor
.getContext().getTaskVertexName(), srcVertexName, Thread.currentThread().isInterrupted());
}
}
// Close the remaining inited Outputs.
Iterator<Map.Entry<String, LogicalOutput>> outputIterator = initializedOutputs.entrySet().iterator();
while (outputIterator.hasNext()) {
Map.Entry<String, LogicalOutput> entry = outputIterator.next();
String destVertexName = entry.getKey();
outputIterator.remove();
try {
((OutputFrameworkInterface) entry.getValue()).close();
maybeResetInterruptStatus();
} catch (InterruptedException ie) {
//reset the status
LOG.info("Resetting interrupt status for output with destVertexName={}",
destVertexName);
Thread.currentThread().interrupt();
} catch (Throwable e) {
LOG.warn(
"Ignoring exception when closing output {}(cleanup). Exception class={}, message={}",
destVertexName, e.getClass().getName(), e.getMessage());
} finally {
LOG.info("Closed input for vertex={}, sourceVertex={}, interruptedStatus={}", processor
.getContext().getTaskVertexName(), destVertexName, Thread.currentThread().isInterrupted());
}
}
if (LOG.isDebugEnabled()) {
printThreads();
}
// Close processor
if (!processorClosed && processor != null) {
try {
processorClosed = true;
processor.close();
LOG.info("Closed processor for vertex={}, index={}, interruptedStatus={}",
processor
.getContext().getTaskVertexName(),
processor.getContext().getTaskVertexIndex(),
Thread.currentThread().isInterrupted());
maybeResetInterruptStatus();
} catch (InterruptedException ie) {
//reset the status
LOG.info("Resetting interrupt for processor");
Thread.currentThread().interrupt();
} catch (Throwable e) {
LOG.warn("Ignoring Exception when closing processor(cleanup). Exception class={}, message={}",
e.getClass().getName(), e.getMessage());
}
}
try {
closeContexts();
// Cleanup references which may be held by misbehaved tasks.
cleanupStructures();
} catch (IOException e) {
LOG.info("Error while cleaning up contexts ", e);
}
}