in datafu-hourglass/src/main/java/datafu/hourglass/jobs/StagedOutputJob.java [232:297]
public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException, ClassNotFoundException
{
  // Runs the job with its output redirected to a timestamped staging directory under
  // _stagingPrefix. Only when the job succeeds is any existing data at the configured output
  // path deleted and the staged output renamed into its place; on failure the staged output
  // is removed (best effort) and the configured output path is left untouched.
  //
  // Returns true only if the job succeeded AND the staged output was renamed to the final
  // output path; returns false if the job failed or the rename failed.
  // Exceptions thrown by the underlying job or by the file system calls on the success path
  // propagate to the caller.
  final Path actualOutputPath = FileOutputFormat.getOutputPath(this);
  final Path stagedPath = new Path(String.format("%s/%s/staged", _stagingPrefix, System.currentTimeMillis()));

  // Point the job at the staging location for the duration of the run.
  FileOutputFormat.setOutputPath(this, stagedPath);

  // If the JVM is terminated while we are waiting, try to kill the submitted job as well.
  final Thread hook = new Thread(new Runnable()
  {
    @Override
    public void run()
    {
      try
      {
        killJob();
      }
      catch (IOException e)
      {
        _log.error("Failed to kill job from shutdown hook", e);
      }
    }
  });

  Runtime.getRuntime().addShutdownHook(hook);

  final boolean retVal;
  try
  {
    retVal = super.waitForCompletion(verbose);
  }
  finally
  {
    // Always unregister the hook, even when the wait throws; otherwise the hook (and the
    // reference it holds to this job) stays registered until the JVM exits.
    try
    {
      Runtime.getRuntime().removeShutdownHook(hook);
    }
    catch (IllegalStateException ignored)
    {
      // The JVM is already shutting down, so hooks can no longer be removed. Nothing to do.
    }
  }

  final FileSystem fs = actualOutputPath.getFileSystem(getConfiguration());

  if (!retVal)
  {
    _log.info(String.format("Job failed, deleting staged path[%s]", stagedPath));
    try
    {
      fs.delete(stagedPath, true);
    }
    catch (IOException e)
    {
      // Cleanup is best effort: the job has already failed, so report and carry on.
      _log.warn(String.format("Could not delete staged path[%s]", stagedPath), e);
    }

    _log.warn("retVal was false for some reason...");
    return false;
  }

  // Creating the output directory and then deleting it clears any previous output while
  // leaving its parent directories in place for the rename below.
  fs.mkdirs(actualOutputPath);
  _log.info(String.format("Deleting data at old path[%s]", actualOutputPath));
  fs.delete(actualOutputPath, true);

  _log.info(String.format("Moving from staged path[%s] to final resting place[%s]", stagedPath, actualOutputPath));
  final boolean renamed = fs.rename(stagedPath, actualOutputPath);

  if (!renamed)
  {
    // The old output has already been deleted at this point; the job's results are still
    // available under the staged path, so make the failure visible instead of silent.
    _log.error(String.format("Failed to move staged path[%s] to final resting place[%s]", stagedPath, actualOutputPath));
    return false;
  }

  if (_writeCounters)
  {
    writeCounters(fs);
  }

  return true;
}