private void handleRequest()

in tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java [1003:1131]

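handleRequest is the core of the shuffle server's request path: it accepts only HTTP GET, validates the shuffle name/version headers, decodes the query string into job/dag/vertex/map/reduce parameters, short-circuits DAG/vertex/task-attempt deletion actions, verifies the request against the job's credentials (UNAUTHORIZED on failure), and finally streams the requested map outputs back over the channel. An illustrative request line (the path and all IDs are hypothetical):

    GET /mapOutput?job=job_1111111111111_0001&dag=1&reduce=0&map=attempt_1111111111111_0001_1_00_000000_0_10003&keepAlive=true HTTP/1.1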

    private void handleRequest(ChannelHandlerContext ctx, HttpRequest request)
        throws Exception {
      if (request.getMethod() != GET) {
        sendError(ctx, METHOD_NOT_ALLOWED);
        return;
      }
      // Check whether the shuffle version is compatible
      if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(
          request.headers().get(ShuffleHeader.HTTP_HEADER_NAME))
          || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(
              request.headers().get(ShuffleHeader.HTTP_HEADER_VERSION))) {
        sendError(ctx, "Incompatible shuffle request version", BAD_REQUEST);
        return;
      }
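      // All shuffle parameters arrive on the query string; decode them once up front.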
      final Map<String, List<String>> q = new QueryStringDecoder(request.getUri()).parameters();
      final List<String> keepAliveList = q.get("keepAlive");
      final List<String> dagCompletedQ = q.get("dagAction");
      final List<String> vertexCompletedQ = q.get("vertexAction");
      final List<String> taskAttemptFailedQ = q.get("taskAttemptAction");
      boolean keepAliveParam = false;
      if (keepAliveList != null && keepAliveList.size() == 1) {
        keepAliveParam = Boolean.parseBoolean(keepAliveList.get(0));
        LOG.debug("KeepAliveParam : {} : {}", keepAliveList, keepAliveParam);
      }
      final List<String> mapIds = splitMaps(q.get("map"));
      final Range reduceRange = splitReduces(q.get("reduce"));
      final List<String> jobQ = q.get("job");
      final List<String> dagIdQ = q.get("dag");
      final List<String> vertexIdQ = q.get("vertex");
      if (LOG.isDebugEnabled()) {
        LOG.debug("RECV: " + request.getUri() +
            "\n  mapId: " + mapIds +
            "\n  reduceId: " + reduceRange +
            "\n  jobId: " + jobQ +
            "\n  dagId: " + dagIdQ +
            "\n  keepAlive: " + keepAliveParam);
      }
      // If the request is a DAG, vertex, or task-attempt deletion action,
      // process it and send OK.
      if (deleteDagDirectories(ctx.channel(), dagCompletedQ, jobQ, dagIdQ))  {
        return;
      }
      if (deleteVertexDirectories(ctx.channel(), vertexCompletedQ, jobQ, dagIdQ, vertexIdQ)) {
        return;
      }
      if (deleteTaskAttemptDirectories(ctx.channel(), taskAttemptFailedQ, jobQ, dagIdQ, mapIds)) {
        return;
      }
      if (mapIds == null || reduceRange == null || jobQ == null || dagIdQ == null) {
        sendError(ctx, "Required param job, dag, map and reduce", BAD_REQUEST);
        return;
      }
      if (jobQ.size() != 1) {
        sendError(ctx, "Too many job/reduce parameters", BAD_REQUEST);
        return;
      }

      // This audit log is disabled by default; to enable it, uncomment the
      // corresponding setting in log4j.properties.
      if (AUDITLOG.isDebugEnabled()) {
        AUDITLOG.debug("shuffle for " + jobQ.get(0) + " mapper: " + mapIds
                         + " reducer: " + reduceRange);
      }
      String jobId;
      String dagId;
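      // NOTE: these are plain List lookups, so the catch blocks below are
      // effectively unreachable; they look like leftovers from stricter ID parsing.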
      try {
        jobId = jobQ.get(0);
        dagId = dagIdQ.get(0);
      } catch (NumberFormatException e) {
        sendError(ctx, "Bad reduce parameter", BAD_REQUEST);
        return;
      } catch (IllegalArgumentException e) {
        sendError(ctx, "Bad job parameter", BAD_REQUEST);
        return;
      }
      final String reqUri = request.getUri();
      if (null == reqUri) {
        // TODO? add upstream?
        sendError(ctx, FORBIDDEN);
        return;
      }
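      // Verify that the caller is authorized to shuffle for this job before
      // serving anything; the URL's host part is left empty, as only the
      // path/query portion should matter for verification.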
      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
      try {
        verifyRequest(jobId, ctx, request, response,
            new URL("http", "", this.port, reqUri));
      } catch (IOException e) {
        LOG.warn("Shuffle failure ", e);
        sendError(ctx, e.getMessage(), UNAUTHORIZED);
        return;
      }

      Map<String, MapOutputInfo> mapOutputInfoMap =
          new HashMap<String, MapOutputInfo>();
      Channel ch = ctx.channel();
      ChannelPipeline pipeline = ch.pipeline();
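      // Suspend the idle timeout while the response is being streamed
      // (it is expected to be re-enabled once the transfer completes).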
      TimeoutHandler timeoutHandler = (TimeoutHandler) pipeline.get(TIMEOUT_HANDLER);
      timeoutHandler.setEnabledTimeout(false);
      String user = userRsrc.get(jobId);

      try {
        populateHeaders(mapIds, jobId, dagId, user, reduceRange,
          response, keepAliveParam, mapOutputInfoMap);
      } catch (DiskErrorException e) { // fatal error: fetcher should be aware of that
        LOG.error("Shuffle error in populating headers (fatal: DiskErrorException):", e);
        String errorMessage = getErrorMessage(e);
        // send a custom message that fetchers can recognize; reuse the current
        // response object, since its headers have already been populated
        sendFakeShuffleHeaderWithError(ctx,
            ShuffleHandlerError.DISK_ERROR_EXCEPTION + ": " + errorMessage, response);
        return;
      } catch (IOException e) {
        ch.write(response);
        LOG.error("Shuffle error in populating headers :", e);
        String errorMessage = getErrorMessage(e);
        sendError(ctx, errorMessage, INTERNAL_SERVER_ERROR);
        return;
      }
      ch.write(response);
      // Initialize one ReduceContext object per channelRead call
      boolean keepAlive = keepAliveParam || connectionKeepAliveEnabled;
      ReduceContext reduceContext = new ReduceContext(mapIds, reduceRange, ctx,
          user, mapOutputInfoMap, jobId, dagId, keepAlive);
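      // Prime up to maxSessionOpenFiles concurrent sendMap transfers; subsequent
      // map outputs are expected to be chained from completion listeners in sendMap.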
      for (int i = 0; i < Math.min(maxSessionOpenFiles, mapIds.size()); i++) {
        ChannelFuture nextMap = sendMap(reduceContext);
        if (nextMap == null) {
          return;
        }
      }
    }
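
For context, here is a minimal, self-contained sketch of how Netty's QueryStringDecoder (used at the top of this method) splits such a URI into the parameter map; the URI and all IDs in it are made up for illustration:

    import io.netty.handler.codec.http.QueryStringDecoder;

    import java.util.List;
    import java.util.Map;

    public class ShuffleQueryDemo {
      public static void main(String[] args) {
        // Hypothetical shuffle URI; the parameter names match what handleRequest reads.
        String uri = "/mapOutput?job=job_1111111111111_0001&dag=1"
            + "&reduce=0&map=attempt_1111111111111_0001_1_00_000000_0_10003"
            + "&keepAlive=true";
        Map<String, List<String>> q = new QueryStringDecoder(uri).parameters();
        System.out.println(q.get("job"));       // [job_1111111111111_0001]
        System.out.println(q.get("dag"));       // [1]
        System.out.println(q.get("reduce"));    // [0]
        System.out.println(q.get("keepAlive")); // [true]
      }
    }

Note that every parameter comes back as a List<String>, which is why the method checks keepAliveList.size() == 1 and jobQ.size() != 1 rather than reading single values.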