public DataBag exec()

in datafu-pig/src/main/java/datafu/pig/sampling/SimpleRandomSample.java [391:448]


    /**
     * Merges the partial sampling results carried by the input and produces the final sample.
     *
     * <p>Field 0 of {@code input} is read as a bag of tuples. From each of those tuples this
     * method reads:
     * <ul>
     *   <li>field 0 - the sampling probability {@code p} (taken from the first tuple only;
     *       presumably every tuple carries the same value - confirm against the producer),</li>
     *   <li>field 1 - an item count that is summed into the population size {@code n},</li>
     *   <li>field 3 - a bag of already selected items,</li>
     *   <li>field 4 - a bag of wait-listed scored items.</li>
     * </ul>
     * Field 2 is not read here.
     *
     * <p>The target sample size is {@code ceil(p * n)}. When fewer items were pre-selected,
     * the shortfall is filled from the wait list in the order imposed by
     * {@code ScoredTupleComparator}. Pre-selected items are never dropped, so the result can
     * exceed the target; if the wait list runs out, the result can fall short. Both situations
     * are reported on stderr, and a summary line is always written to stdout.
     *
     * @param input tuple whose first field is the bag of partial results
     * @return the bag holding the final sample
     * @throws IOException propagated from the tuple and bag accessors used here
     */
    public DataBag exec(Tuple input) throws IOException
    {
      DataBag bag = (DataBag) input.get(0);

      boolean first = true;
      double p = 0.0d; // the sampling probability
      long n = 0L; // the size of the population (total number of items)

      DataBag selected = _BAG_FACTORY.newDefaultBag();
      DataBag waiting = _BAG_FACTORY.newSortedBag(ScoredTupleComparator.getInstance());

      for (Tuple tuple : bag)
      {
        if (first)
        {
          // Only the first tuple's probability is used.
          p = (Double) tuple.get(0);
          first = false;
        }

        n += (Long) tuple.get(1);
        selected.addAll((DataBag) tuple.get(3));
        waiting.addAll((DataBag) tuple.get(4));
      }

      // Query each bag's size once and reuse it; neither bag is modified before the
      // values are consumed, and size() may be costly on a bag that has spilled.
      long numSelected = selected.size();
      long numWaiting = waiting.size();

      long s = (long) Math.ceil(p * n); // sample size

      System.out.println("To sample " + s + " items from " + n + ", we pre-selected "
          + numSelected + ", and waitlisted " + numWaiting + ".");

      long numNeeded = s - numSelected;

      if (numNeeded < 0)
      {
        System.err.println("Pre-selected " + numSelected + " items, but only needed " + s
            + ".");
      }

      // Touch the wait list only when something is actually missing, so that no iterator
      // is opened on the sorted bag otherwise.
      if (numNeeded > 0)
      {
        for (Tuple scored : waiting)
        {
          selected.add(ScoredTuple.fromIntermediateTuple(scored).getTuple());
          numNeeded--;
          if (numNeeded == 0)
          {
            // Stop before the iterator fetches an element that would not be used.
            break;
          }
        }
      }

      // Still positive only if the wait list was exhausted before the target was reached;
      // numNeeded now holds the remaining shortfall.
      if (numNeeded > 0)
      {
        System.err.println("The waiting list only has " + numWaiting
            + " items, but needed " + numNeeded + " more.");
      }

      return selected;
    }