public void run()

in tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java [215:335]


    /**
     * Requests worker resources from the TajoMaster for the execution block named by
     * {@code event}, waits for the answer, and forwards the allocated containers to the
     * query's event handler.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Build a {@code WorkerResourceAllocationRequest} whose per-container memory and
     *       disk-slot bounds (min and max alike) come from {@code TASK_DEFAULT_MEMORY} and
     *       {@code TASK_DEFAULT_DISK}.</li>
     *   <li>Send it through a pooled connection; the answer is delivered via a
     *       {@code CallFuture}.</li>
     *   <li>Poll the future in 3-second slices until a response arrives or {@code stopped}
     *       is set.</li>
     *   <li>Wrap every allocated resource in a {@code TajoWorkerContainer}. If the sub-query
     *       is no longer in a running state the resources are released and the method
     *       returns; otherwise a {@code SubQueryContainerAllocationEvent} is fired.</li>
     *   <li>If fewer containers than required were allocated, a new
     *       {@code ContainerAllocationEvent} for the shortfall is handed to the event
     *       handler.</li>
     * </ol>
     *
     * <p>If the allocator is stopped while waiting, the method returns without requesting
     * anything further.
     */
    public void run() {
      LOG.info("Start TajoWorkerAllocationThread");
      CallFuture<TajoMasterProtocol.WorkerResourceAllocationResponse> callBack =
          new CallFuture<TajoMasterProtocol.WorkerResourceAllocationResponse>();

      //TODO consider task's resource usage pattern
      int requiredMemoryMB = tajoConf.getIntVar(TajoConf.ConfVars.TASK_DEFAULT_MEMORY);
      float requiredDiskSlots = tajoConf.getFloatVar(TajoConf.ConfVars.TASK_DEFAULT_DISK);

      // Min and max are set to the same value, so every container is requested with
      // exactly the configured default memory and disk slots.
      TajoMasterProtocol.WorkerResourceAllocationRequest request =
          TajoMasterProtocol.WorkerResourceAllocationRequest.newBuilder()
              .setMinMemoryMBPerContainer(requiredMemoryMB)
              .setMaxMemoryMBPerContainer(requiredMemoryMB)
              .setNumContainers(event.getRequiredNum())
              .setResourceRequestPriority(TajoMasterProtocol.ResourceRequestPriority.MEMORY)
              .setMinDiskSlotPerContainer(requiredDiskSlots)
              .setMaxDiskSlotPerContainer(requiredDiskSlots)
              .setQueryId(event.getExecutionBlockId().getQueryId().getProto())
              .build();

      RpcConnectionPool connPool = RpcConnectionPool.getPool(queryTaskContext.getConf());
      NettyClientBase tmClient = null;
      try {
        tmClient = connPool.getConnection(
            queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
            TajoMasterProtocol.class, true);
        TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
        masterClientService.allocateWorkerResources(null, request, callBack);
      } catch (Exception e) {
        // NOTE(review): if the request could not be sent, callBack is presumably never
        // completed, so the wait loop below only ends once 'stopped' is set - confirm
        // whether this path should fail fast or re-queue instead.
        LOG.error(e.getMessage(), e);
      } finally {
        // The connection goes back to the pool as soon as the request has been issued;
        // the response is picked up from callBack below.
        connPool.releaseConnection(tmClient);
      }

      TajoMasterProtocol.WorkerResourceAllocationResponse response = null;
      while (!stopped.get()) {
        try {
          response = callBack.get(3, TimeUnit.SECONDS);
          break;
        } catch (InterruptedException e) {
          if (stopped.get()) {
            // Preserve the interrupt status for whoever owns this thread.
            Thread.currentThread().interrupt();
            return;
          }
          // Interrupted without a stop request: keep waiting for the response.
        } catch (TimeoutException e) {
          LOG.info("No available worker resource for " + event.getExecutionBlockId());
        }
      }

      if (response == null && stopped.get()) {
        // The wait ended because the allocator was stopped, not because an answer
        // arrived. Mirror the interrupt path above and do not post a new allocation
        // request after stop.
        return;
      }

      int numAllocatedContainers = 0;

      if (response != null) {
        List<TajoMasterProtocol.WorkerAllocatedResource> allocatedResources =
            response.getWorkerAllocatedResourceList();
        ExecutionBlockId executionBlockId = event.getExecutionBlockId();

        // Translate each allocated resource of the response into a container object.
        List<Container> containers = new ArrayList<Container>(allocatedResources.size());
        for (TajoMasterProtocol.WorkerAllocatedResource eachAllocatedResource : allocatedResources) {
          NodeId nodeId = NodeId.newInstance(eachAllocatedResource.getWorkerHost(),
              eachAllocatedResource.getPeerRpcPort());

          TajoWorkerContainerId containerId = new TajoWorkerContainerId();
          containerId.setApplicationAttemptId(
              ApplicationIdUtils.createApplicationAttemptId(executionBlockId.getQueryId(),
                  eachAllocatedResource.getContainerId().getAppAttemptId().getAttemptId()));
          containerId.setId(eachAllocatedResource.getContainerId().getId());

          WorkerResource workerResource = new WorkerResource();
          workerResource.setMemoryMB(eachAllocatedResource.getAllocatedMemoryMB());
          workerResource.setDiskSlots(eachAllocatedResource.getAllocatedDiskSlots());

          Worker worker = new Worker(null, workerResource);
          worker.setHostName(nodeId.getHost());
          worker.setPeerRpcPort(nodeId.getPort());
          worker.setQueryMasterPort(eachAllocatedResource.getQueryMasterPort());
          worker.setPullServerPort(eachAllocatedResource.getWorkerPullServerPort());

          TajoWorkerContainer container = new TajoWorkerContainer();
          container.setId(containerId);
          container.setNodeId(nodeId);
          container.setWorkerResource(worker);

          containers.add(container);
        }

        SubQueryState state = queryTaskContext.getSubQuery(executionBlockId).getState();
        if (!SubQuery.isRunningState(state)) {
          // The sub-query is not in a running state any more: release what was just
          // allocated and stop here, without asking for more.
          try {
            List<ContainerId> containerIds = new ArrayList<ContainerId>(containers.size());
            for (Container eachContainer : containers) {
              containerIds.add(eachContainer.getId());
            }
            TajoContainerProxy.releaseWorkerResource(queryTaskContext, executionBlockId, containerIds);
          } catch (Exception e) {
            LOG.error(e.getMessage(), e);
          }
          return;
        }

        if (!allocatedResources.isEmpty()) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("SubQueryContainerAllocationEvent fire:" + executionBlockId);
          }
          queryTaskContext.getEventHandler().handle(
              new SubQueryContainerAllocationEvent(executionBlockId, containers));
        }
        numAllocatedContainers = allocatedResources.size();
      }

      if (event.getRequiredNum() > numAllocatedContainers) {
        // Fewer containers than requested (possibly none): queue a follow-up request
        // for the remainder.
        ContainerAllocationEvent shortRequestEvent = new ContainerAllocationEvent(
            event.getType(), event.getExecutionBlockId(), event.getPriority(),
            event.getResource(),
            event.getRequiredNum() - numAllocatedContainers,
            event.isLeafQuery(), event.getProgress()
        );
        queryTaskContext.getEventHandler().handle(shortRequestEvent);
      }
      LOG.info("Stop TajoWorkerAllocationThread");
    }