private Scroll read()

in mr/src/main/java/org/elasticsearch/hadoop/serialization/ScrollReader.java [268:409]


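    /**
     * Reads one scroll response: extracts the scroll id and the total hit count, then
     * parses each hit, optionally re-assembling the raw JSON fragments of every hit
     * into a standalone document when {@code returnRawJson} is enabled.
     */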
    private Scroll read(Parser parser, BytesArray input) {
        // get scroll_id
        Token token = ParsingUtils.seek(parser, SCROLL_ID);
        if (token == null) { // no scroll id is returned for frozen indices
            if (log.isTraceEnabled()) {
                log.trace("No scroll id found, likely because the index is frozen");
            }
            return null;
        }
        Assert.isTrue(token == Token.VALUE_STRING, "invalid response");
        String scrollId = parser.text();

        // check hits/total
        long totalHits = hitsTotal(parser);
        if (totalHits == 0) {
            return Scroll.empty(scrollId);
        }

        // move to hits/hits
        token = ParsingUtils.seek(parser, HITS);

        // move through the list and for each hit, extract the _id and _source
        Assert.isTrue(token == Token.START_ARRAY, "invalid response");

        List<Object[]> results = new ArrayList<Object[]>();
        int responseHits = 0;
        int skippedHits = 0;
        int readHits = 0;
        for (token = parser.nextToken(); token != Token.END_ARRAY; token = parser.nextToken()) {
            responseHits++;
            Object[] hit = readHit(parser, input);
            if (hit != null) {
                readHits++;
                results.add(hit);
            } else {
                skippedHits++;
            }
        }

        // convert the char positions into actual content
        if (returnRawJson) {
            // gather all the fragment positions into one flat lookup array
            // of half-open [start, stop) pairs (sized for up to 3 ranges per hit)
            int[] pos = new int[results.size() * 6];
            int offset = 0;

            List<int[]> fragmentsPos = new ArrayList<int[]>(results.size());

            for (Object[] result : results) {
                int[] asCharPos = ((JsonResult) result[1]).asCharPos();
                // remember the positions to easily replace the fragment later on
                fragmentsPos.add(asCharPos);
                // copy them into the lookup array
                System.arraycopy(asCharPos, 0, pos, offset, asCharPos.length);
                offset += asCharPos.length;
            }
            // char-to-byte position conversion is currently disabled; the char positions are used as-is
            //int[] bytesPosition = BytesUtils.charToBytePosition(input, pos);
            int[] bytesPosition = pos;

            int bytesPositionIndex = 0;

            BytesArray doc = new BytesArray(128);
            // replace the fragments with the actual json

            // trimming is currently disabled since the whitespace appears mainly within fields and not outside of them;
            // in other words, it needs to be handled when the fragments are constructed
            for (int fragmentIndex = 0; fragmentIndex < fragmentsPos.size(); fragmentIndex++) {

                Object[] result = results.get(fragmentIndex);
                JsonResult jsonPointers = (JsonResult) result[1];

                // current fragment of doc + metadata (prefix + suffix)
                // used to iterate through the byte array pointers
                int[] fragmentPos = fragmentsPos.get(fragmentIndex);
                int currentFragmentIndex = 0;

                int rangeStart, rangeStop;

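                // assembled shape: '{' + doc fragment [+ ',"<metadataField>":{' + metadata fragments + '}'] + '}'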
                doc.add('{');
                // first add the doc
                if (jsonPointers.hasDoc()) {
                    rangeStart = bytesPosition[bytesPositionIndex];
                    rangeStop = bytesPosition[bytesPositionIndex + 1];

                    if (rangeStop < rangeStart) {
                        throw new IllegalArgumentException(String.format("Invalid position given=%s %s", rangeStart, rangeStop));
                    }

                    // trim
                    //rangeStart = BytesUtils.trimLeft(input.bytes(), rangeStart, rangeStop);
                    //rangeStop = BytesUtils.trimRight(input.bytes(), rangeStart, rangeStop);

                    doc.add(input.bytes(), rangeStart, rangeStop - rangeStart);

                    // consumed doc pointers
                    currentFragmentIndex += 2;
                    bytesPositionIndex += 2;

                }
                // followed by the metadata under the designated field
                if (readMetadata) {
                    if (jsonPointers.hasDoc()) {
                        doc.add(',');
                    }
                    doc.add('"');
                    doc.add(StringUtils.jsonEncoding(metadataField));
                    doc.add('"');
                    doc.add(':');
                    doc.add('{');

                    // consume metadata
                    for (; currentFragmentIndex < fragmentPos.length; currentFragmentIndex += 2) {
                        rangeStart = bytesPosition[bytesPositionIndex];
                        rangeStop = bytesPosition[bytesPositionIndex + 1];
                        // trim
                        //rangeStart = BytesUtils.trimLeft(input.bytes(), rangeStart, rangeStop);
                        //rangeStop = BytesUtils.trimRight(input.bytes(), rangeStart, rangeStop);

                        if (rangeStop < rangeStart) {
                            throw new IllegalArgumentException(String.format("Invalid position given=%s %s", rangeStart, rangeStop));
                        }

                        doc.add(input.bytes(), rangeStart, rangeStop - rangeStart);
                        bytesPositionIndex += 2;
                    }
                    doc.add('}');
                }
                doc.add('}');

                // replace JsonResult with assembled document
                result[1] = reader.wrapString(doc.toString());
                doc.reset();
            }
        }

        if (responseHits > 0) {
            return new Scroll(scrollId, totalHits, results, responseHits, skippedHits);
        } else {
            // Scroll had no hits in the response, so it must have concluded.
            return new Scroll(scrollId, totalHits, true);
        }
    }
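
For reference, the fragment re-assembly in the `returnRawJson` branch boils down to splicing half-open [start, stop) byte ranges of the original response back together. Below is a minimal, self-contained sketch of that technique; the class name, the `assemble` helper, the positions and the `_metadata` field name are all made up for illustration, and plain `byte[]`/`StringBuilder` stand in for `BytesArray`.

    import java.nio.charset.StandardCharsets;

    public class FragmentReassemblySketch {

        // Splice half-open [start, stop) byte ranges of the original response into a
        // standalone document: '{' + doc fragment + ',"<field>":{' + metadata fragment + '}}'
        static String assemble(byte[] input, int[] pos, String metadataField) {
            StringBuilder doc = new StringBuilder();
            doc.append('{');
            // pos[0]..pos[1] - the _source body (without its enclosing braces)
            doc.append(new String(input, pos[0], pos[1] - pos[0], StandardCharsets.UTF_8));
            doc.append(",\"").append(metadataField).append("\":{");
            // pos[2]..pos[3] - a metadata fragment, here the _id field
            doc.append(new String(input, pos[2], pos[3] - pos[2], StandardCharsets.UTF_8));
            return doc.append("}}").toString();
        }

        public static void main(String[] args) {
            // hypothetical hit; ASCII-only, so char and byte positions coincide
            String hit = "{\"_id\":\"1\",\"_source\":{\"foo\":\"bar\"}}";
            byte[] input = hit.getBytes(StandardCharsets.UTF_8);
            int sourceStart = hit.indexOf("\"foo\"");
            int[] pos = {
                    sourceStart, hit.indexOf('}', sourceStart), // _source body
                    hit.indexOf("\"_id\""), hit.indexOf(',')    // metadata body
            };
            // prints {"foo":"bar","_metadata":{"_id":"1"}}
            System.out.println(assemble(input, pos, "_metadata"));
        }
    }

The sketch mirrors the loop above: the document is never re-parsed, only copied range by range, which is why the read path records positions instead of materializing each hit.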