in datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/PartitioningReducer.java [66:112]
public void reduce(Object keyObj,
                   Iterable<Object> values,
                   ReduceContext<Object,Object,Object,Object> context) throws IOException, InterruptedException
{
  /*
   * Folds every value received for one key into the configured accumulator and,
   * if a non-null result is produced, writes a single {key, value} record to the
   * named output selected by the key's "time" field.
   *
   * keyObj  - an AvroKey wrapping a record that carries a "time" (Long) field and
   *           a "value" (GenericRecord) field; the latter becomes the output key.
   * values  - AvroValue-wrapped records to accumulate.
   * context - not used directly by this method.
   *
   * Throws RuntimeException when no accumulator or no multiple-outputs writer is set.
   */
  final Accumulator<GenericRecord,GenericRecord> accumulator = getAccumulator();
  if (accumulator == null)
  {
    throw new RuntimeException("No accumulator set for reducer!");
  }

  // Reset before use; presumably this discards state left over from an earlier key.
  accumulator.cleanup();

  // The incoming key is a composite: unpack the time and the real key.
  final GenericRecord compositeKey = ((AvroKey<GenericRecord>)keyObj).datum();
  final Long timestamp = (Long)compositeKey.get("time");
  final GenericRecord recordKey = (GenericRecord)compositeKey.get("value");

  boolean sawInput = false;
  for (Object wrapped : values)
  {
    accumulator.accumulate(((AvroValue<GenericRecord>)wrapped).datum());
    sawInput = true;
  }

  // Nothing was accumulated, so there is nothing to emit for this key.
  if (!sawInput)
  {
    return;
  }

  final GenericRecord aggregate = accumulator.getFinal();
  if (aggregate == null)
  {
    // The accumulator chose not to produce a result.
    return;
  }

  final GenericRecord result = new GenericData.Record(getSchemas().getReduceOutputSchema());
  result.put("key", recordKey);
  result.put("value", aggregate);

  // write output in directories corresponding to each day
  final String target = getNamedOutput(timestamp);
  if (_multipleOutputs == null)
  {
    throw new RuntimeException("No multiple outputs set");
  }

  // The whole record travels in the Avro key slot; the value slot is left null.
  _multipleOutputs.write(target, new AvroKey<GenericRecord>(result), (AvroValue<GenericRecord>)null);
}