in helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java [72:217]
/**
 * Runs the message handler for this task's message and reports the outcome.
 *
 * <p>The method proceeds in three phases:
 * <ol>
 *   <li>Invoke {@code _handler.handleMessage()}. An exception thrown by the handler is converted
 *       into a {@link HelixTaskResult} carrying that exception (and marked interrupted for an
 *       {@link InterruptedException}).</li>
 *   <li>Report the result: status updates, participant monitor counters, relay message
 *       forwarding on success, then {@code finalCleanup}. If the task was interrupted by a
 *       timeout and the message still has retries left, the retry count is decremented, a new
 *       {@code HelixTask} is scheduled and the method returns without running
 *       {@code finalCleanup}.</li>
 *   <li>Always log the timing summary and, for {@code INTERNAL} or {@code FRAMEWORK} errors,
 *       call {@code _handler.onError} so the handler can clean up.</li>
 * </ol>
 *
 * @return the result produced by the handler, or a result built from the exception the handler
 *         threw; {@code null} only if the handler itself returned {@code null}
 */
public HelixTaskResult call() {
  HelixTaskResult taskResult = null;
  ErrorType type = null;
  ErrorCode code = null;
  // Stays at -1 unless the handler returns normally; -1 is then what gets logged as HandlerTime.
  long handlerDuration = -1;
  long start = System.currentTimeMillis();
  logger.info("handling task: {} begin, at: {}", getTaskId(), start);
  _statusUpdateUtil.logInfo(_message, HelixTask.class, "Message handling task begin execute",
      _manager);
  _message.setExecuteStartTimeStamp(System.currentTimeMillis());

  // add a concurrent map to hold currentStateUpdates for sub-messages of a batch-message
  // partitionName -> csUpdate
  if (_message.getBatchMessageMode()) {
    _notificationContext.add(MapKey.CURRENT_STATE_UPDATE.toString(),
        new ConcurrentHashMap<String, CurrentStateUpdate>());
  }

  // Phase 1: handle the message
  try {
    setStarted();
    long handlerStart = System.currentTimeMillis();
    taskResult = _handler.handleMessage();
    handlerDuration = System.currentTimeMillis() - handlerStart;
    // cancel timeout task
    _executor.cancelTimeoutTask(this);
  } catch (InterruptedException e) {
    // NOTE(review): the thread's interrupt flag is not restored here; presumably so that the
    // status updates and cleanup below are not disturbed - confirm before changing.
    taskResult = new HelixTaskResult();
    taskResult.setException(e);
    taskResult.setInterrupted(true);
    _statusUpdateUtil.logError(_message, HelixTask.class, e,
        "State transition interrupted, timeout:" + _isTimeout, _manager);
    logger.info("Message {} is interrupted", _message.getMsgId());
  } catch (Exception e) {
    taskResult = new HelixTaskResult();
    taskResult.setException(e);
    taskResult.setMessage(e.getMessage());
    String errorMessage =
        "Exception while executing a message. " + e + " msgId: " + _message.getMsgId()
            + " type: " + _message.getMsgType();
    logger.error(errorMessage, e);
    _statusUpdateUtil.logError(_message, HelixTask.class, e, errorMessage, _manager);
  }

  // Phase 2: report the result and clean up
  Exception frameworkException = null;
  try {
    if (taskResult.isSuccess()) {
      _statusUpdateUtil.logInfo(_message, _handler.getClass(),
          "Message handling task completed successfully", _manager);
      logger.info("Message {} completed.", _message.getMsgId());
      _executor.getParticipantMonitor().reportProcessedMessage(_message,
          ParticipantMessageMonitor.ProcessedMessageState.COMPLETED);
    } else {
      type = ErrorType.INTERNAL;
      if (taskResult.isInterrupted()) {
        logger.info("Message {} is interrupted", _message.getMsgId());
        code = _isTimeout ? ErrorCode.TIMEOUT : ErrorCode.CANCEL;
        if (_isTimeout) {
          int retryCount = _message.getRetryCount();
          logger.info("Message timeout, retry count: {} msgId:{}", retryCount,
              _message.getMsgId());
          _statusUpdateUtil.logInfo(_message, _handler.getClass(),
              "Message handling task timeout, retryCount:" + retryCount, _manager);
          // The task timed out (and was therefore interrupted). While retries remain,
          // re-schedule the same message with one fewer retry and skip the final cleanup;
          // the finally block below still notifies the handler about the timeout.
          if (retryCount > 0) {
            _message.setRetryCount(retryCount - 1);
            HelixTask task = new HelixTask(_message, _notificationContext, _handler, _executor);
            _executor.scheduleTask(task);
            return taskResult;
          }
        }
        _executor.getParticipantMonitor().reportProcessedMessage(
            _message, ParticipantMessageMonitor.ProcessedMessageState.DISCARDED);
      } else if (taskResult.isCancelled()) {
        // A completed cancellation is not an error: clear the type so onError is not invoked.
        type = null;
        _statusUpdateUtil.logInfo(_message, _handler.getClass(),
            "Cancellation completed successfully", _manager);
        _executor.getParticipantMonitor().reportProcessedMessage(
            _message, ParticipantMessageMonitor.ProcessedMessageState.DISCARDED);
      } else {
        // logging for errors
        code = ErrorCode.ERROR;
        String errorMsg =
            "Message execution failed. msgId: " + getTaskId() + ", errorMsg: "
                + taskResult.getMessage();
        logger.error(errorMsg);
        _statusUpdateUtil.logError(_message, _handler.getClass(), errorMsg, _manager);
        _executor.getParticipantMonitor().reportProcessedMessage(
            _message, ParticipantMessageMonitor.ProcessedMessageState.FAILED);
      }
    }

    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
    // forward relay messages attached to this message to other participants
    if (taskResult.isSuccess()) {
      try {
        forwardRelayMessages(accessor, _message, taskResult.getCompleteTime());
      } catch (Exception e) {
        // Fail to send relay message should not result in a task execution failure
        // Currently we don't log error to ZK to reduce writes as when accessor throws
        // exception, ZK might not be in good condition.
        logger.warn("Failed to send relay messages.", e);
      }
    }
    finalCleanup(taskResult);
  } catch (Exception e) {
    finalCleanup(taskResult);
    frameworkException = e;
    type = ErrorType.FRAMEWORK;
    code = ErrorCode.ERROR;
    String errorMessage =
        "Exception after executing a message, msgId: " + _message.getMsgId()
            + ", exception: " + e;
    logger.error(errorMessage, e);
    _statusUpdateUtil.logError(_message, HelixTask.class, errorMessage, _manager);
  } finally {
    // Phase 3: timing summary and error notification
    long end = System.currentTimeMillis();
    long totalDuration = end - start;
    // Without a handler measurement the whole duration is attributed to the framework,
    // instead of subtracting the -1 sentinel.
    long frameworkDuration =
        handlerDuration < 0 ? totalDuration : totalDuration - handlerDuration;
    // taskResult is null when the handler returned null; guard so that this block cannot throw
    // and thereby hide the original failure or skip the onError notification below.
    boolean succeeded = taskResult != null && taskResult.isSuccess();
    logger.info(
        "Message: {} (parent: {}) handling task for {}:{} completed at: {}, results: {}. FrameworkTime: {} ms; HandlerTime: {} ms.",
        _message.getMsgId(), _message.getAttribute(Attributes.PARENT_MSG_ID),
        _message.getResourceName(), _message.getPartitionName(), end, succeeded,
        frameworkDuration, handlerDuration);
    // Notify the handler about any error happened in the handling procedure, so that
    // the handler have chance to finally cleanup
    if (type == ErrorType.INTERNAL) {
      _handler.onError(taskResult.getException(), code, type);
    } else if (type == ErrorType.FRAMEWORK) {
      _handler.onError(frameworkException, code, type);
    }
  }
  return taskResult;
}