in tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java [1003:1131]
/**
 * Serves one shuffle HTTP request on the given channel.
 *
 * Processing order:
 * <ol>
 *   <li>Reject non-GET methods and incompatible shuffle header name/version.</li>
 *   <li>Decode query parameters; dispatch dag/vertex/task-attempt deletion
 *       actions if present (each sends its own response and returns).</li>
 *   <li>Validate the {@code job}/{@code dag}/{@code map}/{@code reduce}
 *       parameters, verify the request, populate shuffle headers, and start
 *       streaming up to {@code maxSessionOpenFiles} map outputs.</li>
 * </ol>
 *
 * @param ctx     channel context the response (or error) is written to
 * @param request the incoming HTTP request
 * @throws IOException on I/O failures not converted into an error response
 * @throws Exception   propagated from downstream processing
 */
private void handleRequest(ChannelHandlerContext ctx, HttpRequest request)
    throws IOException, Exception {
  if (request.getMethod() != GET) {
    sendError(ctx, METHOD_NOT_ALLOWED);
    return;
  }
  // Check whether the shuffle version is compatible
  if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(
      request.headers().get(ShuffleHeader.HTTP_HEADER_NAME))
      || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(
          request.headers().get(ShuffleHeader.HTTP_HEADER_VERSION))) {
    sendError(ctx, "Incompatible shuffle request version", BAD_REQUEST);
    return;
  }
  final Map<String, List<String>> q = new QueryStringDecoder(request.getUri()).parameters();
  final List<String> keepAliveList = q.get("keepAlive");
  final List<String> dagCompletedQ = q.get("dagAction");
  final List<String> vertexCompletedQ = q.get("vertexAction");
  final List<String> taskAttemptFailedQ = q.get("taskAttemptAction");
  boolean keepAliveParam = false;
  if (keepAliveList != null && keepAliveList.size() == 1) {
    keepAliveParam = Boolean.parseBoolean(keepAliveList.get(0));
    LOG.debug("KeepAliveParam : {} : {}", keepAliveList, keepAliveParam);
  }
  final List<String> mapIds = splitMaps(q.get("map"));
  final Range reduceRange = splitReduces(q.get("reduce"));
  final List<String> jobQ = q.get("job");
  final List<String> dagIdQ = q.get("dag");
  final List<String> vertexIdQ = q.get("vertex");
  if (LOG.isDebugEnabled()) {
    LOG.debug("RECV: " + request.getUri() +
        "\n mapId: " + mapIds +
        "\n reduceId: " + reduceRange +
        "\n jobId: " + jobQ +
        "\n dagId: " + dagIdQ +
        "\n keepAlive: " + keepAliveParam);
  }
  // If the request is for Dag Deletion, process the request and send OK.
  if (deleteDagDirectories(ctx.channel(), dagCompletedQ, jobQ, dagIdQ)) {
    return;
  }
  if (deleteVertexDirectories(ctx.channel(), vertexCompletedQ, jobQ, dagIdQ, vertexIdQ)) {
    return;
  }
  if (deleteTaskAttemptDirectories(ctx.channel(), taskAttemptFailedQ, jobQ, dagIdQ, mapIds)) {
    return;
  }
  if (mapIds == null || reduceRange == null || jobQ == null || dagIdQ == null) {
    sendError(ctx, "Required param job, dag, map and reduce", BAD_REQUEST);
    return;
  }
  if (jobQ.size() != 1) {
    sendError(ctx, "Too many job/reduce parameters", BAD_REQUEST);
    return;
  }
  // this audit log is disabled by default,
  // to turn it on please enable this audit log
  // on log4j.properties by uncommenting the setting
  if (AUDITLOG.isDebugEnabled()) {
    AUDITLOG.debug("shuffle for " + jobQ.get(0) + " mapper: " + mapIds
        + " reducer: " + reduceRange);
  }
  // The null/size checks above make these reads safe. No numeric parsing
  // happens here, so the NumberFormatException/IllegalArgumentException
  // handlers that used to wrap these two List.get(0) calls were unreachable
  // dead code and have been removed.
  final String jobId = jobQ.get(0);
  final String dagId = dagIdQ.get(0);
  final String reqUri = request.getUri();
  if (null == reqUri) {
    // TODO? add upstream?
    sendError(ctx, FORBIDDEN);
    return;
  }
  HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
  try {
    verifyRequest(jobId, ctx, request, response,
        new URL("http", "", this.port, reqUri));
  } catch (IOException e) {
    LOG.warn("Shuffle failure ", e);
    sendError(ctx, e.getMessage(), UNAUTHORIZED);
    return;
  }
  Map<String, MapOutputInfo> mapOutputInfoMap =
      new HashMap<String, MapOutputInfo>();
  Channel ch = ctx.channel();
  ChannelPipeline pipeline = ch.pipeline();
  // Suspend the idle-timeout while we are actively serving this request.
  TimeoutHandler timeoutHandler = (TimeoutHandler) pipeline.get(TIMEOUT_HANDLER);
  timeoutHandler.setEnabledTimeout(false);
  String user = userRsrc.get(jobId);
  try {
    populateHeaders(mapIds, jobId, dagId, user, reduceRange,
        response, keepAliveParam, mapOutputInfoMap);
  } catch (DiskErrorException e) { // fatal error: fetcher should be aware of that
    LOG.error("Shuffle error in populating headers (fatal: DiskErrorException):", e);
    String errorMessage = getErrorMessage(e);
    // custom message, might be noticed by fetchers
    // it should reuse the current response object, as headers have been already set for it
    sendFakeShuffleHeaderWithError(ctx,
        ShuffleHandlerError.DISK_ERROR_EXCEPTION + ": " + errorMessage, response);
    return;
  } catch (IOException e) {
    // NOTE(review): the headers-only response is written here before
    // sendError emits a second, separate error response on the same channel.
    // This preserves the historical behavior — confirm fetchers tolerate the
    // double write before changing it.
    ch.write(response);
    LOG.error("Shuffle error in populating headers :", e);
    String errorMessage = getErrorMessage(e);
    sendError(ctx, errorMessage, INTERNAL_SERVER_ERROR);
    return;
  }
  ch.write(response);
  //Initialize one ReduceContext object per channelRead call
  boolean keepAlive = keepAliveParam || connectionKeepAliveEnabled;
  ReduceContext reduceContext = new ReduceContext(mapIds, reduceRange, ctx,
      user, mapOutputInfoMap, jobId, dagId, keepAlive);
  // Kick off up to maxSessionOpenFiles concurrent map-output sends; each
  // completed send schedules the next one via the ReduceContext.
  for (int i = 0; i < Math.min(maxSessionOpenFiles, mapIds.size()); i++) {
    ChannelFuture nextMap = sendMap(reduceContext);
    if (nextMap == null) {
      return;
    }
  }
}