in gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java [913:1001]
/**
 * Commits this task's forks and finalizes the task state.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>Every present fork that reports success is committed. Forks that did not succeed, or whose
 *       {@code commit()} returns {@code false}, are collected as failed.</li>
 *   <li>With no failed forks, the working state is set to {@code SUCCESSFUL} unless it is already
 *       {@code FAILED}. Otherwise the task is failed with the throwable(s) recorded in the
 *       {@link ForkThrowableHolder}, or with a generic {@link ForkException} if the holder is
 *       empty.</li>
 *   <li>Regardless of outcome: final construct state and writer metrics are stored on the task
 *       state, the task-committed event is submitted, {@code closer} is closed, and every fork
 *       future is cancelled.</li>
 *   <li>If {@code shouldPublishDataInTask()} is true and the state is {@code SUCCESSFUL}, the task
 *       data is published and the state becomes {@code COMMITTED}.</li>
 *   <li>Finally the end time and duration are recorded and the task state tracker is notified.</li>
 * </ol>
 *
 * <p>A failure while closing resources fails the task only when close failures are not ignored
 * and the task has not already been failed; in all cases the close failure is logged.
 */
public void commit() {
  // Tracks whether the task is already known to have failed, so that a later failure of
  // closer.close() does not call failTask() a second time for the same task.
  boolean isTaskFailed = false;
  try {
    // Check if all forks succeeded
    List<Integer> failedForkIds = new ArrayList<>();
    for (Optional<Fork> fork : this.forks.keySet()) {
      if (fork.isPresent()) {
        if (fork.get().isSucceeded()) {
          if (!fork.get().commit()) {
            failedForkIds.add(fork.get().getIndex());
          }
        } else {
          failedForkIds.add(fork.get().getIndex());
        }
      }
    }
    if (failedForkIds.isEmpty()) {
      // Set the task state to SUCCESSFUL. The state is not set to COMMITTED
      // as the data publisher will do that upon successful data publishing.
      if (this.taskState.getWorkingState() != WorkUnitState.WorkingState.FAILED) {
        this.taskState.setWorkingState(WorkUnitState.WorkingState.SUCCESSFUL);
      } else {
        // The task was already FAILED before commit; remember that so a close failure
        // below does not fail it again.
        isTaskFailed = true;
      }
    } else {
      // Every path below calls failTask(), so mark the task as failed up front. Previously the
      // flag stayed false here and a subsequent close failure invoked failTask() a second time.
      isTaskFailed = true;
      ForkThrowableHolder holder = Task.getForkThrowableHolder(this.taskState.getTaskBroker());
      LOG.info("Holder for this task {} is {}", this.taskId, holder);
      if (!holder.isEmpty()) {
        if (failedForkIds.size() == 1 && holder.getThrowable(failedForkIds.get(0)).isPresent()) {
          // Single failed fork with a recorded throwable: report it directly.
          failTask(holder.getThrowable(failedForkIds.get(0)).get());
        } else {
          failTask(holder.getAggregatedException(failedForkIds, this.taskId));
        }
      } else {
        // just in case there are some corner cases where Fork throw an exception but doesn't add into holder
        failTask(new ForkException("Fork branches " + failedForkIds + " failed for task " + this.taskId));
      }
    }
  } catch (Throwable t) {
    failTask(t);
    isTaskFailed = true;
  } finally {
    // NOTE(review): these bookkeeping calls run before closer.close() and are not guarded; if one
    // of them throws, the close/cancel/publish steps below are skipped - confirm this is acceptable.
    addConstructsFinalStateToTaskState(extractor, converter, rowChecker);
    this.taskState.setProp(ConfigurationKeys.WRITER_RECORDS_WRITTEN, getRecordsWritten());
    this.taskState.setProp(ConfigurationKeys.WRITER_BYTES_WRITTEN, getBytesWritten());
    this.submitTaskCommittedEvent();
    try {
      closer.close();
    } catch (Throwable t) {
      LOG.error("Failed to close all open resources", t);
      // Only escalate a close failure when such failures are not ignored and the task has not
      // already been failed for another reason.
      if ((!isIgnoreCloseFailures) && (!isTaskFailed)) {
        LOG.error("Setting the task state to failed.");
        failTask(t);
      }
    }
    // Cancel every fork future; a failure to cancel one fork is logged and must not prevent
    // cancelling the others.
    for (Map.Entry<Optional<Fork>, Optional<Future<?>>> forkAndFuture : this.forks.entrySet()) {
      if (forkAndFuture.getKey().isPresent() && forkAndFuture.getValue().isPresent()) {
        try {
          forkAndFuture.getValue().get().cancel(true);
        } catch (Throwable t) {
          LOG.error(String.format("Failed to cancel Fork \"%s\"", forkAndFuture.getKey().get()), t);
        }
      }
    }
    try {
      if (shouldPublishDataInTask()) {
        // If data should be published by the task, publish the data and set the task state to COMMITTED.
        // Task data can only be published after all forks have been closed by closer.close().
        if (this.taskState.getWorkingState() == WorkUnitState.WorkingState.SUCCESSFUL) {
          publishTaskData();
          this.taskState.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
        }
      }
    } catch (IOException ioe) {
      // NOTE(review): only IOException is converted into a task failure here; any unchecked
      // exception from publishing propagates to the caller after the finally block below runs.
      failTask(ioe);
    } finally {
      // Always record timing and notify the tracker, even if publishing failed.
      long endTime = System.currentTimeMillis();
      this.taskState.setEndTime(endTime);
      this.taskState.setTaskDuration(endTime - startTime);
      this.taskStateTracker.onTaskCommitCompletion(this);
    }
  }
}