in mr/src/main/java/org/elasticsearch/hadoop/serialization/ScrollReader.java [268:409]
/**
 * Parses a single scroll response and collects the hits it contains.
 * <p>
 * The parser is advanced to the scroll id, then to the total hit count, then to the
 * {@code hits} array, whose entries are read one by one through {@code readHit}.
 * When {@code returnRawJson} is set, the second slot of every collected hit is replaced
 * by the raw JSON of the document, cut out of {@code input}.
 *
 * @param parser parser positioned on the scroll response
 * @param input  raw bytes of the response; only used to cut out fragments when
 *               {@code returnRawJson} is set
 * @return {@code null} when the response carries no scroll id, {@code Scroll.empty(scrollId)}
 *         when the total hit count is zero, otherwise a {@code Scroll} holding the hits that
 *         were read (or a result-less {@code Scroll} when the hits array is empty)
 */
private Scroll read(Parser parser, BytesArray input) {
    // get scroll_id
    Token token = ParsingUtils.seek(parser, SCROLL_ID);
    if (token == null) { // no scroll id is returned for frozen indices
        if (log.isTraceEnabled()) {
            // emitted at the same level the guard checks (used to be info behind a trace guard)
            log.trace("No scroll id found, likely because the index is frozen");
        }
        return null;
    }
    Assert.isTrue(token == Token.VALUE_STRING, "invalid response");
    String scrollId = parser.text();

    // check hits/total
    long totalHits = hitsTotal(parser);
    if (totalHits == 0) {
        return Scroll.empty(scrollId);
    }

    // move to hits/hits
    token = ParsingUtils.seek(parser, HITS);
    Assert.isTrue(token == Token.START_ARRAY, "invalid response");

    // move through the list and for each hit, extract the _id and _source
    List<Object[]> results = new ArrayList<Object[]>();
    int responseHits = 0;
    int skippedHits = 0;
    for (token = parser.nextToken(); token != Token.END_ARRAY; token = parser.nextToken()) {
        responseHits++;
        Object[] hit = readHit(parser, input);
        if (hit != null) {
            results.add(hit);
        } else {
            // readHit returned nothing for this entry; only count it
            skippedHits++;
        }
    }

    // convert the recorded positions into actual content
    if (returnRawJson) {
        assembleRawJsonHits(results, input);
    }

    if (responseHits > 0) {
        return new Scroll(scrollId, totalHits, results, responseHits, skippedHits);
    } else {
        // Scroll had no hits in the response, it must have concluded.
        return new Scroll(scrollId, totalHits, true);
    }
}

/**
 * Replaces the {@code JsonResult} stored in slot 1 of every hit with the assembled raw JSON
 * document: the document fragment (when present) followed, if {@code readMetadata} is set,
 * by the metadata fragments nested under {@code metadataField}.
 *
 * @param results hits collected by {@link #read(Parser, BytesArray)}; modified in place
 * @param input   raw response bytes the fragments are cut from
 */
private void assembleRawJsonHits(List<Object[]> results, BytesArray input) {
    // flatten the positions of all hits into one lookup array
    // NOTE(review): sizing assumes at most 6 positions (3 ranges) per hit - confirm against JsonResult.asCharPos()
    int[] positions = new int[results.size() * 6];
    int offset = 0;
    // remember the per-hit positions to know how many ranges each hit owns
    List<int[]> fragmentsPos = new ArrayList<int[]>(results.size());
    for (Object[] result : results) {
        int[] asCharPos = ((JsonResult) result[1]).asCharPos();
        fragmentsPos.add(asCharPos);
        System.arraycopy(asCharPos, 0, positions, offset, asCharPos.length);
        offset += asCharPos.length;
    }

    // The char-to-byte conversion (BytesUtils.charToBytePosition) is disabled: the recorded
    // positions are used directly as offsets into input.bytes().
    // Trimming is disabled as well since it appears mainly within fields and not outside of
    // them; in other words it needs to be treated when the fragments are constructed.
    int positionIndex = 0;
    BytesArray doc = new BytesArray(128);

    for (int fragmentIndex = 0; fragmentIndex < fragmentsPos.size(); fragmentIndex++) {
        Object[] result = results.get(fragmentIndex);
        JsonResult jsonPointers = (JsonResult) result[1];
        // positions of the current hit: doc range first (if any), then metadata ranges
        int[] fragmentPos = fragmentsPos.get(fragmentIndex);
        int currentFragmentIndex = 0;
        boolean hasDoc = jsonPointers.hasDoc();

        doc.add('{');
        // first add the doc
        if (hasDoc) {
            appendRawRange(doc, input, positions[positionIndex], positions[positionIndex + 1]);
            // consumed doc pointers
            currentFragmentIndex += 2;
            positionIndex += 2;
        }
        // followed by the metadata under the designated field
        if (readMetadata) {
            if (hasDoc) {
                doc.add(',');
            }
            doc.add('"');
            doc.add(StringUtils.jsonEncoding(metadataField));
            doc.add('"');
            doc.add(':');
            doc.add('{');
            // consume the remaining ranges of this hit
            for (; currentFragmentIndex < fragmentPos.length; currentFragmentIndex += 2) {
                appendRawRange(doc, input, positions[positionIndex], positions[positionIndex + 1]);
                positionIndex += 2;
            }
            doc.add('}');
        }
        doc.add('}');

        // replace JsonResult with assembled document
        result[1] = reader.wrapString(doc.toString());
        // the buffer is shared across hits
        doc.reset();
    }
}

/**
 * Appends the bytes {@code [rangeStart, rangeStop)} of {@code input} to {@code doc}.
 *
 * @throws IllegalArgumentException if {@code rangeStop} lies before {@code rangeStart}
 */
private void appendRawRange(BytesArray doc, BytesArray input, int rangeStart, int rangeStop) {
    if (rangeStop - rangeStart < 0) {
        throw new IllegalArgumentException(String.format("Invalid position given=%s %s",rangeStart, rangeStop));
    }
    doc.add(input.bytes(), rangeStart, rangeStop - rangeStart);
}