in datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/CollapsingMapper.java [237:292]
/**
 * Writes the key to the context together with the value wrapped in a record that
 * also carries a "time" field, so downstream code knows what to merge and what to unmerge.
 *
 * <p>The time is obtained by passing the parent directory of the current input split's
 * file to {@code PathUtils.getDateForNestedDatedPath}. The result is cached for the
 * current split (compared by reference), so it is resolved once per split rather than
 * once per record.</p>
 *
 * @param key the record key; must not be null
 * @param value the record value; must not be null
 * @throws RuntimeException if key or value is null, or if the file split backing the
 *         current input split cannot be obtained
 * @throws IOException if writing to the context fails
 * @throws InterruptedException if writing to the context is interrupted
 */
public void collect(GenericRecord key, GenericRecord value) throws IOException, InterruptedException
{
  if (key == null)
  {
    throw new RuntimeException("key is null");
  }
  if (value == null)
  {
    throw new RuntimeException("value is null");
  }

  // wrap the value with the time so we know what to merge and what to unmerge
  long time;
  if (lastSplit == getContext().getInputSplit())
  {
    time = lastTime;
  }
  else
  {
    FileSplit currentSplit = getUnderlyingFileSplit(getContext().getInputSplit());
    time = PathUtils.getDateForNestedDatedPath(currentSplit.getPath().getParent()).getTime();

    // Update the cache only after the time was resolved successfully; otherwise a failure
    // here would leave lastSplit pointing at this split while lastTime still holds the
    // value of the previous split.
    lastTime = time;
    lastSplit = getContext().getInputSplit();
  }

  wrappedValue.put("time", time);
  wrappedValue.put("value", value);
  getContext().write(new AvroKey<GenericRecord>(key),new AvroValue<GenericRecord>(wrappedValue));
}

/**
 * Returns the file split behind the given input split.
 *
 * <p>A split that already is a {@code FileSplit} is returned as-is. Any other split is
 * expected to expose a no-argument {@code getInputSplit} method returning the wrapped
 * {@code FileSplit}; that method is invoked reflectively, with accessibility forced
 * because the declaring class may not be public.</p>
 *
 * @param split the input split reported by the context
 * @return the underlying file split
 * @throws RuntimeException if the wrapped split cannot be retrieved reflectively
 */
private FileSplit getUnderlyingFileSplit(Object split)
{
  if (split instanceof FileSplit)
  {
    return (FileSplit)split;
  }

  String failure = "Unable to obtain FileSplit from " + split.getClass().getName();
  try
  {
    Method m = split.getClass().getMethod("getInputSplit");
    m.setAccessible(true);
    return (FileSplit)m.invoke(split);
  }
  catch (SecurityException e)
  {
    throw new RuntimeException(failure, e);
  }
  catch (NoSuchMethodException e)
  {
    throw new RuntimeException(failure, e);
  }
  catch (IllegalArgumentException e)
  {
    throw new RuntimeException(failure, e);
  }
  catch (IllegalAccessException e)
  {
    throw new RuntimeException(failure, e);
  }
  catch (InvocationTargetException e)
  {
    throw new RuntimeException(failure, e);
  }
}