src/org/apache/pig/data/SortedDataBag.java [407:509]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
                    mMergeQ.add(c);
                } catch (EOFException eof) {
                    // Out of tuples in this file.  Set our slot in the
                    // array to null so we don't keep trying to read from
                    // this file.
                    try {
                        in.close();
                    } catch (IOException e) {
                        log.warn("Failed to close spill file.", e);
                    }
                    mStreams.set(fileNum, null);
                } catch (IOException ioe) {
                    String msg = "Unable to find our spill file.";
                    log.fatal(msg, ioe);
                    throw new RuntimeException(msg, ioe);
                }

            }
        }

        // Function assumes that the reader lock is already held before we enter
        // this function.
        private Tuple readFromMemory() {
            if (mContents.size() == 0) return null;

            if (mMemoryPtr < mContents.size()) {
                return ((ArrayList<Tuple>)mContents).get(mMemoryPtr++);
            } else {
                return null;
            }
        }

        /**
         * Pre-merge if there are too many spill files.  This avoids the issue
         * of having too large a fan out in our merge.  Experimentation by
         * the Hadoop team has shown that 100 is about the optimal number
         * of spill files.  This function modifies the mSpillFiles array
         * and assumes the write lock is already held. It will not unlock it.
         *
         * Tuples are deserialized from the spill files, compared according
         * to the sort spec, and then reserialized into a new spill file.
         * This is expensive, but I need to do it in order to apply the
         * sort spec that was provided to me.
         */
        private void preMerge() {
            if (mSpillFiles == null ||
                    mSpillFiles.size() <= MAX_SPILL_FILES) {
                return;
            }

            // While there are more than MAX_SPILL_FILES spill files, gather
            // MAX_SPILL_FILES of them together and merge them into one
            // file, then remove them from mSpillFiles.  Each new merged
            // file is attached at the end of the list, so I can just keep
            // going until I get a small enough number without too much
            // concern over uneven size merges.  Convert mSpillFiles to a
            // linked list since we'll be removing pieces from the middle
            // and we want to do it efficiently.
            try {
                LinkedList<File> ll = new LinkedList<File>(mSpillFiles);
                LinkedList<File> filesToDelete = new LinkedList<File>();
                while (ll.size() > MAX_SPILL_FILES) {
                    ListIterator<File> i = ll.listIterator();
                    mStreams =
                        new ArrayList<DataInputStream>(MAX_SPILL_FILES);
                    mMergeQ = new PriorityQueue<PQContainer>(MAX_SPILL_FILES);

                    for (int j = 0; j < MAX_SPILL_FILES; j++) {
                        try {
                            File f = i.next();
                            DataInputStream in =
                                new DataInputStream(new BufferedInputStream(
                                    new FileInputStream(f)));
                            mStreams.add(in);
                            addToQueue(null, mStreams.size() - 1);
                            i.remove();
                            filesToDelete.add(f);
                        } catch (FileNotFoundException fnfe) {
                            // We can't find our own spill file?  That should
                            // never happen.
                            String msg = "Unable to find our spill file.";
                            log.fatal(msg, fnfe);
                            throw new RuntimeException(msg, fnfe);
                        }
                    }

                    // Get a new spill file.  This adds one to the end of
                    // the spill files list.  So I need to append it to my
                    // linked list as well so that it's still there when I
                    // move my linked list back to the spill files.
                    DataOutputStream out = null;
                    try {
                        out = getSpillFile();
                        ll.add(mSpillFiles.get(mSpillFiles.size() - 1));
                        Tuple t;
                        while ((t = readFromPriorityQ()) != null) {
                            t.write(out);
                        }
                        out.flush();
                    } catch (IOException ioe) {
                        String msg = "Unable to find our spill file.";
                        log.fatal(msg, ioe);
                        throw new RuntimeException(msg, ioe);
                    } finally {
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
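
The snippet above implements a bounded fan-in, k-way merge: one element per
input stream sits in a priority queue, and each poll is followed by a refill
from the stream the polled element came from. Below is a minimal,
self-contained sketch of that pattern; the names (KWayMergeSketch,
MergeEntry, mergeSorted) and the use of Integer in place of Tuple are
illustrative assumptions, not Pig's actual API.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class KWayMergeSketch {

    // Stand-in for PQContainer: pairs an element with the index of the
    // input it came from, ordered by the element itself.
    private static class MergeEntry implements Comparable<MergeEntry> {
        final int index;
        final Integer value;    // Integer stands in for Tuple here

        MergeEntry(int index, Integer value) {
            this.index = index;
            this.value = value;
        }

        @Override
        public int compareTo(MergeEntry other) {
            return value.compareTo(other.value);
        }
    }

    // Merge any number of sorted inputs while holding only one element
    // per input in memory, the same shape as the addToQueue() /
    // readFromPriorityQ() loop in the snippet above.
    static List<Integer> mergeSorted(List<Iterator<Integer>> inputs) {
        PriorityQueue<MergeEntry> q = new PriorityQueue<MergeEntry>();

        // Prime the queue with the head of every input (the initial
        // addToQueue() calls).
        for (int i = 0; i < inputs.size(); i++) {
            if (inputs.get(i).hasNext()) {
                q.add(new MergeEntry(i, inputs.get(i).next()));
            }
        }

        List<Integer> out = new ArrayList<Integer>();
        while (!q.isEmpty()) {
            MergeEntry e = q.poll();
            out.add(e.value);
            // Refill from the input the winner came from, as the code
            // above re-reads from mStreams.get(fileNum).
            if (inputs.get(e.index).hasNext()) {
                q.add(new MergeEntry(e.index, inputs.get(e.index).next()));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Iterator<Integer>> inputs = new ArrayList<Iterator<Integer>>();
        inputs.add(Arrays.asList(1, 4, 7).iterator());
        inputs.add(Arrays.asList(2, 5, 8).iterator());
        inputs.add(Arrays.asList(3, 6, 9).iterator());
        System.out.println(mergeSorted(inputs)); // [1, 2, 3, ..., 9]
    }
}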



src/org/apache/pig/data/InternalSortedBag.java [315:418]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
                    mMergeQ.add(c);
                } catch (EOFException eof) {
                    // Out of tuples in this file.  Set our slot in the
                    // array to null so we don't keep trying to read from
                    // this file.
                    try {
                        in.close();
                    } catch (IOException e) {
                        log.warn("Failed to close spill file.", e);
                    }
                    mStreams.set(fileNum, null);
                } catch (IOException ioe) {
                    String msg = "Unable to find our spill file.";
                    log.fatal(msg, ioe);
                    throw new RuntimeException(msg, ioe);
                }

            }
        }

        // Function assumes that the reader lock is already held before we enter
        // this function.
        private Tuple readFromMemory() {
            if (mContents.size() == 0) return null;

            if (mMemoryPtr < mContents.size()) {
                return ((ArrayList<Tuple>)mContents).get(mMemoryPtr++);
            } else {
                return null;
            }
        }

        /**
         * Pre-merge if there are too many spill files.  This avoids the issue
         * of having too large a fan out in our merge.  Experimentation by
         * the Hadoop team has shown that 100 is about the optimal number
         * of spill files.  This function modifies the mSpillFiles array
         * and assumes the write lock is already held. It will not unlock it.
         *
         * Tuples are deserialized from the spill files, compared according
         * to the sort spec, and then reserialized into a new spill file.
         * This is expensive, but I need to do it in order to apply the
         * sort spec that was provided to me.
         */
        private void preMerge() {
            if (mSpillFiles == null ||
                    mSpillFiles.size() <= MAX_SPILL_FILES) {
                return;
            }

            // While there are more than MAX_SPILL_FILES spill files, gather
            // MAX_SPILL_FILES of them together and merge them into one
            // file, then remove them from mSpillFiles.  Each new merged
            // file is attached at the end of the list, so I can just keep
            // going until I get a small enough number without too much
            // concern over uneven size merges.  Convert mSpillFiles to a
            // linked list since we'll be removing pieces from the middle
            // and we want to do it efficiently.
            try {
                LinkedList<File> ll = new LinkedList<File>(mSpillFiles);
                LinkedList<File> filesToDelete = new LinkedList<File>();
                while (ll.size() > MAX_SPILL_FILES) {
                    ListIterator<File> i = ll.listIterator();
                    mStreams =
                        new ArrayList<DataInputStream>(MAX_SPILL_FILES);
                    mMergeQ = new PriorityQueue<PQContainer>(MAX_SPILL_FILES);

                    for (int j = 0; j < MAX_SPILL_FILES; j++) {
                        try {
                            File f = i.next();
                            DataInputStream in =
                                new DataInputStream(new BufferedInputStream(
                                    new FileInputStream(f)));
                            mStreams.add(in);
                            addToQueue(null, mStreams.size() - 1);
                            i.remove();
                            filesToDelete.add(f);
                        } catch (FileNotFoundException fnfe) {
                            // We can't find our own spill file?  That should
                            // never happen.
                            String msg = "Unable to find our spill file.";
                            log.fatal(msg, fnfe);
                            throw new RuntimeException(msg, fnfe);
                        }
                    }

                    // Get a new spill file.  This adds one to the end of
                    // the spill files list.  So I need to append it to my
                    // linked list as well so that it's still there when I
                    // move my linked list back to the spill files.
                    DataOutputStream out = null;
                    try {
                        out = getSpillFile();
                        ll.add(mSpillFiles.get(mSpillFiles.size() - 1));
                        Tuple t;
                        while ((t = readFromPriorityQ()) != null) {
                            t.write(out);
                        }
                        out.flush();
                    } catch (IOException ioe) {
                        String msg = "Unable to find our spill file.";
                        log.fatal(msg, ioe);
                        throw new RuntimeException(msg, ioe);
                    } finally {
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
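
The merge logic above is duplicated verbatim across both files. One detail
worth noting in both copies: each pass of the while (ll.size() >
MAX_SPILL_FILES) loop consumes MAX_SPILL_FILES files and appends a single
merged file, so the list shrinks by MAX_SPILL_FILES - 1 per pass. A quick
sketch of that arithmetic, using a hypothetical helper preMergePasses that
is not part of either class:

// Hypothetical helper: counts how many pre-merge passes are needed to
// get at or below maxSpillFiles, given that each pass replaces
// maxSpillFiles inputs with one merged output (net change: -(max - 1)).
// Assumes maxSpillFiles > 1 (Pig uses 100), so the loop terminates.
static int preMergePasses(int spillFiles, int maxSpillFiles) {
    int passes = 0;
    while (spillFiles > maxSpillFiles) {
        spillFiles = spillFiles - maxSpillFiles + 1;
        passes++;
    }
    return passes;
}

// For example, with maxSpillFiles = 100:
//   preMergePasses(250, 100) == 2   (250 -> 151 -> 52)
//   preMergePasses(101, 100) == 1   (101 -> 2)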



