public int run()

in hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/Folder.java [220:499]


  /**
   * Reads the input job trace, folds it, and passes the resulting jobs to
   * {@code maybeOutput}.
   *
   * <p>The work is done in these steps:
   * <ol>
   * <li>Read the first job from {@code reader}; when {@code startsAfter} is
   * positive, skip jobs until one is submitted at least {@code startsAfter}
   * after the first job.</li>
   * <li>Copy the remaining jobs into temporary segment files under
   * {@code tempDir}, starting a new segment each time a job's submit time
   * reaches the end of the current {@code inputCycle}-long interval.</li>
   * <li>Derive the folding ratio and the transcription rate from the trace
   * span, the number of jobs, {@code inputCycle}, {@code timeDilation} and
   * {@code concentration}.</li>
   * <li>Open every segment, adjust each job's times with
   * {@code adjustJobTimes}, and merge the segments in order of adjusted
   * submit time, handing each job to {@code maybeOutput}.</li>
   * <li>Close all readers and the output generator and, unless
   * {@code debug} is set, delete the temporary segment files.</li>
   * </ol>
   *
   * @return {@code 0} on success; {@code EMPTY_JOB_TRACE} when there is no
   *         job to process (initially, or after applying
   *         {@code startsAfter}); {@code OUT_OF_ORDER_JOBS} when the reader
   *         reports an out-of-order job; {@code ALL_JOBS_SIMULTANEOUS} when
   *         the last job read is not submitted later than the first
   * @throws IOException if reading, writing or closing a trace fails
   */
  public int run() throws IOException {
    // Orders heap entries by the submit time of the job they carry.
    class JobEntryComparator implements
        Comparator<Pair<LoggedJob, JobTraceReader>> {
      @Override
      public int compare(Pair<LoggedJob, JobTraceReader> p1,
          Pair<LoggedJob, JobTraceReader> p2) {
        return Long.compare(p1.first().getSubmitTime(),
            p2.first().getSubmitTime());
      }
    }

    // we initialize an empty heap so if we take an error before establishing
    // a real one the finally code goes through
    Queue<Pair<LoggedJob, JobTraceReader>> heap =
        new PriorityQueue<Pair<LoggedJob, JobTraceReader>>();

    try {
      LoggedJob job = reader.nextJob();

      if (job == null) {
        LOG.error("The job trace is empty");

        return EMPTY_JOB_TRACE;
      }

      // If starts-after time is specified, skip the number of jobs till we reach
      // the starting time limit.
      if (startsAfter > 0) {
        LOG.info("starts-after time is specified. Initial job submit time : "
                 + job.getSubmitTime());

        long approximateTime = job.getSubmitTime() + startsAfter;
        job = reader.nextJob();
        // The first job has just been discarded unconditionally, so it is
        // already counted as skipped.
        long skippedCount = 1;
        while (job != null && job.getSubmitTime() < approximateTime) {
          job = reader.nextJob();
          skippedCount++;
        }

        LOG.debug("Considering jobs with submit time greater than "
                  + startsAfter + " ms. Skipped " + skippedCount + " jobs.");

        if (job == null) {
          LOG.error("No more jobs to process in the trace with 'starts-after'"+
                    " set to " + startsAfter + "ms.");
          return EMPTY_JOB_TRACE;
        }
        LOG.info("The first job has a submit time of " + job.getSubmitTime());
      }

      firstJobSubmitTime = job.getSubmitTime();
      long lastJobSubmitTime = firstJobSubmitTime;

      int numberJobs = 0;

      // Long.MIN_VALUE guarantees the very first job opens a segment.
      long currentIntervalEnd = Long.MIN_VALUE;

      Path nextSegment = null;
      Outputter<LoggedJob> tempGen = null;

      if (debug) {
        LOG.debug("The first job has a submit time of " + firstJobSubmitTime);
      }

      final Configuration conf = getConf();

      // One generator for all segment names; there is no need for a fresh
      // Random per job.
      final Random tempNameGenerator = new Random();

      try {
        // At the top of this loop, skewBuffer has at most
        // skewBufferLength entries.
        while (job != null) {
          lastJobSubmitTime = job.getSubmitTime();

          ++numberJobs;

          if (job.getSubmitTime() >= currentIntervalEnd) {
            if (tempGen != null) {
              tempGen.close();
              // Forget the closed outputter so the finally block below does
              // not close it a second time if opening the next segment fails.
              tempGen = null;
            }

            nextSegment = chooseUnusedSegmentPath(tempNameGenerator, conf);

            if (debug) {
              LOG.debug("Creating " + nextSegment
                  + " for a job with a submit time of " + job.getSubmitTime());
            }

            deletees.add(nextSegment);

            tempPaths.add(nextSegment);

            tempGen = new DefaultOutputter<LoggedJob>();
            tempGen.init(nextSegment, conf);

            long currentIntervalNumber =
                (job.getSubmitTime() - firstJobSubmitTime) / inputCycle;

            currentIntervalEnd =
                firstJobSubmitTime + ((currentIntervalNumber + 1) * inputCycle);
          }

          // the temp files contain UDadjusted times, but each temp file's
          // content is in the same input cycle interval.
          if (tempGen != null) {
            tempGen.output(job);
          }

          job = reader.nextJob();
        }
      } catch (DeskewedJobTraceReader.OutOfOrderException e) {
        return OUT_OF_ORDER_JOBS;
      } finally {
        if (tempGen != null) {
          tempGen.close();
        }
      }

      if (lastJobSubmitTime <= firstJobSubmitTime) {
        LOG.error("All of your job[s] have the same submit time."
            + "  Please just use your input file.");

        return ALL_JOBS_SIMULTANEOUS;
      }

      double submitTimeSpan = lastJobSubmitTime - firstJobSubmitTime;

      LOG.warn("Your input trace spans "
          + (lastJobSubmitTime - firstJobSubmitTime) + " ticks.");

      double foldingRatio =
          submitTimeSpan * (numberJobs + 1) / numberJobs / inputCycle;

      if (debug) {
        LOG.warn("run: submitTimeSpan = " + submitTimeSpan + ", numberJobs = "
            + numberJobs + ", inputCycle = " + inputCycle);
      }

      if (reader.neededSkewBufferSize() > 0) {
        LOG.warn("You needed a -skew-buffer-length of "
            + reader.neededSkewBufferSize() + " but no more, for this input.");
      }

      double tProbability = timeDilation * concentration / foldingRatio;

      if (debug) {
        LOG.warn("run: timeDilation = " + timeDilation + ", concentration = "
            + concentration + ", foldingRatio = " + foldingRatio);
        LOG.warn("The transcription probability is " + tProbability);
      }

      // Split the probability into its whole and fractional parts.
      transcriptionRateInteger = (int) Math.floor(tProbability);
      transcriptionRateFraction = tProbability - Math.floor(tProbability);

      // Now read all the inputs in parallel
      // (PriorityQueue rejects an initial capacity below 1.)
      heap =
          new PriorityQueue<Pair<LoggedJob, JobTraceReader>>(
              Math.max(1, tempPaths.size()), new JobEntryComparator());

      for (Path tempPath : tempPaths) {
        JobTraceReader thisReader = new JobTraceReader(tempPath, conf);

        // Registered immediately so the finally block closes it whatever
        // happens next.
        closees.add(thisReader);

        LoggedJob streamFirstJob = thisReader.getNext();

        if (streamFirstJob == null) {
          // Nothing to merge from this segment; its reader is closed with
          // the other closees.
          continue;
        }

        long thisIndex =
            (streamFirstJob.getSubmitTime() - firstJobSubmitTime) / inputCycle;

        if (debug) {
          LOG.debug("A job with submit time of "
              + streamFirstJob.getSubmitTime() + " is in interval # "
              + thisIndex);
        }

        adjustJobTimes(streamFirstJob);

        if (debug) {
          LOG.debug("That job's submit time is adjusted to "
              + streamFirstJob.getSubmitTime());
        }

        heap
            .add(new Pair<LoggedJob, JobTraceReader>(streamFirstJob, thisReader));
      }

      // Merge: repeatedly take the entry with the smallest adjusted submit
      // time and refill the heap from the segment that entry came from.
      Pair<LoggedJob, JobTraceReader> next = heap.poll();

      while (next != null) {
        maybeOutput(next.first());

        if (debug) {
          LOG.debug("The most recent job has an adjusted submit time of "
              + next.first().getSubmitTime());
          LOG.debug(" Its replacement in the heap will come from input engine "
              + next.second());
        }

        LoggedJob replacement = next.second().getNext();

        if (replacement == null) {
          next.second().close();

          if (debug) {
            LOG.debug("That input engine is depleted.");
          }
        } else {
          adjustJobTimes(replacement);

          if (debug) {
            LOG.debug("The replacement has an adjusted submit time of "
                + replacement.getSubmitTime());
          }

          heap.add(new Pair<LoggedJob, JobTraceReader>(replacement, next
              .second()));
        }

        next = heap.poll();
      }
    } finally {
      // Every cleanup step is attempted even when an earlier one fails. The
      // first close failure is remembered and rethrown once cleanup is done.
      IOException cleanupFailure = null;

      IOUtils.cleanupWithLogger(null, reader);

      if (outGen != null) {
        try {
          outGen.close();
        } catch (IOException e) {
          cleanupFailure = e;
        }
      }

      for (Pair<LoggedJob, JobTraceReader> heapEntry : heap) {
        try {
          heapEntry.second().close();
        } catch (IOException e) {
          if (cleanupFailure == null) {
            cleanupFailure = e;
          }
        }
      }

      for (Closeable closee : closees) {
        try {
          closee.close();
        } catch (IOException e) {
          if (cleanupFailure == null) {
            cleanupFailure = e;
          }
        }
      }

      // In debug mode the temporary segments are kept.
      if (!debug) {
        Configuration cleanupConf = getConf();

        for (Path deletee : deletees) {
          try {
            FileSystem fs = deletee.getFileSystem(cleanupConf);

            fs.delete(deletee, false);
          } catch (IOException e) {
            // Deletion is best effort: report it and carry on with the rest.
            LOG.warn("Could not delete temporary file " + deletee, e);
          }
        }
      }

      if (cleanupFailure != null) {
        throw cleanupFailure;
      }
    }

    return 0;
  }

  /**
   * Picks a randomly named segment path under {@code tempDir} that does not
   * exist yet, making at most three attempts.
   *
   * <p>A candidate is accepted only after its existence check. A failing
   * existence check is treated as "does not exist", as the code did before.
   *
   * @param nameGenerator source of the random part of the file name
   * @param conf configuration used to resolve the path's file system
   * @return a segment path that was not found to exist
   * @throws IOException if the file system could not be obtained on any
   *         attempt; the last such failure is rethrown
   * @throws RuntimeException if all attempts produced a name that exists
   */
  private Path chooseUnusedSegmentPath(Random nameGenerator,
      Configuration conf) throws IOException {
    IOException lastFailure = null;

    for (int attempt = 0; attempt < 3; ++attempt) {
      Path candidate =
          new Path(tempDir, "segment-" + nameGenerator.nextLong()
              + ".json.gz");

      if (debug) {
        LOG.debug("The next segment name is " + candidate);
      }

      FileSystem fs;

      try {
        fs = candidate.getFileSystem(conf);
      } catch (IOException e) {
        // directory bad -- remember why and try again
        lastFailure = e;
        continue;
      }

      try {
        if (!fs.exists(candidate)) {
          return candidate;
        }
        // the name is taken -- draw another one
      } catch (IOException e) {
        // the existence check failed; treat the file as not existing
        return candidate;
      }
    }

    if (lastFailure != null) {
      throw lastFailure;
    }

    throw new RuntimeException("Failed to create a new file!");
  }