in flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java [102:314]
public void invoke() throws Exception {
    // Runs this data sink task in three phases:
    //   1. Initialize: set up the output format and the input readers.
    //   2. Invoke: create the runtime context and metrics, apply the local strategy
    //      configured for input 0 (NONE or SORT), open the output format and write every
    //      record read from the input to it.
    //   3. Clean up (finally block): close the output format if it is still set, close the
    //      local strategy and hand the input reader to BatchTask.clearReaders.
    //
    // Error contract of the processing phase: a CancelTaskException is always forwarded;
    // any other exception is logged and rethrown unless the task has been canceled, in
    // which case it is dropped.

    // --------------------------------------------------------------------
    // Initialize
    // --------------------------------------------------------------------
    LOG.debug(getLogString("Start registering input and output"));

    // initialize OutputFormat
    initOutputFormat();

    // initialize input readers
    try {
        initInputReaders();
    } catch (Exception e) {
        throw new RuntimeException(
                "Initializing the input streams failed"
                        + (e.getMessage() == null ? "." : ": " + e.getMessage()),
                e);
    }

    LOG.debug(getLogString("Finished registering input and output"));

    // --------------------------------------------------------------------
    // Invoke
    // --------------------------------------------------------------------
    LOG.debug(getLogString("Starting data sink operator"));

    RuntimeContext ctx = createRuntimeContext();

    // Metrics are best effort: if the setup fails, a warning is logged and a standalone
    // SimpleCounter is used so the write loops below always have a counter to increment.
    final Counter numRecordsIn;
    {
        Counter tmpNumRecordsIn;
        try {
            InternalOperatorIOMetricGroup ioMetricGroup =
                    ((InternalOperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup();
            ioMetricGroup.reuseInputMetricsForTask();
            ioMetricGroup.reuseOutputMetricsForTask();
            tmpNumRecordsIn = ioMetricGroup.getNumRecordsInCounter();
        } catch (Exception e) {
            LOG.warn("An exception occurred during the metrics setup.", e);
            tmpNumRecordsIn = new SimpleCounter();
        }
        numRecordsIn = tmpNumRecordsIn;
    }

    // Plain instanceof (same style as the CleanupWhenUnsuccessful check in the error path
    // below) with a wildcard cast instead of a reflective check and a raw-type cast.
    if (this.format instanceof RichOutputFormat) {
        ((RichOutputFormat<?>) this.format).setRuntimeContext(ctx);
        LOG.debug(getLogString("Rich Sink detected. Initializing runtime context."));
    }

    ExecutionConfig executionConfig = getExecutionConfig();
    // Single source of truth for object reuse: used by both the sorter and the write loop.
    final boolean objectReuseEnabled = executionConfig.isObjectReuseEnabled();

    try {
        // initialize local strategies
        MutableObjectIterator<IT> input1;
        switch (this.config.getInputLocalStrategy(0)) {
            case NONE:
                // nothing to do: records are consumed directly from the reader
                localStrategy = null;
                input1 = reader;
                break;
            case SORT:
                // initialize sort local strategy
                try {
                    // get type comparator
                    TypeComparatorFactory<IT> compFact =
                            this.config.getInputComparator(0, getUserCodeClassLoader());
                    if (compFact == null) {
                        throw new Exception(
                                "Missing comparator factory for local strategy on input " + 0);
                    }

                    // initialize sorter
                    Sorter<IT> sorter =
                            ExternalSorter.newBuilder(
                                            getEnvironment().getMemoryManager(),
                                            this,
                                            this.inputTypeSerializerFactory.getSerializer(),
                                            compFact.createComparator())
                                    .maxNumFileHandles(this.config.getFilehandlesInput(0))
                                    .enableSpilling(
                                            getEnvironment().getIOManager(),
                                            this.config.getSpillingThresholdInput(0))
                                    .memoryFraction(this.config.getRelativeMemoryInput(0))
                                    .objectReuse(objectReuseEnabled)
                                    .largeRecords(this.config.getUseLargeRecordHandler())
                                    .build(this.reader);

                    // remember the sorter so that the finally block can close it
                    this.localStrategy = sorter;
                    input1 = sorter.getIterator();
                } catch (Exception e) {
                    throw new RuntimeException(
                            "Initializing the input processing failed"
                                    + (e.getMessage() == null ? "." : ": " + e.getMessage()),
                            e);
                }
                break;
            default:
                throw new RuntimeException("Invalid local strategy for DataSinkTask");
        }

        // read the reader and write it to the output
        final TypeSerializer<IT> serializer = this.inputTypeSerializerFactory.getSerializer();
        final MutableObjectIterator<IT> input = input1;
        // Local alias of the field; it is only visible inside this try block.
        final OutputFormat<IT> format = this.format;

        // check if task has been canceled
        if (this.taskCanceled) {
            // The finally block below still runs and performs the cleanup.
            return;
        }

        LOG.debug(getLogString("Starting to produce output"));

        // open the format, exposing the subtask coordinates taken from the task info
        format.open(
                new InitializationContext() {
                    @Override
                    public int getNumTasks() {
                        return getEnvironment().getTaskInfo().getNumberOfParallelSubtasks();
                    }

                    @Override
                    public int getTaskNumber() {
                        return getEnvironment().getTaskInfo().getIndexOfThisSubtask();
                    }

                    @Override
                    public int getAttemptNumber() {
                        return getEnvironment().getTaskInfo().getAttemptNumber();
                    }
                });

        if (objectReuseEnabled) {
            // Object reuse: one instance created by the serializer is handed to the
            // iterator on every call; the iterator's return value is what gets written.
            IT record = serializer.createInstance();

            // work!
            while (!this.taskCanceled && ((record = input.next(record)) != null)) {
                numRecordsIn.inc();
                format.writeRecord(record);
            }
        } else {
            IT record;

            // work!
            while (!this.taskCanceled && ((record = input.next()) != null)) {
                numRecordsIn.inc();
                format.writeRecord(record);
            }
        }

        // close. We close here such that a regular close throwing an exception marks a task as
        // failed.
        if (!this.taskCanceled) {
            this.format.close();
            // Clearing the field tells the finally block that the format is already closed.
            // NOTE(review): if close() above throws, the field stays set and the finally
            // block calls close() a second time - confirm formats tolerate a repeated close.
            this.format = null;
        }
    } catch (Exception ex) {

        // make a best effort to clean up; cleanupCalled ensures this is attempted only once
        try {
            // The local 'format' of the try block is out of scope here, so this is the field.
            if (!cleanupCalled && this.format instanceof CleanupWhenUnsuccessful) {
                cleanupCalled = true;
                ((CleanupWhenUnsuccessful) this.format).tryCleanupOnError();
            }
        } catch (Throwable t) {
            LOG.error("Cleanup on error failed.", t);
        }

        ex = ExceptionInChainedStubException.exceptionUnwrap(ex);

        if (ex instanceof CancelTaskException) {
            // forward canceling exception
            throw ex;
        }
        // drop, if the task was canceled
        else if (!this.taskCanceled) {
            if (LOG.isErrorEnabled()) {
                LOG.error(getLogString("Error in user code: " + ex.getMessage()), ex);
            }
            throw ex;
        }
    } finally {
        if (this.format != null) {
            // close format, if it has not been closed, yet.
            // This should only be the case if we had a previous error, or were canceled.
            // NOTE(review): this also runs when open() was never reached (cancellation
            // before open, or a failure while setting up the local strategy).
            try {
                this.format.close();
            } catch (Throwable t) {
                if (LOG.isWarnEnabled()) {
                    LOG.warn(getLogString("Error closing the output format"), t);
                }
            }
        }

        // close local strategy if necessary
        if (localStrategy != null) {
            try {
                this.localStrategy.close();
            } catch (Throwable t) {
                LOG.error("Error closing local strategy", t);
            }
        }

        BatchTask.clearReaders(new MutableReader<?>[] {inputReader});
    }

    if (!this.taskCanceled) {
        LOG.debug(getLogString("Finished data sink operator"));
    } else {
        LOG.debug(getLogString("Data sink operator cancelled"));
    }
}