public void collect()

in datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/CollapsingMapper.java [237:292]


    public void collect(GenericRecord key, GenericRecord value) throws IOException, InterruptedException
    {                
      if (key == null)
      {
        throw new RuntimeException("key is null");
      }
      if (value == null)
      {
        throw new RuntimeException("value is null");
      }
      
      // wrap the value with the time so we know what to merge and what to unmerge        
      long time;        
      if (lastSplit == getContext().getInputSplit())
      {
        time = lastTime;
      }
      else
      {
        FileSplit currentSplit;
        lastSplit = getContext().getInputSplit();
        try
        {
          Method m = getContext().getInputSplit().getClass().getMethod("getInputSplit");
          m.setAccessible(true);
          currentSplit = (FileSplit)m.invoke(getContext().getInputSplit());
        }
        catch (SecurityException e)
        {
          throw new RuntimeException(e);
        }
        catch (NoSuchMethodException e)
        {
          throw new RuntimeException(e);
        }
        catch (IllegalArgumentException e)
        {
          throw new RuntimeException(e);
        }
        catch (IllegalAccessException e)
        {
          throw new RuntimeException(e);
        }
        catch (InvocationTargetException e)
        {
          throw new RuntimeException(e);
        }
        time = PathUtils.getDateForNestedDatedPath((currentSplit).getPath().getParent()).getTime();
        lastTime = time;
      }
              
      wrappedValue.put("time", time);
      wrappedValue.put("value", value);
      
      getContext().write(new AvroKey<GenericRecord>(key),new AvroValue<GenericRecord>(wrappedValue));
    }