public void reduce()

in datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/CollapsingReducer.java [64:210]


  /**
   * Collapses all values received for one key into at most one output record.
   *
   * Every value is dispatched on the full name of its Avro schema:
   * <ul>
   *   <li>intermediate value schema: accumulated into the new-data accumulator;</li>
   *   <li>dated intermediate value schema (fields "time" and "value"): the wrapped value is
   *       accumulated into the new-data accumulator when the time lies within
   *       [_beginTime, _endTime], or into the old-data accumulator when it lies before
   *       _beginTime;</li>
   *   <li>output value schema: treated as the previous output fed back in, and deep-copied.</li>
   * </ul>
   * The accumulated results are then combined with the previous output (if any) through the
   * configured mergers, and a record with fields "key" and "value" is written to the context.
   * Nothing is written when the combined value is null.
   *
   * @param keyObj the key; cast to an AvroKey wrapping a GenericRecord
   * @param values the values for the key; each is cast to an AvroValue wrapping a GenericRecord
   * @param context the context the output record is written to
   * @throws IOException declared by the signature; presumably propagated from the context write
   * @throws InterruptedException declared by the signature; presumably propagated from the context write
   * @throws RuntimeException if no accumulator is set, if a value has an unexpected schema, if a
   *         dated value or previous output arrives while output reuse is disabled, if a dated
   *         value has a null "time" or "value" or a time after _endTime, or if a merger that is
   *         needed has not been set
   */
  public void reduce(Object keyObj,
                     Iterable<Object> values,
                     ReduceContext<Object,Object,Object,Object> context) throws IOException, InterruptedException
  {
    if (_newAccumulator == null)
    {
      throw new RuntimeException("No reducer set");
    }
    
    GenericRecord key = ((AvroKey<GenericRecord>)keyObj).datum();
    
    // used when processing all data (i.e. no window size)
    Accumulator<GenericRecord,GenericRecord> acc = getNewAccumulator();
    // reset the accumulator before accumulating values for this key
    acc.cleanup();
    long accumulatedCount = 0;
    
    // only obtained when previous output is reused; receives data older than the window start
    Accumulator<GenericRecord,GenericRecord> accOld = null;
    long oldAccumulatedCount = 0;
    if (getReuseOutput())
    {
      accOld = getOldAccumulator();
      accOld.cleanup();
    }
    
    GenericRecord previous = null;
    
    for (Object valueObj : values)
    {
      GenericRecord value = ((AvroValue<GenericRecord>)valueObj).datum();
      // the schema's full name decides how the value is handled; compute it once per value
      String valueSchemaName = value.getSchema().getFullName();
      
      if (valueSchemaName.equals(getSchemas().getIntermediateValueSchema().getFullName()))
      {
        acc.accumulate(value);
        accumulatedCount++;
      }
      else if (valueSchemaName.equals(getSchemas().getDatedIntermediateValueSchema().getFullName()))
      {
        if (!_reusePreviousOutput)
        {
          throw new RuntimeException("Did not expect " + getSchemas().getDatedIntermediateValueSchema().getFullName()); 
        }
        
        Long time = (Long)value.get("time");
        GenericRecord data = (GenericData.Record)value.get("value");
        
        if (time == null)
        {
          throw new RuntimeException("time is null");
        }
        
        if (data == null)
        {
          throw new RuntimeException("value is null");
        }
        
        if (time >= _beginTime && time <= _endTime)
        {
          // inside the window: counts as new data
          acc.accumulate(data);
          accumulatedCount++;
        }
        else if (time < _beginTime)
        {
          // before the window: counts as old data
          accOld.accumulate(data);
          oldAccumulatedCount++;
        }
        else
        {
          throw new RuntimeException(String.format("Time %d is greater than end time %d",time,_endTime));
        }
      }
      else if (valueSchemaName.equals(getSchemas().getOutputValueSchema().getFullName()))
      {
        if (!_reusePreviousOutput)
        {
          // report the schema that was actually encountered in this branch
          throw new RuntimeException("Did not expect " + getSchemas().getOutputValueSchema().getFullName()); 
        }
        
        // deep clone the previous output fed back in
        previous = new GenericData.Record((Record)value,true);
      }
      else
      {
        throw new RuntimeException("Unexpected type: " + valueSchemaName);
      }
    }
    
    // only ask an accumulator for a result when something was actually accumulated into it
    GenericRecord newOutputValue = null;
    GenericRecord oldOutputValue = null;
    
    if (accumulatedCount > 0)
    {
      newOutputValue = acc.getFinal();
    }
    
    if (oldAccumulatedCount > 0)
    {
      oldOutputValue = accOld.getFinal();
    }
    
    // without previous output the new result is the starting point; otherwise the previous output is
    GenericRecord outputValue = (previous == null) ? newOutputValue : previous;
    
    // the old-data result is merged first, whichever starting point was chosen
    if (oldOutputValue != null)
    {
      if (_oldMerger == null)
      {
        throw new RuntimeException("No old record merger set");
      }
      
      outputValue = _oldMerger.merge(outputValue, oldOutputValue);
    }
    
    // the new result only needs merging when it was not already used as the starting point
    if (previous != null && newOutputValue != null)
    {
      if (_merger == null)
      {
        throw new RuntimeException("No new record merger set");
      }
      
      outputValue = _merger.merge(outputValue, newOutputValue);
    }
    
    if (outputValue != null)
    {
      GenericRecord output = new GenericData.Record(getSchemas().getReduceOutputSchema());
      output.put("key", key);
      output.put("value", outputValue);
      context.write(new AvroKey<GenericRecord>(output),null);
    }
  }