in datafu-pig/src/main/java/datafu/pig/sampling/WeightedSample.java [81:152]
/**
 * Draws a weighted sample, without replacement, from a bag of tuples.
 *
 * <p>Fields read from {@code input}:
 * <ul>
 *   <li>index 0 - the {@code DataBag} of candidate tuples;</li>
 *   <li>index 1 - a number giving the position, inside each candidate tuple, of that
 *       tuple's numeric score (weight);</li>
 *   <li>index 2 - read only when {@code input} has exactly three fields: a number
 *       limiting how many tuples are returned. It is converted to {@code int} and capped
 *       at the bag size. A null value is treated as "no limit".</li>
 * </ul>
 *
 * <p>Scores smaller than {@link Double#MIN_NORMAL} (zero and negatives included) are
 * raised to {@code Double.MIN_NORMAL} before sampling.
 *
 * @param input the argument tuple laid out as described above
 * @return an empty bag when the input bag is null or empty, or when the limit is zero or
 *         negative; the input bag itself when it holds exactly one tuple; otherwise a
 *         new bag holding the drawn tuples in the order they were drawn
 * @throws IOException propagated from the tuple accessors called here
 */
public DataBag exec(Tuple input) throws IOException {
  DataBag output = bagFactory.newDefaultBag();
  DataBag samples = (DataBag) input.get(0);
  if (samples == null || samples.size() == 0) {
    return output; // if we are given null we will return an empty bag
  }
  int numSamples = (int) samples.size();

  // Resolve the limit before any shortcut so that every bag size honors it.
  // Accept any type of number for the sample size, but convert to int.
  int limitSamples = numSamples;
  if (input.size() == 3) {
    Object limitArg = input.get(2);
    if (limitArg != null) {
      limitSamples = Math.min(((Number) limitArg).intValue(), numSamples);
    }
  }
  if (limitSamples <= 0) {
    return output; // nothing was requested, regardless of how many candidates exist
  }
  if (numSamples == 1) {
    return samples; // a single candidate with a positive limit: nothing to choose between
  }

  // Copy the bag into parallel arrays in a single pass: tuples[i] is the candidate and
  // scores[i] is its clamped weight.
  int scoreIndex = ((Number) input.get(1)).intValue();
  Tuple[] tuples = new Tuple[numSamples];
  double[] scores = new double[numSamples];
  int tupleIndex = 0;
  for (Tuple tuple : samples) {
    tuples[tupleIndex] = tuple;
    double score = ((Number) tuple.get(scoreIndex)).doubleValue();
    scores[tupleIndex] = Math.max(score, Double.MIN_NORMAL); // negative scores cause problems
    tupleIndex++;
  }

  /*
   * Here's how the algorithm works:
   *
   * 1. Create a cumulative distribution of the scores
   * 2. Draw a random number
   * 3. Find the interval in which the drawn number falls
   * 4. Select the element encompassing that interval
   * 5. Remove the selected element from consideration
   * 6. Repeat 1-5 k times
   *
   * Rather than physically removing the element (#5), which is expensive for an array,
   * the selected slot is overwritten with the element currently at position k, and the
   * next iteration passes k + 1 to find_cumsum_interval (presumably the start of the
   * range still in play - confirm against that helper). The tuples array is updated in
   * lock-step with the scores array so the two stay aligned.
   *
   * The loop runs limitSamples times; the original author describes the whole procedure
   * as O(k*n) for k samples over n scores, which assumes find_cumsum_interval is a
   * linear scan.
   */
  Random rng = (seed == null) ? new Random() : new Random(seed);

  for (int k = 0; k < limitSamples; k++) {
    double val = rng.nextDouble();
    int idx = find_cumsum_interval(scores, val, k, numSamples);
    if (idx == numSamples) {
      // The helper returned the end sentinel instead of a slot; fall back to a uniform
      // pick among the positions k..numSamples-1.
      idx = rng.nextInt(numSamples - k) + k;
    }
    output.add(tuples[idx]);

    // Move the element at position k into the chosen slot so it stays selectable.
    scores[idx] = scores[k];
    tuples[idx] = tuples[k];
  }
  return output;
}