in datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/CollapsingReducer.java [64:210]
/**
 * Collapses every value received for a single key into at most one output record.
 *
 * <p>Each value is dispatched on the full name of its Avro schema:
 * <ul>
 *   <li>intermediate value schema: accumulated into the new accumulator;</li>
 *   <li>dated intermediate value schema (only legal when reusing previous output): the record's
 *       {@code "time"} field selects the target. A time within {@code [_beginTime, _endTime]}
 *       sends its {@code "value"} to the new accumulator. A time before {@code _beginTime} sends
 *       it to the old accumulator. A time after {@code _endTime} is an error;</li>
 *   <li>output value schema (only legal when reusing previous output): a deep copy is kept as the
 *       previous output. If several such values arrive, the last one wins.</li>
 * </ul>
 *
 * <p>The final value is assembled in one of two ways:
 * <ul>
 *   <li>Without a previous output, it is the new accumulator's result. If any old data was seen,
 *       that result is then merged with the old accumulator's result via {@code _oldMerger}. The
 *       first argument to that merge may be {@code null} when no new data was accumulated.</li>
 *   <li>With a previous output, that record is merged first with the old result via
 *       {@code _oldMerger} (if any), then with the new result via {@code _merger} (if any).</li>
 * </ul>
 * If the final value is non-null, a record of the reduce output schema with fields {@code "key"}
 * and {@code "value"} is written as the output key, with a {@code null} output value. Otherwise
 * nothing is written.
 *
 * @param keyObj the key; expected to be an {@code AvroKey} wrapping a {@code GenericRecord}
 * @param values the values for the key; each expected to be an {@code AvroValue} wrapping a
 *     {@code GenericRecord}
 * @param context the reduce context the output record is written to
 * @throws IOException if writing to {@code context} fails
 * @throws InterruptedException if writing to {@code context} is interrupted
 * @throws RuntimeException if no accumulator or a required merger is configured, a value has an
 *     unexpected schema, a dated value lacks its time or value, or a dated value's time is
 *     after {@code _endTime}
 */
public void reduce(Object keyObj,
                   Iterable<Object> values,
                   ReduceContext<Object,Object,Object,Object> context) throws IOException, InterruptedException
{
  if (_newAccumulator == null)
  {
    throw new RuntimeException("No reducer set");
  }
  GenericRecord key = ((AvroKey<GenericRecord>)keyObj).datum();

  // Receives undated intermediate values, plus dated values whose time lies in [_beginTime, _endTime].
  Accumulator<GenericRecord,GenericRecord> acc = getNewAccumulator();
  acc.cleanup();
  long accumulatedCount = 0;

  // Receives dated values older than _beginTime; only obtained when previous output is reused.
  // NOTE(review): the dated branch below guards on _reusePreviousOutput, while this guard uses
  // getReuseOutput(). These are presumably equivalent; if they ever diverge, accOld could be null there.
  Accumulator<GenericRecord,GenericRecord> accOld = null;
  long oldAccumulatedCount = 0;
  if (getReuseOutput())
  {
    accOld = getOldAccumulator();
    accOld.cleanup();
  }

  GenericRecord previous = null;
  for (Object valueObj : values)
  {
    GenericRecord value = ((AvroValue<GenericRecord>)valueObj).datum();
    // Resolve the value's schema name once; every branch below compares against it.
    String valueSchemaName = value.getSchema().getFullName();
    if (valueSchemaName.equals(getSchemas().getIntermediateValueSchema().getFullName()))
    {
      acc.accumulate(value);
      accumulatedCount++;
    }
    else if (valueSchemaName.equals(getSchemas().getDatedIntermediateValueSchema().getFullName()))
    {
      if (!_reusePreviousOutput)
      {
        throw new RuntimeException("Did not expect " + getSchemas().getDatedIntermediateValueSchema().getFullName());
      }
      Long time = (Long)value.get("time");
      GenericRecord data = (GenericData.Record)value.get("value");
      if (time == null)
      {
        throw new RuntimeException("time is null");
      }
      if (data == null)
      {
        throw new RuntimeException("value is null");
      }
      if (time >= _beginTime && time <= _endTime)
      {
        acc.accumulate(data);
        accumulatedCount++;
      }
      else if (time < _beginTime)
      {
        accOld.accumulate(data);
        oldAccumulatedCount++;
      }
      else
      {
        throw new RuntimeException(String.format("Time %d is greater than end time %d",time,_endTime));
      }
    }
    else if (valueSchemaName.equals(getSchemas().getOutputValueSchema().getFullName()))
    {
      if (!_reusePreviousOutput)
      {
        // Report the schema that was actually rejected (previously named the dated intermediate schema).
        throw new RuntimeException("Did not expect " + getSchemas().getOutputValueSchema().getFullName());
      }
      // Deep clone the previous output fed back in. The reason is presumably that the framework
      // may reuse the underlying value object across iterations; confirm against the input format.
      previous = new GenericData.Record((Record)value,true);
    }
    else
    {
      throw new RuntimeException("Unexpected type: " + valueSchemaName);
    }
  }

  // Only ask an accumulator for its result if it actually received data.
  GenericRecord newOutputValue = null;
  GenericRecord oldOutputValue = null;
  if (accumulatedCount > 0)
  {
    newOutputValue = acc.getFinal();
  }
  if (oldAccumulatedCount > 0)
  {
    oldOutputValue = accOld.getFinal();
  }

  GenericRecord outputValue = null;
  if (previous == null)
  {
    outputValue = newOutputValue;
    if (oldOutputValue != null)
    {
      if (_oldMerger == null)
      {
        throw new RuntimeException("No old record merger set");
      }
      outputValue = _oldMerger.merge(outputValue, oldOutputValue);
    }
  }
  else
  {
    // Start from the previous output, fold in the old data first, then the new data.
    outputValue = previous;
    if (oldOutputValue != null)
    {
      if (_oldMerger == null)
      {
        throw new RuntimeException("No old record merger set");
      }
      outputValue = _oldMerger.merge(outputValue, oldOutputValue);
    }
    if (newOutputValue != null)
    {
      if (_merger == null)
      {
        throw new RuntimeException("No new record merger set");
      }
      outputValue = _merger.merge(outputValue, newOutputValue);
    }
  }

  if (outputValue != null)
  {
    GenericRecord output = new GenericData.Record(getSchemas().getReduceOutputSchema());
    output.put("key", key);
    output.put("value", outputValue);
    context.write(new AvroKey<GenericRecord>(output),null);
  }
}