public Tuple exec(Tuple input)

in datafu-pig/src/main/java/datafu/pig/sampling/SimpleRandomSample.java [196:299]


    /**
     * Draws this task's share of a simple random sample from one bag of items.
     *
     * <p>Accepted input shapes:
     * <ul>
     *   <li>(bag) -- legacy form; the sampling probability must have been supplied
     *       through the constructor and be non-negative,</li>
     *   <li>(bag, probability),</li>
     *   <li>(bag, probability, lowerBound) -- a good lower bound of the population size,
     *       or the exact number.</li>
     * </ul>
     *
     * <p>The probability and the lower bound must be the same on every call to this
     * instance. On the first call the probability is passed to
     * {@code verifySamplingProbability}. The population bound actually used is the
     * larger of the supplied lower bound and the running count of items this instance
     * has seen so far.
     *
     * @param input the argument tuple in one of the shapes listed above
     * @return a tuple of (sampling probability, number of items in this input bag,
     *         population bound used, bag of selected items, bag of waitlisted items
     *         with their scores)
     * @throws IllegalArgumentException if the field count is wrong, the probability is
     *         missing in the legacy form, or the probability / lower bound differs from
     *         the value seen on an earlier call
     * @throws IOException propagated from the tuple accessors and helpers this method calls
     */
    public Tuple exec(Tuple input) throws IOException
    {
      final int numArgs = input.size();

      // Validate the argument count before touching any instance state.
      // The single-field case exists only for backward compatibility and should be
      // dropped once the constructor no longer takes a sampling probability.
      switch (numArgs)
      {
        case 1:
          if (_p < 0.0d)
          {
            throw new IllegalArgumentException("Sampling probability is not given.");
          }
          break;
        case 2:
        case 3:
          break;
        default:
          throw new IllegalArgumentException("The input tuple should have either two or three fields: "
              + "a bag of items, the sampling probability, "
              + "and optionally a good lower bound of the size of the population or the exact number.");
      }

      final DataBag bag = (DataBag) input.get(0);
      final long bagSize = bag.size();
      _localCount += bagSize;

      // Legacy calls reuse the constructor-supplied probability; otherwise it is field 1.
      final double p;
      if (numArgs == 1)
      {
        p = _p;
      }
      else
      {
        p = ((Number) input.get(1)).doubleValue();
      }

      // The probability is a scalar: remember it on the first call, reject changes later.
      if (!_first && p != _p)
      {
        throw new IllegalArgumentException("The sampling probability must be a scalar, but found two different values: "
            + _p + " and " + p + ".");
      }
      if (_first)
      {
        _p = p;
        verifySamplingProbability(p);
      }

      // The optional third field is also a scalar across calls.
      long bound = 0L;
      if (numArgs == 3)
      {
        bound = ((Number) input.get(2)).longValue();

        if (!_first && bound != _n1)
        {
          throw new IllegalArgumentException("The lower bound of the population size must be a scalar, but found two different values: "
              + _n1 + " and " + bound + ".");
        }
        if (_first)
        {
          _n1 = bound;
        }
      }

      _first = false;

      // A supplied bound below the number of items already seen here is not a valid
      // lower bound, so fall back to the running local count.
      final long populationBound = Math.max(bound, _localCount);

      final DataBag selected = _BAG_FACTORY.newDefaultBag();
      final DataBag waitlisted = _BAG_FACTORY.newDefaultBag();

      if (populationBound > 0L)
      {
        final double acceptBelow = getQ1(populationBound, p);
        final double waitlistBelow = getQ2(populationBound, p);

        // Each item gets one random score: below the first threshold it is accepted
        // outright; between the two thresholds it is kept, with its score, on a waitlist.
        for (Tuple item : bag)
        {
          final double score = nextDouble();
          if (score < acceptBelow)
          {
            selected.add(item);
          }
          else if (score < waitlistBelow)
          {
            waitlisted.add(new ScoredTuple(score, item).getIntermediateTuple(_TUPLE_FACTORY));
          }
        }
      }

      // Output layout: sampling probability (double), items in this bag (long),
      // population bound used (long), selected items (bag), waitlisted scored items (bag).
      final Tuple output = _TUPLE_FACTORY.newTuple();
      for (Object field : new Object[] {p, bagSize, populationBound, selected, waitlisted})
      {
        output.append(field);
      }

      return output;
    }