public void run()

in tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java [315:423]


  public void run() {
    LOG.info("TaskRunner startup");

    try {

      taskLauncher = new Thread(new Runnable() {
        @Override
        public void run() {
          int receivedNum = 0;
          CallFuture<QueryUnitRequestProto> callFuture = null;
          QueryUnitRequestProto taskRequest = null;

          while(!stopped) {
            NettyClientBase qmClient = null;
            QueryMasterProtocolService.Interface qmClientService = null;
            try {
              qmClient = connPool.getConnection(qmMasterAddr, QueryMasterProtocol.class, true);
              qmClientService = qmClient.getStub();

              if (callFuture == null) {
                callFuture = new CallFuture<QueryUnitRequestProto>();
                LOG.info("Request GetTask: " + getId());
                GetTaskRequestProto request = GetTaskRequestProto.newBuilder()
                    .setExecutionBlockId(executionBlockId.getProto())
                    .setContainerId(((ContainerIdPBImpl) containerId).getProto())
                    .build();

                qmClientService.getTask(null, request, callFuture);
              }
              try {
                // wait for an assigning task for 3 seconds
                taskRequest = callFuture.get(3, TimeUnit.SECONDS);
              } catch (InterruptedException e) {
                if(stopped) {
                  break;
                }
              } catch (TimeoutException te) {
                if(stopped) {
                  break;
                }
                // if there has been no assigning task for a given period,
                // TaskRunner will retry to request an assigning task.
                LOG.info("Retry assigning task:" + getId());
                continue;
              }

              if (taskRequest != null) {
                // QueryMaster can send the terminal signal to TaskRunner.
                // If TaskRunner receives the terminal signal, TaskRunner will be terminated
                // immediately.
                if (taskRequest.getShouldDie()) {
                  LOG.info("Received ShouldDie flag:" + getId());
                  stop();
                  if(taskRunnerManager != null) {
                    //notify to TaskRunnerManager
                    taskRunnerManager.stopTask(getId());
                  }
                } else {
                  taskRunnerManager.getWorkerContext().getWorkerSystemMetrics().counter("query", "task").inc();
                  LOG.info("Accumulated Received Task: " + (++receivedNum));

                  QueryUnitAttemptId taskAttemptId = new QueryUnitAttemptId(taskRequest.getId());
                  if (tasks.containsKey(taskAttemptId)) {
                    LOG.error("Duplicate Task Attempt: " + taskAttemptId);
                    fatalError(qmClientService, taskAttemptId, "Duplicate Task Attempt: " + taskAttemptId);
                    continue;
                  }

                  LOG.info("Initializing: " + taskAttemptId);
                  Task task;
                  try {
                    task = new Task(taskAttemptId, taskRunnerContext, qmClientService,
                        new QueryUnitRequestImpl(taskRequest));
                    tasks.put(taskAttemptId, task);

                    task.init();
                    if (task.hasFetchPhase()) {
                      task.fetch(); // The fetch is performed in an asynchronous way.
                    }
                    // task.run() is a blocking call.
                    task.run();
                  } catch (Throwable t) {
                    LOG.error(t.getMessage(), t);
                    fatalError(qmClientService, taskAttemptId, t.getMessage());
                  } finally {
                    callFuture = null;
                    taskRequest = null;
                  }
                }
              }
            } catch (Throwable t) {
              t.printStackTrace();
            } finally {
              connPool.releaseConnection(qmClient);
            }
          }
        }
      });
      taskLauncher.start();
    } catch (Throwable t) {
      LOG.fatal("Unhandled exception. Starting shutdown.", t);
    } finally {
      for (Task t : tasks.values()) {
        if (t.getStatus() != TaskAttemptState.TA_SUCCEEDED) {
          t.abort();
        }
      }
    }
  }