in datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/CollapsingCombiner.java [59:141]
/**
 * Combines the values received for one key, routing each value by the full
 * name of its Avro schema:
 * <ul>
 *   <li>intermediate value schema: the record is fed to the accumulator;</li>
 *   <li>dated intermediate value schema: only permitted when
 *       {@code _reusePreviousOutput} is set. The record's {@code "time"} and
 *       {@code "value"} fields are read; if the time lies within
 *       {@code [_beginTime, _endTime]} the nested value is accumulated, if it
 *       is earlier than {@code _beginTime} the original record is written
 *       through unchanged, and a later time is rejected;</li>
 *   <li>output value schema: only permitted when {@code _reusePreviousOutput}
 *       is set; the record is written through unchanged;</li>
 *   <li>any other schema is rejected.</li>
 * </ul>
 * Once all values are consumed, and provided at least one record was
 * accumulated, the accumulator's final value is written if it is non-null.
 *
 * @param keyObj  the key; cast to {@code AvroKey<GenericRecord>} whenever a record is written
 * @param values  the values for the key; each is cast to {@code AvroValue<GenericRecord>}
 * @param context the context that pass-through and combined records are written to
 * @throws RuntimeException if no accumulator is available, a value has an
 *         unexpected schema, previous-output records arrive while
 *         {@code _reusePreviousOutput} is false, a dated record lacks its
 *         {@code "time"} or {@code "value"} field, or a dated record's time
 *         falls in neither the accumulate nor the pass-through range
 * @throws IOException declared by this method; presumably propagated from {@code context.write}
 * @throws InterruptedException declared by this method; presumably propagated from {@code context.write}
 */
public void reduce(Object keyObj,
                   Iterable<Object> values,
                   ReduceContext<Object,Object,Object,Object> context) throws IOException, InterruptedException
{
  Accumulator<GenericRecord,GenericRecord> accumulator = getAccumulator();
  if (accumulator == null)
  {
    throw new RuntimeException("No combiner factory set");
  }

  // Called before any value is accumulated; presumably clears state left over
  // from a previously processed key -- confirm against the Accumulator contract.
  accumulator.cleanup();

  // Tracks whether anything was accumulated, so that nothing is emitted for
  // keys whose values were all passed through.
  boolean accumulatedAny = false;

  for (Object rawValue : values)
  {
    GenericRecord record = ((AvroValue<GenericRecord>)rawValue).datum();
    String schemaName = record.getSchema().getFullName();

    // Plain intermediate value: always accumulated.
    if (schemaName.equals(getSchemas().getIntermediateValueSchema().getFullName()))
    {
      accumulator.accumulate(record);
      accumulatedAny = true;
      continue;
    }

    // Intermediate value wrapped together with a time.
    if (schemaName.equals(getSchemas().getDatedIntermediateValueSchema().getFullName()))
    {
      if (!_reusePreviousOutput)
      {
        throw new RuntimeException("Did not expect " + getSchemas().getDatedIntermediateValueSchema().getFullName());
      }

      Long timestamp = (Long)record.get("time");
      GenericRecord payload = (GenericData.Record)record.get("value");
      if (timestamp == null)
      {
        throw new RuntimeException("time is null");
      }
      if (payload == null)
      {
        throw new RuntimeException("value is null");
      }

      boolean withinWindow = timestamp >= _beginTime && timestamp <= _endTime;
      if (withinWindow)
      {
        accumulator.accumulate(payload);
        accumulatedAny = true;
      }
      else if (timestamp < _beginTime)
      {
        // pass through unchanged, reducer will handle it
        context.write((AvroKey<GenericRecord>)keyObj,new AvroValue<GenericRecord>(record));
      }
      else
      {
        throw new RuntimeException(String.format("Time %d is greater than end time %d",timestamp,_endTime));
      }
      continue;
    }

    // Record carrying the output value schema.
    if (schemaName.equals(getSchemas().getOutputValueSchema().getFullName()))
    {
      if (!_reusePreviousOutput)
      {
        throw new RuntimeException("Did not expect " + getSchemas().getOutputValueSchema().getFullName());
      }
      // pass through unchanged, reducer will handle it
      context.write((AvroKey<GenericRecord>)keyObj,new AvroValue<GenericRecord>(record));
      continue;
    }

    throw new RuntimeException("Unexpected type: " + schemaName);
  }

  if (!accumulatedAny)
  {
    return;
  }

  GenericRecord combined = accumulator.getFinal();
  if (combined != null)
  {
    context.write((AvroKey<GenericRecord>)keyObj,new AvroValue<GenericRecord>(combined));
  }
}