in tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java [315:423]
/**
 * Starts the task launcher thread and returns without waiting for it.
 *
 * <p>The launcher thread loops until {@code stopped} is set. On each iteration it
 * obtains a connection to the QueryMaster, sends a GetTask request if none is
 * outstanding, and waits up to three seconds for the reply. A reply carrying the
 * ShouldDie flag stops this runner; any other reply is turned into a {@link Task}
 * that is initialized, optionally fetched, and run to completion on the launcher
 * thread. The connection is released at the end of every iteration.
 *
 * <p>After the launcher thread has been started, every task currently registered in
 * {@code tasks} whose status is not {@code TA_SUCCEEDED} is aborted.
 */
public void run() {
  LOG.info("TaskRunner startup");
  try {
    taskLauncher = new Thread(new Runnable() {
      @Override
      public void run() {
        int receivedNum = 0;
        // Outstanding GetTask call; null means a new request must be sent.
        CallFuture<QueryUnitRequestProto> callFuture = null;
        QueryUnitRequestProto taskRequest = null;

        while (!stopped) {
          NettyClientBase qmClient = null;
          QueryMasterProtocolService.Interface qmClientService = null;
          try {
            qmClient = connPool.getConnection(qmMasterAddr, QueryMasterProtocol.class, true);
            qmClientService = qmClient.getStub();

            if (callFuture == null) {
              callFuture = new CallFuture<QueryUnitRequestProto>();
              LOG.info("Request GetTask: " + getId());
              GetTaskRequestProto request = GetTaskRequestProto.newBuilder()
                  .setExecutionBlockId(executionBlockId.getProto())
                  .setContainerId(((ContainerIdPBImpl) containerId).getProto())
                  .build();
              qmClientService.getTask(null, request, callFuture);
            }

            try {
              // wait for an assigning task for 3 seconds
              taskRequest = callFuture.get(3, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
              // An interrupt only ends the loop when the runner has been stopped;
              // otherwise fall through and keep polling the same future.
              if (stopped) {
                break;
              }
            } catch (TimeoutException te) {
              if (stopped) {
                break;
              }
              // if there has been no assigning task for a given period,
              // TaskRunner will retry to request an assigning task.
              LOG.info("Retry assigning task:" + getId());
              continue;
            }

            if (taskRequest != null) {
              // QueryMaster can send the terminal signal to TaskRunner.
              // If TaskRunner receives the terminal signal, TaskRunner will be terminated
              // immediately.
              if (taskRequest.getShouldDie()) {
                LOG.info("Received ShouldDie flag:" + getId());
                stop();
                if (taskRunnerManager != null) {
                  //notify to TaskRunnerManager
                  taskRunnerManager.stopTask(getId());
                }
              } else {
                // Same null guard as the ShouldDie branch: without it an NPE here would
                // leave the completed future in place and be retried on every iteration.
                if (taskRunnerManager != null) {
                  taskRunnerManager.getWorkerContext().getWorkerSystemMetrics().counter("query", "task").inc();
                }
                LOG.info("Accumulated Received Task: " + (++receivedNum));

                QueryUnitAttemptId taskAttemptId = new QueryUnitAttemptId(taskRequest.getId());
                if (tasks.containsKey(taskAttemptId)) {
                  LOG.error("Duplicate Task Attempt: " + taskAttemptId);
                  fatalError(qmClientService, taskAttemptId, "Duplicate Task Attempt: " + taskAttemptId);
                  // Drop the consumed reply so the next iteration issues a fresh GetTask
                  // request instead of re-reading this same completed future forever.
                  callFuture = null;
                  taskRequest = null;
                  continue;
                }

                LOG.info("Initializing: " + taskAttemptId);
                Task task;
                try {
                  task = new Task(taskAttemptId, taskRunnerContext, qmClientService,
                      new QueryUnitRequestImpl(taskRequest));
                  tasks.put(taskAttemptId, task);

                  task.init();
                  if (task.hasFetchPhase()) {
                    task.fetch(); // The fetch is performed in an asynchronous way.
                  }
                  // task.run() is a blocking call.
                  task.run();
                } catch (Throwable t) {
                  LOG.error(t.getMessage(), t);
                  fatalError(qmClientService, taskAttemptId, t.getMessage());
                } finally {
                  // The reply has been handled (successfully or not); ask for a new task next.
                  callFuture = null;
                  taskRequest = null;
                }
              }
            }
          } catch (Throwable t) {
            // Route through the logger rather than stderr so the failure is not lost.
            LOG.error(t.getMessage(), t);
          } finally {
            // qmClient is still null when getConnection itself failed.
            if (qmClient != null) {
              connPool.releaseConnection(qmClient);
            }
          }
        }
      }
    });
    taskLauncher.start();
  } catch (Throwable t) {
    LOG.fatal("Unhandled exception. Starting shutdown.", t);
  } finally {
    // NOTE(review): Thread.start() returns immediately, so this cleanup runs while the
    // launcher thread may already be executing a task -- confirm that aborting here
    // (rather than after the launcher finishes) is intended.
    for (Task t : tasks.values()) {
      if (t.getStatus() != TaskAttemptState.TA_SUCCEEDED) {
        t.abort();
      }
    }
  }
}