in samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java [118:184]
/**
 * Routes an incoming envelope to the input operator registered for its system stream and reports the
 * outcome through {@code callback}.
 *
 * <p>The work is submitted to {@code taskThreadPool} when one is set; otherwise it runs inline on the
 * calling thread. In both modes every outcome of the dispatch -- normal completion, a failed future, a
 * missing input operator, or anything thrown while dispatching -- is reported via {@code callback}.
 *
 * @param ime the envelope to process; the type of its message selects the operator entry point
 * @param collector passed through to the input operator
 * @param coordinator passed through to the input operator
 * @param callback completed when the operator's future completes normally, failed otherwise
 */
public final void processAsync(IncomingMessageEnvelope ime, MessageCollector collector, TaskCoordinator coordinator,
    TaskCallback callback) {
  Runnable processRunnable = () -> {
    try {
      SystemStream systemStream = ime.getSystemStreamPartition().getSystemStream();
      InputOperatorImpl inputOpImpl = operatorImplGraph.getInputOperator(systemStream);

      if (inputOpImpl == null) {
        // If InputOperator is not found in the operator graph for a given SystemStream, fail the callback
        // explicitly; otherwise the job would only fail later through the async task callback timeout
        // (TaskCallbackTimeoutException).
        final String errMessage = String.format("InputOperator not found in OperatorGraph for %s. The available input"
            + " operators are: %s. Please check SystemStream configuration for the `SystemConsumer` and/or task.inputs"
            + " task configuration.", systemStream, operatorImplGraph.getAllInputOperators());
        LOG.error(errMessage);
        callback.failure(new SamzaException(errMessage));
        return;
      }

      CompletionStage<Void> processFuture;
      MessageType messageType = MessageType.of(ime.getMessage());
      switch (messageType) {
        case USER_MESSAGE:
          processFuture = inputOpImpl.onMessageAsync(ime, collector, coordinator);
          break;
        case END_OF_STREAM:
          EndOfStreamMessage eosMessage = (EndOfStreamMessage) ime.getMessage();
          processFuture =
              inputOpImpl.aggregateEndOfStream(eosMessage, ime.getSystemStreamPartition(), collector, coordinator);
          break;
        case DRAIN:
          DrainMessage drainMessage = (DrainMessage) ime.getMessage();
          processFuture =
              inputOpImpl.aggregateDrainMessages(drainMessage, ime.getSystemStreamPartition(), collector, coordinator);
          break;
        case WATERMARK:
          WatermarkMessage watermarkMessage = (WatermarkMessage) ime.getMessage();
          processFuture = inputOpImpl.aggregateWatermark(watermarkMessage, ime.getSystemStreamPartition(), collector,
              coordinator);
          break;
        default:
          processFuture = failedFuture(new SamzaException("Unknown message type " + messageType + " encountered."));
          break;
      }

      // Bridge the operator's future to the task callback once it settles.
      processFuture.whenComplete((val, ex) -> {
        if (ex != null) {
          callback.failure(ex);
        } else {
          callback.complete();
        }
      });
    } catch (Throwable t) {
      // Catch Throwable rather than Exception: when this runnable is handed to taskThreadPool.submit(), anything
      // it throws is captured by the Future that submit() returns, and that Future is discarded below. An
      // uncaught Error would therefore vanish without ever resolving the callback.
      LOG.error("Failed to process the incoming message due to ", t);
      callback.failure(t);
    }
  };

  if (taskThreadPool != null) {
    LOG.debug("Processing message using thread pool.");
    taskThreadPool.submit(processRunnable);
  } else {
    LOG.debug("Processing message on the run loop thread.");
    processRunnable.run();
  }
}