public HelixTaskResult call()

in helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java [72:217]


  public HelixTaskResult call() {
    // Runs the message through its handler and then performs the post-processing:
    // status/monitor reporting, optional re-scheduling on timeout, relay-message forwarding,
    // final cleanup and, when something went wrong, notifying the handler via onError().
    //
    // Returns the result produced by the handler, or a synthesized failure result when the
    // handler threw, was interrupted, or returned null. The result is never null on a normal
    // return from this method.
    HelixTaskResult taskResult = null;

    ErrorType type = null;
    ErrorCode code = null;
    // -1 means "not recorded"; handlerEnd stays -1 when the handler did not return normally.
    long handlerStart = -1L;
    long handlerEnd = -1L;

    long start = System.currentTimeMillis();
    logger.info("handling task: {} begin, at: {}", getTaskId(), start);
    _statusUpdateUtil.logInfo(_message, HelixTask.class, "Message handling task begin execute",
        _manager);
    _message.setExecuteStartTimeStamp(new Date().getTime());

    // add a concurrent map to hold currentStateUpdates for sub-messages of a batch-message
    // partitionName -> csUpdate
    if (_message.getBatchMessageMode()) {
      _notificationContext.add(MapKey.CURRENT_STATE_UPDATE.toString(),
          new ConcurrentHashMap<String, CurrentStateUpdate>());
    }

    // Handle the message
    try {
      setStarted();
      handlerStart = System.currentTimeMillis();
      taskResult = _handler.handleMessage();
      handlerEnd = System.currentTimeMillis();

      // cancel timeout task
      _executor.cancelTimeoutTask(this);

      // A null result would otherwise surface as a chain of NullPointerExceptions in the
      // post-processing below (including inside the finally block, which would skip onError).
      // Route it through the regular "handler failed" path instead.
      if (taskResult == null) {
        throw new IllegalStateException("Message handler returned a null result");
      }
    } catch (InterruptedException e) {
      // NOTE: the interrupt flag is intentionally not restored here; the status updates and
      // cleanup below still have to run on this thread.
      taskResult = new HelixTaskResult();
      taskResult.setException(e);
      taskResult.setInterrupted(true);

      _statusUpdateUtil.logError(_message, HelixTask.class, e,
          "State transition interrupted, timeout:" + _isTimeout, _manager);
      logger.info("Message {} is interrupted", _message.getMsgId());
    } catch (Exception e) {
      taskResult = new HelixTaskResult();
      taskResult.setException(e);
      taskResult.setMessage(e.getMessage());

      String errorMessage =
          "Exception while executing a message. " + e + " msgId: " + _message.getMsgId()
              + " type: " + _message.getMsgType();
      logger.error(errorMessage, e);
      _statusUpdateUtil.logError(_message, HelixTask.class, e, errorMessage, _manager);
    }

    // Set only when the post-processing itself fails (reported as a FRAMEWORK error).
    Exception exception = null;
    try {
      if (taskResult.isSuccess()) {
        _statusUpdateUtil
            .logInfo(_message, _handler.getClass(), "Message handling task completed successfully", _manager);
        logger.info("Message {} completed.", _message.getMsgId());
        _executor.getParticipantMonitor().reportProcessedMessage(_message, ParticipantMessageMonitor.ProcessedMessageState.COMPLETED);
      } else {
        type = ErrorType.INTERNAL;

        if (taskResult.isInterrupted()) {
          logger.info("Message {} is interrupted", _message.getMsgId());
          code = _isTimeout ? ErrorCode.TIMEOUT : ErrorCode.CANCEL;
          if (_isTimeout) {
            int retryCount = _message.getRetryCount();
            logger.info("Message timeout, retry count: {} msgId:{}", retryCount,
                _message.getMsgId());
            _statusUpdateUtil.logInfo(_message, _handler.getClass(),
                "Message handling task timeout, retryCount:" + retryCount, _manager);
            // Notify the handler that timeout happens, and the number of retries left
            // In case timeout happens (time out and also interrupted)
            // we should retry the execution of the message by re-schedule it in
            if (retryCount > 0) {
              _message.setRetryCount(retryCount - 1);
              HelixTask task = new HelixTask(_message, _notificationContext, _handler, _executor);
              _executor.scheduleTask(task);
              // Early return skips finalCleanup on purpose (the message is being retried);
              // the finally block below still runs and reports the timeout via onError().
              return taskResult;
            }
          }
          _executor.getParticipantMonitor().reportProcessedMessage(
              _message, ParticipantMessageMonitor.ProcessedMessageState.DISCARDED);
        } else if (taskResult.isCancelled()) {
          // Cancellation is not treated as an error: clearing the type suppresses onError().
          type = null;
          _statusUpdateUtil
              .logInfo(_message, _handler.getClass(), "Cancellation completed successfully",
                  _manager);
          _executor.getParticipantMonitor().reportProcessedMessage(
              _message, ParticipantMessageMonitor.ProcessedMessageState.DISCARDED);
        } else {// logging for errors
          code = ErrorCode.ERROR;
          String errorMsg =
              "Message execution failed. msgId: " + getTaskId() + ", errorMsg: "
                  + taskResult.getMessage();
          logger.error(errorMsg);
          _statusUpdateUtil.logError(_message, _handler.getClass(), errorMsg, _manager);
          _executor.getParticipantMonitor().reportProcessedMessage(
              _message, ParticipantMessageMonitor.ProcessedMessageState.FAILED);
        }
      }

      HelixDataAccessor accessor = _manager.getHelixDataAccessor();
      // forward relay messages attached to this message to other participants
      if (taskResult.isSuccess()) {
        try {
          forwardRelayMessages(accessor, _message, taskResult.getCompleteTime());
        } catch (Exception e) {
          // Fail to send relay message should not result in a task execution failure
          // Currently we don't log error to ZK to reduce writes as when accessor throws
          // exception, ZK might not be in good condition.
          logger.warn("Failed to send relay messages.", e);
        }
      }

      finalCleanup(taskResult);
    } catch (Exception e) {
      // NOTE(review): if the exception came from the finalCleanup call above, cleanup runs a
      // second time here - confirm finalCleanup tolerates being invoked twice.
      finalCleanup(taskResult);
      exception = e;
      type = ErrorType.FRAMEWORK;
      code = ErrorCode.ERROR;

      String errorMessage =
          "Exception after executing a message, msgId: " + _message.getMsgId()
              + ", exception: " + e;
      logger.error(errorMessage, e);
      _statusUpdateUtil.logError(_message, HelixTask.class, errorMessage, _manager);
    } finally {
      long end = System.currentTimeMillis();
      long totalDuration = end - start;
      boolean handlerTimed = handlerStart >= 0 && handlerEnd >= 0;
      long handlerDuration = handlerTimed ? handlerEnd - handlerStart : -1;
      // When the handler time is unknown, attribute the whole elapsed time to the framework
      // rather than subtracting the -1 sentinel.
      long frameworkDuration = handlerTimed ? totalDuration - handlerDuration : totalDuration;
      logger.info(
          "Message: {} (parent: {}) handling task for {}:{} completed at: {}, results: {}. FrameworkTime: {} ms; HandlerTime: {} ms.",
          _message.getMsgId(), _message.getAttribute(Attributes.PARENT_MSG_ID), _message.getResourceName(),
          _message.getPartitionName(), end, taskResult.isSuccess(), frameworkDuration,
          handlerDuration);

      // Notify the handler about any error happened in the handling procedure, so that
      // the handler have chance to finally cleanup
      if (type == ErrorType.INTERNAL) {
        _handler.onError(taskResult.getException(), code, type);
      } else if (type == ErrorType.FRAMEWORK) {
        _handler.onError(exception, code, type);
      }
    }

    return taskResult;
  }