public Tuple exec()

in datafu-pig/src/main/java/datafu/pig/sampling/SimpleRandomSample.java [316:374]


    public Tuple exec(Tuple input) throws IOException
    {
      /*
       * Merges a bag of partial-result tuples (field 0 of the input) into a
       * single tuple with the same five-field layout:
       *
       *   0: p        - Double, taken from the first partial tuple only
       *   1: numItems - Long, summed over all partial tuples
       *   2: n1       - Long, combined with the running item count (see loop)
       *   3: selected - bag, concatenation of all partial "selected" bags
       *   4: waiting  - bag, partial "waiting" bags re-filtered against q1/q2
       *
       * Any exception raised while reading fields or casting them propagates
       * to the caller unchanged.
       */
      final DataBag partials = (DataBag) input.get(0);

      final DataBag accepted = _BAG_FACTORY.newDefaultBag();
      final DataBag pooledWaiting = _BAG_FACTORY.newDefaultBag();

      double p = 0.0d;
      boolean haveP = false;
      long processedCount = 0L; // number of items processed, including rejected
      long mergedN1 = 0L;

      for (Tuple partial : partials)
      {
        // Only the first partial tuple's p is used.
        if (!haveP)
        {
          p = (Double) partial.get(0);
          haveP = true;
        }

        processedCount = processedCount + (Long) partial.get(1);

        // Larger of this tuple's n1 and the item count accumulated so far.
        // NOTE(review): the value is overwritten on every iteration, so n1
        // values carried by earlier tuples are not retained - confirm that
        // this is intended.
        final long reportedN1 = (Long) partial.get(2);
        mergedN1 = (reportedN1 > processedCount) ? reportedN1 : processedCount;

        accepted.addAll((DataBag) partial.get(3));
        pooledWaiting.addAll((DataBag) partial.get(4));
      }

      final DataBag stillWaiting = _BAG_FACTORY.newDefaultBag();

      // With mergedN1 == 0 no thresholds are computed and every pooled
      // waiting tuple is left out of the output.
      if (mergedN1 > 0L)
      {
        final double q1 = getQ1(mergedN1, p);
        final double q2 = getQ2(mergedN1, p);

        for (Tuple candidate : pooledWaiting)
        {
          final ScoredTuple scored = ScoredTuple.fromIntermediateTuple(candidate);
          final double score = scored.getScore();

          if (score < q1)
          {
            // Below q1: move the underlying tuple into the selected bag.
            accepted.add(scored.getTuple());
          }
          else if (score < q2)
          {
            // Between q1 (inclusive) and q2 (exclusive): keep the
            // intermediate tuple waiting.
            stillWaiting.add(candidate);
          }
          // Anything else is dropped.
        }
      }

      final Tuple merged = _TUPLE_FACTORY.newTuple();

      merged.append(p);
      merged.append(processedCount);
      merged.append(mergedN1);
      merged.append(accepted);
      merged.append(stillWaiting);

      return merged;
    }