in runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java [549:642]
/**
 * Drains the given data fetchers, forwarding every fetched element to
 * {@code onEventFromDataFetcher}, until each fetcher has produced a {@code Finishmark}.
 *
 * <p>Every fetcher starts out "available". A fetcher whose {@code fetchDataElement()} throws
 * {@link NoSuchElementException} is moved to a "pending" list; pending fetchers are retried
 * whenever {@code isPollingTime} reports that the polling interval (100 ms) has elapsed, and
 * return to the available list once they yield a non-finish element. While only pending
 * fetchers remain, the calling thread sleeps for one polling interval between attempts.
 *
 * @param fetchers the data fetchers to drain; all of them are closed after a successful drain.
 * @return {@code true} if all fetchers were drained and closed; {@code false} if an
 *         {@link IOException} occurred while reading, in which case the task state has been
 *         changed to {@code SHOULD_RETRY} with cause {@code INPUT_READ_FAILURE}.
 * @throws RuntimeException if the thread is interrupted while sleeping, or if closing one or
 *         more fetchers fails (the first failure is the cause, later ones are suppressed).
 */
private boolean handleDataFetchers(final List<DataFetcher> fetchers) {
  final List<DataFetcher> availableFetchers = new LinkedList<>(fetchers);
  final List<DataFetcher> pendingFetchers = new LinkedList<>();
  // Polling interval.
  final long pollingInterval = 100; // ms
  // Previous polling time
  long prevPollingTime = System.currentTimeMillis();

  try {
    // Both lists empty means we've consumed all task-external input data.
    while (!availableFetchers.isEmpty() || !pendingFetchers.isEmpty()) {
      // We first fetch one element from each available data fetcher.
      final Iterator<DataFetcher> availableIterator = availableFetchers.iterator();
      while (availableIterator.hasNext()) {
        final DataFetcher dataFetcher = availableIterator.next();
        try {
          final Object element = dataFetcher.fetchDataElement();
          onEventFromDataFetcher(element, dataFetcher);
          if (element instanceof Finishmark) {
            // This fetcher is done; stop polling it.
            availableIterator.remove();
          }
        } catch (final NoSuchElementException e) {
          // No element in the current data fetcher right now: move it to pending
          // and continue with the next fetcher.
          availableIterator.remove();
          pendingFetchers.add(dataFetcher);
        }
      }

      // We check pending data fetchers only every polling interval.
      final long currentTime = System.currentTimeMillis();
      if (isPollingTime(pollingInterval, currentTime, prevPollingTime)) {
        prevPollingTime = currentTime;
        final Iterator<DataFetcher> pendingIterator = pendingFetchers.iterator();
        while (pendingIterator.hasNext()) {
          final DataFetcher dataFetcher = pendingIterator.next();
          try {
            final Object element = dataFetcher.fetchDataElement();
            onEventFromDataFetcher(element, dataFetcher);
            // We processed data, so the fetcher is no longer pending. It becomes
            // available again unless it has just finished.
            pendingIterator.remove();
            if (!(element instanceof Finishmark)) {
              availableFetchers.add(dataFetcher);
            }
          } catch (final NoSuchElementException ignored) {
            // Intentionally ignored: the fetcher is still pending, try the next one.
          }
        }
      }

      // If there are no available fetchers, sleep and retry fetching from the
      // pending fetchers after one polling interval instead of busy-waiting.
      if (availableFetchers.isEmpty() && !pendingFetchers.isEmpty()) {
        try {
          Thread.sleep(pollingInterval);
        } catch (final InterruptedException e) {
          // Restore the interrupt flag before propagating so callers can observe it.
          Thread.currentThread().interrupt();
          throw new RuntimeException(e);
        }
      }
    }
  } catch (final IOException e) {
    // IOException means that this task should be retried.
    // NOTE(review): the fetchers are not closed on this path; confirm that the retry
    // path releases them.
    taskStateManager.onTaskStateChanged(TaskState.State.SHOULD_RETRY,
      Optional.empty(), Optional.of(TaskState.RecoverableTaskFailureCause.INPUT_READ_FAILURE));
    // The exception is passed as the trailing argument without a placeholder so that
    // the logger records it together with its stack trace.
    LOG.error("{} Execution Failed (Recoverable: input read failure)!", taskId, e);
    return false;
  }

  // Close all data fetchers. Every fetcher gets a close attempt even if an earlier one
  // fails; the first failure is rethrown with any later failures attached as suppressed.
  RuntimeException closeFailure = null;
  for (final DataFetcher fetcher : fetchers) {
    try {
      fetcher.close();
    } catch (final Exception e) {
      if (closeFailure == null) {
        closeFailure = new RuntimeException(e);
      } else {
        closeFailure.addSuppressed(e);
      }
    }
  }
  if (closeFailure != null) {
    throw closeFailure;
  }
  return true;
}