public void messageReceived()

in tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java [351:475]


    /**
     * Handles one HTTP pull request and writes the requested file chunks back
     * on the channel.
     *
     * <p>Only {@code GET} is accepted. The query string must carry
     * {@code type}, {@code ta}, {@code sid} and {@code p}; {@code type} and
     * {@code sid} must each appear exactly once. Two values of {@code type}
     * are understood:
     * <ul>
     *   <li>{@code "r"}: a single chunk is obtained from {@code getFileCunks}
     *       for the first task id, using the required {@code start} and
     *       {@code end} parameters; the presence of {@code final} is passed
     *       along as the "last" flag.</li>
     *   <li>{@code "h"}: for every task id, the whole file named by the
     *       partition id under that task's output directory is sent.</li>
     * </ul>
     * Any other type, or a missing required parameter, is answered with
     * {@code BAD_REQUEST}. When no chunk is produced the response is
     * {@code NO_CONTENT}.
     *
     * @param ctx the channel handler context, used for error responses and
     *            for sending files
     * @param e   the message event whose payload is cast to {@code HttpRequest}
     * @throws Exception if resolving a local path or writing to the channel fails
     */
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
        throws Exception {

      HttpRequest request = (HttpRequest) e.getMessage();
      if (request.getMethod() != GET) {
        sendError(ctx, METHOD_NOT_ALLOWED);
        return;
      }

      // Parsing the URL into key-values
      final Map<String, List<String>> params =
          new QueryStringDecoder(request.getUri()).getParameters();
      final List<String> types = params.get("type");
      final List<String> taskIdList = params.get("ta");
      final List<String> subQueryIds = params.get("sid");
      final List<String> partitionIds = params.get("p");

      if (types == null || taskIdList == null || subQueryIds == null
          || partitionIds == null) {
        sendError(ctx, "Required type, taskIds, subquery Id, and partition id",
            BAD_REQUEST);
        return;
      }

      if (types.size() != 1 || subQueryIds.size() != 1) {
        sendError(ctx, "Required type, taskIds, subquery Id, and partition id",
            BAD_REQUEST);
        return;
      }

      final List<FileChunk> chunks = Lists.newArrayList();

      // NOTE(review): sid, partitionId and the task ids come straight from the
      // request and are concatenated into a local path below without any
      // validation (e.g. against ".." segments) - confirm this is acceptable.
      String repartitionType = types.get(0);
      String sid = subQueryIds.get(0);
      String partitionId = partitionIds.get(0);
      List<String> taskIds = splitMaps(taskIdList);

      // the working dir of tajo worker for each query
      String queryBaseDir = queryId + "/output" + "/";

      LOG.info("PullServer request param: repartitionType=" + repartitionType +
          ", sid=" + sid + ", partitionId=" + partitionId + ", taskIds=" + taskIdList);

      String taskLocalDir = conf.get(ConfVars.WORKER_TEMPORAL_DIR.varname);
      if (taskLocalDir == null ||
          taskLocalDir.equals("")) {
        // Only reported; the value is used for logging in this method.
        LOG.error("Tajo local directory should be specified.");
      }
      LOG.info("PullServer baseDir: " + taskLocalDir + "/" + queryBaseDir);

      // if a subquery requires a range partitioning
      if (repartitionType.equals("r")) {
        // Validate the range-specific inputs up front so that a malformed
        // request gets a 400 instead of an unchecked exception.
        final List<String> startKeys = params.get("start");
        final List<String> endKeys = params.get("end");
        if (startKeys == null || startKeys.isEmpty()
            || endKeys == null || endKeys.isEmpty()) {
          sendError(ctx, "Required start and end keys for a range request",
              BAD_REQUEST);
          return;
        }
        // assumes splitMaps never returns null - TODO confirm
        if (taskIds.isEmpty()) {
          sendError(ctx, "Required at least one task id", BAD_REQUEST);
          return;
        }

        String ta = taskIds.get(0);
        Path path = localFS.makeQualified(
            lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/"
                + ta + "/output/", conf));

        String startKey = startKeys.get(0);
        String endKey = endKeys.get(0);
        // The mere presence of "final" is what matters, not its value.
        boolean last = params.get("final") != null;

        FileChunk chunk;
        try {
          chunk = getFileCunks(path, startKey, endKey, last);
        } catch (Throwable t) {
          LOG.error("ERROR Request: " + request.getUri(), t);
          sendError(ctx, "Cannot get file chunks to be sent", BAD_REQUEST);
          return;
        }
        if (chunk != null) {
          chunks.add(chunk);
        }

        // if a subquery requires a hash repartition
      } else if (repartitionType.equals("h")) {
        for (String ta : taskIds) {
          Path path = localFS.makeQualified(
              lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" +
                  ta + "/output/" + partitionId, conf));
          File file = new File(path.toUri());
          FileChunk chunk = new FileChunk(file, 0, file.length());
          chunks.add(chunk);
        }
      } else {
        LOG.error("Unknown repartition type: " + repartitionType);
        // Answer the client instead of returning silently, which would leave
        // the request without any response.
        sendError(ctx, "Unknown repartition type", BAD_REQUEST);
        return;
      }

      // Write the content.
      Channel ch = e.getChannel();
      if (chunks.size() == 0) {
        HttpResponse response = new DefaultHttpResponse(HTTP_1_1, NO_CONTENT);
        ch.write(response);
        if (!isKeepAlive(request)) {
          ch.close();
        }
      }  else {
        FileChunk[] file = chunks.toArray(new FileChunk[chunks.size()]);
        HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
        long totalSize = 0;
        for (FileChunk chunk : file) {
          totalSize += chunk.length();
        }
        setContentLength(response, totalSize);

        // Write the initial line and the header.
        ch.write(response);

        ChannelFuture writeFuture = null;

        for (FileChunk chunk : file) {
          writeFuture = sendFile(ctx, ch, chunk);
          if (writeFuture == null) {
            // NOTE(review): the OK header (and possibly earlier chunks) has
            // already been written to the channel at this point, so this
            // NOT_FOUND follows a started response - confirm intended.
            sendError(ctx, NOT_FOUND);
            return;
          }
        }

        // Decide whether to close the connection or not.
        if (!isKeepAlive(request)) {
          // Close the connection when the whole content is written out.
          writeFuture.addListener(ChannelFutureListener.CLOSE);
        }
      }
    }