private void moveStagedFiles()

in datafu-hourglass/src/main/java/datafu/hourglass/jobs/AbstractPartitionPreservingIncrementalJob.java [553:636]


  /**
   * Moves the files found directly under the staging path into dated folders
   * beneath the job's output path.
   *
   * <p>The move happens in two phases:</p>
   * <ol>
   *   <li>Every entry directly under {@code sourcePath} whose name begins with a
   *       numeric prefix (the text before the first {@code '-'}) is moved into a
   *       subfolder of {@code sourcePath} named after that prefix.</li>
   *   <li>Each subfolder created in phase one has its name parsed with
   *       {@code PathUtils.datedPathFormat} and is renamed to the location produced by
   *       {@code PathUtils.nestedDatedPathFormat} under {@code getOutputPath()}.
   *       A {@code DatePath} for each moved folder is added to
   *       {@code report.outputFiles}.</li>
   * </ol>
   *
   * @param report report whose {@code outputFiles} collection receives an entry per moved folder
   * @param sourcePath staging path containing the files to move
   * @throws IOException if a file system operation fails
   * @throws RuntimeException if a subfolder for a prefix already exists in the staging path,
   *         if a folder name cannot be parsed as a date, or if any rename reports failure
   */
  private void moveStagedFiles(Report report, Path sourcePath) throws IOException
  {
    _log.info("Following files produced in staging path:");
    for (FileStatus stat : getFileSystem().globStatus(new Path(sourcePath,"*.avro")))
    {
      _log.info(String.format("* %s (%d bytes)",stat.getPath(),stat.getLen()));
    }
    
    // Select only the entries whose name starts with a numeric prefix followed by '-'.
    FileStatus[] incrementalParts = getFileSystem().globStatus(new Path(sourcePath,"*"), new PathFilter() {
      @Override
      public boolean accept(Path path)
      {
        String[] pathParts = path.getName().split("-");
        if (pathParts.length == 0)
        {
          // A name consisting only of '-' characters splits into an empty array.
          return false;
        }
        try
        {
          Long.parseLong(pathParts[0]);
          return true;
        }
        catch (NumberFormatException e)
        {
          return false;
        }
      }
    });
    
    // collect the new incremental data from the temp folder and move to subfolders
    Map<String,Path> incrementalTargetPaths = new HashMap<String,Path>();
    for (FileStatus stat : incrementalParts)
    {
      String fileName = stat.getPath().getName();
      String timestamp = fileName.split("-")[0];
      
      Path parent = incrementalTargetPaths.get(timestamp);
      if (parent == null)
      {
        // First file seen for this prefix: create its subfolder, which must not already exist.
        parent = new Path(sourcePath,timestamp);
        
        if (getFileSystem().exists(parent))
        {
          throw new RuntimeException("already exists: " + parent.toString());
        }
        getFileSystem().mkdirs(parent);
        
        incrementalTargetPaths.put(timestamp,parent);
      }
      
      Path destination = new Path(parent,fileName);
      _log.info(String.format("Moving %s to %s",fileName,parent.toString()));
      
      // rename signals failure through its return value; ignoring it would leave the
      // file behind in the staging path while the folder is published without it.
      if (!getFileSystem().rename(stat.getPath(), destination))
      {
        throw new RuntimeException("Failed to rename " + stat.getPath() + " to " + destination);
      }
    }
    
    // Move each subfolder to its dated location under the output path.
    for (Path src : incrementalTargetPaths.values())
    {
      Date srcDate;
      try
      {
        srcDate = PathUtils.datedPathFormat.parse(src.getName());
      }
      catch (ParseException e)
      {
        throw new RuntimeException(e);
      }
      Path target = new Path(getOutputPath(),PathUtils.nestedDatedPathFormat.format(srcDate));
      _log.info(String.format("Moving %s to %s",src.getName(),target));
      
      // Ensure the parent of the target exists so the folder itself can be renamed into place.
      getFileSystem().mkdirs(target.getParent());
      
      if (!getFileSystem().rename(src, target))
      {
        throw new RuntimeException("Failed to rename " + src + " to " + target);
      }
      
      report.outputFiles.add(new DatePath(srcDate,target));
    }
  }