in datafu-hourglass/src/main/java/datafu/hourglass/jobs/AbstractPartitionPreservingIncrementalJob.java [553:636]
/**
 * Moves the staged files found directly under {@code sourcePath} into the job's output path,
 * grouped by the numeric prefix of each file name.
 *
 * <p>The method works in three steps:
 * <ol>
 *   <li>Logs every {@code *.avro} entry found under {@code sourcePath}.</li>
 *   <li>Selects the entries whose name, up to the first {@code '-'}, parses as a {@code long}.
 *       For each distinct prefix a subfolder with that name is created under {@code sourcePath}
 *       and the matching entries are renamed into it.</li>
 *   <li>Parses each subfolder name with {@code PathUtils.datedPathFormat}, renames the subfolder to
 *       the path produced by {@code PathUtils.nestedDatedPathFormat} under {@code getOutputPath()},
 *       and records the result in {@code report.outputFiles}.</li>
 * </ol>
 *
 * @param report report whose {@code outputFiles} collection receives one {@code DatePath} per moved folder
 * @param sourcePath staging folder containing the files to move
 * @throws IOException if a file system operation fails
 * @throws RuntimeException if a subfolder for a prefix already exists, if a rename reports failure,
 *         or if a subfolder name cannot be parsed as a date
 */
private void moveStagedFiles(Report report, Path sourcePath) throws IOException
{
  _log.info("Following files produced in staging path:");
  for (FileStatus stat : getFileSystem().globStatus(new Path(sourcePath,"*.avro")))
  {
    _log.info(String.format("* %s (%d bytes)",stat.getPath(),stat.getLen()));
  }

  // Only entries whose name starts with a numeric prefix (text before the first '-') are moved.
  FileStatus[] incrementalParts = getFileSystem().globStatus(new Path(sourcePath,"*"), new PathFilter() {
    @Override
    public boolean accept(Path path)
    {
      String[] pathParts = path.getName().split("-");
      // A name consisting only of '-' characters splits into an empty array.
      if (pathParts.length == 0)
      {
        return false;
      }
      try
      {
        Long.parseLong(pathParts[0]);
        return true;
      }
      catch (NumberFormatException e)
      {
        return false;
      }
    }
  });

  // collect the new incremental data from the temp folder and move to subfolders
  Map<String,Path> incrementalTargetPaths = new HashMap<String,Path>();
  for (FileStatus stat : incrementalParts)
  {
    Path file = stat.getPath();
    // The filter above guarantees the name has a non-empty first part.
    String timestamp = file.getName().split("-")[0];

    Path parent = incrementalTargetPaths.get(timestamp);
    if (parent == null)
    {
      parent = new Path(sourcePath,timestamp);
      if (getFileSystem().exists(parent))
      {
        throw new RuntimeException("already exists: " + parent.toString());
      }
      getFileSystem().mkdirs(parent);
      incrementalTargetPaths.put(timestamp,parent);
    }

    Path destination = new Path(parent,file.getName());
    _log.info(String.format("Moving %s to %s",file.getName(),parent.toString()));
    // rename signals failure through its return value; ignoring it would silently drop the file
    // from the output that is published below.
    if (!getFileSystem().rename(file, destination))
    {
      throw new RuntimeException("Failed to rename " + file + " to " + destination);
    }
  }

  // move each per-timestamp subfolder to its final dated location under the output path
  for (Path src : incrementalTargetPaths.values())
  {
    Date srcDate;
    try
    {
      srcDate = PathUtils.datedPathFormat.parse(src.getName());
    }
    catch (ParseException e)
    {
      throw new RuntimeException(e);
    }
    Path target = new Path(getOutputPath(),PathUtils.nestedDatedPathFormat.format(srcDate));
    _log.info(String.format("Moving %s to %s",src.getName(),target));
    // make sure the parent folders of the target exist before renaming into place
    getFileSystem().mkdirs(target.getParent());
    if (!getFileSystem().rename(src, target))
    {
      throw new RuntimeException("Failed to rename " + src + " to " + target);
    }
    report.outputFiles.add(new DatePath(srcDate,target));
  }
}