public void reduce()

in datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/CollapsingCombiner.java [59:141]


  /**
   * Combines the values received for a single key.
   *
   * <p>Each value is an {@code AvroValue} wrapping a {@code GenericRecord}, and is
   * dispatched on the full name of its schema:
   * <ul>
   *   <li>intermediate value schema: the record is fed to the accumulator;</li>
   *   <li>dated intermediate value schema: the record's {@code "time"} and
   *       {@code "value"} fields are read; the nested value is accumulated when the
   *       time lies within {@code [_beginTime, _endTime]}, the whole record is written
   *       through unchanged when the time precedes {@code _beginTime}, and a time past
   *       {@code _endTime} is rejected;</li>
   *   <li>output value schema: the record is written through unchanged.</li>
   * </ul>
   * The dated-intermediate and output schemas are only accepted when
   * {@code _reusePreviousOutput} is set. After all values are consumed, the
   * accumulator's final value is written for the key, provided at least one value
   * was accumulated and the final value is not null.
   *
   * @param keyObj  the key; cast to {@code AvroKey<GenericRecord>} when writing
   * @param values  the values for the key; each is cast to {@code AvroValue<GenericRecord>}
   * @param context the context that combined and passed-through records are written to
   * @throws IOException          declared for {@code context.write}
   * @throws InterruptedException declared for {@code context.write}
   * @throws RuntimeException     if no accumulator is available, a record has an
   *                              unexpected schema, a previous-output record shows up
   *                              while {@code _reusePreviousOutput} is false, a dated
   *                              record has a null time or value, or its time is past
   *                              {@code _endTime}
   */
  public void reduce(Object keyObj,
                      Iterable<Object> values,
                      ReduceContext<Object,Object,Object,Object> context) throws IOException, InterruptedException
  {
    final Accumulator<GenericRecord,GenericRecord> accumulator = getAccumulator();
    if (accumulator == null)
    {
      throw new RuntimeException("No combiner factory set");
    }

    // cleanup() runs before anything is accumulated; presumably this clears
    // state left over from a previously processed key.
    accumulator.cleanup();

    // Tracks whether the accumulator was fed at all, so that nothing is emitted
    // for keys whose values were all passed through.
    boolean anyAccumulated = false;

    for (Object rawValue : values)
    {
      final GenericRecord record = ((AvroValue<GenericRecord>)rawValue).datum();
      final String recordType = record.getSchema().getFullName();

      // Plain intermediate value: always accumulated.
      if (recordType.equals(getSchemas().getIntermediateValueSchema().getFullName()))
      {
        accumulator.accumulate(record);
        anyAccumulated = true;
        continue;
      }

      // Intermediate value tagged with a time.
      if (recordType.equals(getSchemas().getDatedIntermediateValueSchema().getFullName()))
      {
        if (!_reusePreviousOutput)
        {
          throw new RuntimeException("Did not expect " + getSchemas().getDatedIntermediateValueSchema().getFullName()); 
        }

        final Long timestamp = (Long)record.get("time");
        final GenericRecord payload = (GenericData.Record)record.get("value");

        if (timestamp == null)
        {
          throw new RuntimeException("time is null");
        }
        if (payload == null)
        {
          throw new RuntimeException("value is null");
        }

        if (timestamp < _beginTime)
        {
          // Before the window: forward the dated record untouched, the reducer will handle it.
          context.write((AvroKey<GenericRecord>)keyObj,new AvroValue<GenericRecord>(record));
          continue;
        }
        if (timestamp > _endTime)
        {
          throw new RuntimeException(String.format("Time %d is greater than end time %d",timestamp,_endTime));
        }

        // Inside the window: only the nested value takes part in accumulation.
        accumulator.accumulate(payload);
        anyAccumulated = true;
        continue;
      }

      // Record in the output value schema.
      if (recordType.equals(getSchemas().getOutputValueSchema().getFullName()))
      {
        if (!_reusePreviousOutput)
        {
          throw new RuntimeException("Did not expect " + getSchemas().getOutputValueSchema().getFullName()); 
        }

        // Forward untouched, the reducer will handle it.
        context.write((AvroKey<GenericRecord>)keyObj,new AvroValue<GenericRecord>(record));
        continue;
      }

      throw new RuntimeException("Unexpected type: " + recordType);
    }

    if (!anyAccumulated)
    {
      return;
    }

    final GenericRecord combined = accumulator.getFinal();
    if (combined != null)
    {
      context.write((AvroKey<GenericRecord>)keyObj,new AvroValue<GenericRecord>(combined));
    }
  }