public DataBag exec()

in datafu-pig/src/main/java/datafu/pig/sampling/WeightedSample.java [81:152]


  /**
   * Draws a weighted random sample, without replacement, from a bag of tuples.
   *
   * <p>Fields read from {@code input}:
   * <ol>
   *   <li>field 0 - the bag of tuples to sample from;</li>
   *   <li>field 1 - a number giving the position of the score inside each tuple;</li>
   *   <li>field 2 (only when {@code input} has exactly three fields) - a number capping
   *       how many tuples are returned; it is converted to {@code int}.</li>
   * </ol>
   *
   * <p>Every score is clamped to at least {@link Double#MIN_NORMAL}, so zero and negative
   * scores are treated as a tiny positive weight. When the {@code seed} field is non-null
   * it seeds the random number generator; otherwise a default-seeded generator is used.
   *
   * @param input the argument tuple described above
   * @return an empty bag when the input bag is null or empty, or when a one-tuple bag is
   *         combined with a numeric limit below one; the input bag itself when it holds a
   *         single tuple; otherwise a new bag holding the sampled tuples in draw order
   * @throws IOException propagated from the tuple accessors (presumably when a field
   *         cannot be read - confirm against the Tuple contract)
   */
  public DataBag exec(Tuple input) throws IOException {
    DataBag output = bagFactory.newDefaultBag();

    DataBag samples = (DataBag) input.get(0);
    if (samples == null || samples.size() == 0) {
      return output; // if we are given null we will return an empty bag
    }

    int numSamples = (int) samples.size();
    if (numSamples == 1) {
      // A one-tuple bag is its own sample, unless the caller explicitly asked for fewer
      // than one tuple. Larger bags already yield an empty result in that case, so the
      // shortcut must not hand back the tuple either. A null or non-numeric limit is
      // deliberately ignored here, exactly as the shortcut always has.
      Object requestedLimit = (input.size() == 3) ? input.get(2) : null;
      if (requestedLimit instanceof Number && ((Number) requestedLimit).intValue() < 1) {
        return output;
      }
      return samples;
    }

    // Copy the bag into parallel arrays in a single pass, so each score is guaranteed to
    // sit at the same index as the tuple it was read from and the bag is only read once.
    int scoreIndex = ((Number) input.get(1)).intValue();
    Tuple[] tuples = new Tuple[numSamples];
    double[] scores = new double[numSamples];
    int tupleIndex = 0;
    for (Tuple tuple : samples) {
      double score = ((Number) tuple.get(scoreIndex)).doubleValue();
      tuples[tupleIndex] = tuple;
      scores[tupleIndex] = Math.max(score, Double.MIN_NORMAL); // negative scores cause problems
      tupleIndex++;
    }

    // accept any type of number for sample size, but convert to int
    int limitSamples = numSamples;
    if (input.size() == 3) {
      // sample limit included
      limitSamples = Math.min(((Number) input.get(2)).intValue(), numSamples);
    }

    /*
     * Here's how the algorithm works:
     *
     * 1. Create a cumulative distribution of the scores
     * 2. Draw a random number
     * 3. Find the interval in which the drawn number falls into
     * 4. Select the element encompassing that interval
     * 5. Remove the selected element from consideration
     * 6. Repeat 1-5 k times
     *
     * However, rather than removing the element (#5), which is expensive for an array,
     * this function overwrites the selected slot with the element currently at the front
     * of the active range (index k) and then lets the next iteration start at k + 1. The
     * positions of elements therefore change as sampling proceeds.
     *
     * This is an O(k*n) algorithm, where k is the number of elements to sample and n is
     * the number of scores.
     */
    Random rng = (seed == null) ? new Random() : new Random(seed);

    for (int k = 0; k < limitSamples; k++) {
      double val = rng.nextDouble();
      int idx = find_cumsum_interval(scores, val, k, numSamples);
      if (idx == numSamples) {
        // NOTE(review): numSamples looks like a "no interval found" sentinel from
        // find_cumsum_interval; fall back to a uniform pick among the remaining slots.
        idx = rng.nextInt(numSamples - k) + k;
      }

      output.add(tuples[idx]);

      // Move the front element of the active range into the vacated slot; slot k itself
      // is never visited again because the next iteration starts at k + 1.
      scores[idx] = scores[k];
      tuples[idx] = tuples[k];
    }

    return output;
  }