in tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java [351:475]
/**
 * Serves one HTTP GET request for intermediate output files.
 *
 * <p>The request URI must carry the query parameters {@code type}
 * (repartition type), {@code ta} (task ids), {@code sid} (subquery id) and
 * {@code p} (partition id). For {@code type=r} (range repartition) the
 * parameters {@code start} and {@code end} are also required, and the
 * presence of {@code final} marks the last range. For {@code type=h}
 * (hash repartition) one file per task id is sent in full.
 *
 * <p>Responses written by this method: {@code METHOD_NOT_ALLOWED} for
 * non-GET requests, {@code BAD_REQUEST} for missing/invalid parameters or
 * when the range chunk cannot be computed, {@code NO_CONTENT} when there is
 * nothing to send, and otherwise {@code OK} followed by the file chunks.
 *
 * @param ctx the channel handler context, passed on to the error and file
 *            sending helpers
 * @param e   the message event whose message is cast to {@link HttpRequest}
 * @throws Exception if path resolution or writing fails
 */
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
    throws Exception {
  HttpRequest request = (HttpRequest) e.getMessage();
  if (request.getMethod() != GET) {
    sendError(ctx, METHOD_NOT_ALLOWED);
    return;
  }

  // Parsing the URL into key-values
  final Map<String, List<String>> params =
      new QueryStringDecoder(request.getUri()).getParameters();
  final List<String> types = params.get("type");
  final List<String> taskIdList = params.get("ta");
  final List<String> subQueryIds = params.get("sid");
  final List<String> partitionIds = params.get("p");

  if (types == null || taskIdList == null || subQueryIds == null
      || partitionIds == null) {
    sendError(ctx, "Required type, taskIds, subquery Id, and partition id",
        BAD_REQUEST);
    return;
  }

  if (types.size() != 1 || subQueryIds.size() != 1) {
    sendError(ctx, "Required type, taskIds, subquery Id, and partition id",
        BAD_REQUEST);
    return;
  }

  final List<FileChunk> chunks = Lists.newArrayList();

  String repartitionType = types.get(0);
  String sid = subQueryIds.get(0);
  String partitionId = partitionIds.get(0);
  List<String> taskIds = splitMaps(taskIdList);

  // the working dir of tajo worker for each query
  String queryBaseDir = queryId + "/output" + "/";

  LOG.info("PullServer request param: repartitionType=" + repartitionType +
      ", sid=" + sid + ", partitionId=" + partitionId + ", taskIds=" + taskIdList);

  String taskLocalDir = conf.get(ConfVars.WORKER_TEMPORAL_DIR.varname);
  if (taskLocalDir == null ||
      taskLocalDir.equals("")) {
    // Only logged: taskLocalDir is used below for the log line alone.
    LOG.error("Tajo local directory should be specified.");
  }
  LOG.info("PullServer baseDir: " + taskLocalDir + "/" + queryBaseDir);

  // if a subquery requires a range partitioning
  if (repartitionType.equals("r")) {
    // Validate the range-specific parameters up front so that a malformed
    // request is answered with BAD_REQUEST instead of failing with a
    // NullPointerException / IndexOutOfBoundsException.
    final List<String> startKeys = params.get("start");
    final List<String> endKeys = params.get("end");
    if (startKeys == null || startKeys.isEmpty()
        || endKeys == null || endKeys.isEmpty()) {
      sendError(ctx, "Required start and end keys for a range repartition",
          BAD_REQUEST);
      return;
    }
    if (taskIds == null || taskIds.isEmpty()) {
      sendError(ctx, "Required at least one task id", BAD_REQUEST);
      return;
    }

    // Only the first task id is used for a range request.
    String ta = taskIds.get(0);
    Path path = localFS.makeQualified(
        lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/"
            + ta + "/output/", conf));

    String startKey = startKeys.get(0);
    String endKey = endKeys.get(0);
    boolean last = params.get("final") != null;

    FileChunk chunk;
    try {
      chunk = getFileCunks(path, startKey, endKey, last);
    } catch (Throwable t) {
      LOG.error("ERROR Request: " + request.getUri(), t);
      sendError(ctx, "Cannot get file chunks to be sent", BAD_REQUEST);
      return;
    }
    // A null chunk means there is nothing to send; falls through to
    // the NO_CONTENT response below.
    if (chunk != null) {
      chunks.add(chunk);
    }

  // if a subquery requires a hash repartition
  } else if (repartitionType.equals("h")) {
    for (String ta : taskIds) {
      Path path = localFS.makeQualified(
          lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" +
              ta + "/output/" + partitionId, conf));
      File file = new File(path.toUri());
      FileChunk chunk = new FileChunk(file, 0, file.length());
      chunks.add(chunk);
    }
  } else {
    // Always answer the client; returning silently would leave the
    // requester waiting on an open connection.
    LOG.error("Unknown repartition type: " + repartitionType);
    sendError(ctx, "Unknown repartition type", BAD_REQUEST);
    return;
  }

  // Write the content.
  Channel ch = e.getChannel();
  if (chunks.size() == 0) {
    HttpResponse response = new DefaultHttpResponse(HTTP_1_1, NO_CONTENT);
    ch.write(response);
    if (!isKeepAlive(request)) {
      ch.close();
    }
  } else {
    FileChunk[] file = chunks.toArray(new FileChunk[chunks.size()]);
    HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
    long totalSize = 0;
    for (FileChunk chunk : file) {
      totalSize += chunk.length();
    }
    setContentLength(response, totalSize);

    // Write the initial line and the header.
    ch.write(response);

    ChannelFuture writeFuture = null;

    for (FileChunk chunk : file) {
      writeFuture = sendFile(ctx, ch, chunk);
      if (writeFuture == null) {
        // NOTE(review): the OK header (and possibly earlier chunks) has
        // already been written at this point, so this error response
        // follows a started 200 response — confirm how sendError and the
        // client cope with that.
        sendError(ctx, NOT_FOUND);
        return;
      }
    }

    // Decide whether to close the connection or not.
    if (!isKeepAlive(request)) {
      // Close the connection when the whole content is written out.
      writeFuture.addListener(ChannelFutureListener.CLOSE);
    }
  }
}