public final void processAsync()

in samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java [118:184]


  public final void processAsync(IncomingMessageEnvelope ime, MessageCollector collector, TaskCoordinator coordinator,
      TaskCallback callback) {
    Runnable processRunnable = () -> {
      try {
        SystemStream systemStream = ime.getSystemStreamPartition().getSystemStream();
        InputOperatorImpl inputOpImpl = operatorImplGraph.getInputOperator(systemStream);
        if (inputOpImpl != null) {
          CompletionStage<Void> processFuture;
          MessageType messageType = MessageType.of(ime.getMessage());
          switch (messageType) {
            case USER_MESSAGE:
              processFuture = inputOpImpl.onMessageAsync(ime, collector, coordinator);
              break;

            case END_OF_STREAM:
              EndOfStreamMessage eosMessage = (EndOfStreamMessage) ime.getMessage();
              processFuture =
                  inputOpImpl.aggregateEndOfStream(eosMessage, ime.getSystemStreamPartition(), collector, coordinator);
              break;

            case DRAIN:
              DrainMessage drainMessage = (DrainMessage) ime.getMessage();
              processFuture =
                  inputOpImpl.aggregateDrainMessages(drainMessage, ime.getSystemStreamPartition(), collector, coordinator);
              break;

            case WATERMARK:
              WatermarkMessage watermarkMessage = (WatermarkMessage) ime.getMessage();
              processFuture = inputOpImpl.aggregateWatermark(watermarkMessage, ime.getSystemStreamPartition(), collector,
                  coordinator);
              break;

            default:
              processFuture = failedFuture(new SamzaException("Unknown message type " + messageType + " encountered."));
              break;
          }

          processFuture.whenComplete((val, ex) -> {
            if (ex != null) {
              callback.failure(ex);
            } else {
              callback.complete();
            }
          });
        } else {
          // If InputOperator is not found in the operator graph for a given SystemStream, throw an exception else the
          // job will timeout due to async task callback timeout (TaskCallbackTimeoutException)
          final String errMessage = String.format("InputOperator not found in OperatorGraph for %s. The available input"
              + " operators are: %s. Please check SystemStream configuration for the `SystemConsumer` and/or task.inputs"
              + " task configuration.", systemStream, operatorImplGraph.getAllInputOperators());
          LOG.error(errMessage);
          callback.failure(new SamzaException(errMessage));
        }
      } catch (Exception e) {
        LOG.error("Failed to process the incoming message due to ", e);
        callback.failure(e);
      }
    };

    if (taskThreadPool != null) {
      LOG.debug("Processing message using thread pool.");
      taskThreadPool.submit(processRunnable);
    } else {
      LOG.debug("Processing message on the run loop thread.");
      processRunnable.run();
    }
  }