public void reduce()

in datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/PartitioningReducer.java [66:112]


  public void reduce(Object keyObj,
                     Iterable<Object> values,
                     ReduceContext<Object,Object,Object,Object> context) throws IOException, InterruptedException
  {  
    Accumulator<GenericRecord,GenericRecord> acc = getAccumulator();
    
    if (acc == null)
    {
      throw new RuntimeException("No accumulator set for reducer!");
    }
    
    acc.cleanup();
            
    Long keyTime = null;
    
    GenericRecord key = ((AvroKey<GenericRecord>)keyObj).datum();
    
    keyTime = (Long)key.get("time");
    key = (GenericRecord)key.get("value");
    
    long accumulatedCount = 0;    
    for (Object valueObj : values)
    {       
      GenericRecord value = ((AvroValue<GenericRecord>)valueObj).datum(); 
      acc.accumulate(value);      
      accumulatedCount++;      
    }
    
    if (accumulatedCount > 0)
    {
      GenericRecord outputValue = acc.getFinal();               
      if (outputValue != null)
      {                    
        GenericRecord output = new GenericData.Record(getSchemas().getReduceOutputSchema());
        output.put("key", key);
        output.put("value", outputValue);
        
        // write output in directories corresponding to each day
        String namedOutput = getNamedOutput(keyTime);
        if (_multipleOutputs == null)
        {
          throw new RuntimeException("No multiple outputs set");
        }
        _multipleOutputs.write(namedOutput, new AvroKey<GenericRecord>(output), (AvroValue<GenericRecord>)null);
      }
    }
  }