public Map<String, Object> run(Map<String, Object> args, String crawlId)

in src/java/org/apache/nutch/indexer/IndexingJob.java [309:425]


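  /**
   * Resolves the crawldb, linkdb, and segment paths from the args map
   * (falling back to defaults derived from crawlId), reads the indexing
   * options, and delegates to index(...). Returns a map holding
   * Nutch.VAL_RESULT = 0 on success.
   */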
  public Map<String, Object> run(Map<String, Object> args, String crawlId) throws Exception {
    boolean noCommit = false;
    boolean deleteGone = false; 
    boolean filter = false;
    boolean normalize = false;
    boolean isSegment = false;
    boolean addBinaryContent = false;
    boolean base64 = false;
    String params = null;
    Configuration conf = getConf();

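    // Resolve the crawldb path: use ARG_CRAWLDB if present, otherwise
    // default to <crawlId>/crawldb.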
    Path crawlDb;
    if(args.containsKey(Nutch.ARG_CRAWLDB)) {
      Object crawldbPath = args.get(Nutch.ARG_CRAWLDB);
      if(crawldbPath instanceof Path) {
        crawlDb = (Path) crawldbPath;
      }
      else {
        crawlDb = new Path(crawldbPath.toString());
      }
    }
    else {
      crawlDb = new Path(crawlId+"/crawldb");
    }

    Path linkdb = null;
    List<Path> segments = new ArrayList<>();

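    // Resolve the linkdb path the same way, defaulting to <crawlId>/linkdb.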
    if(args.containsKey(Nutch.ARG_LINKDB)){
      Object path = args.get(Nutch.ARG_LINKDB);
      if(path instanceof Path) {
        linkdb = (Path) path;
      }
      else {
        linkdb = new Path(path.toString());
      }
    }
    else {
      linkdb = new Path(crawlId+"/linkdb");
    }

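    // A segments directory contributes every indexable segment found
    // directly below it.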
    if(args.containsKey(Nutch.ARG_SEGMENTDIR)){
      isSegment = true;
      Path segmentsDir;
      Object segDir = args.get(Nutch.ARG_SEGMENTDIR);
      if(segDir instanceof Path) {
        segmentsDir = (Path) segDir;
      }
      else {
        segmentsDir = new Path(segDir.toString());
      }
      FileSystem fs = segmentsDir.getFileSystem(getConf());
      FileStatus[] fstats = fs.listStatus(segmentsDir,
          HadoopFSUtil.getPassDirectoriesFilter(fs));
      Path[] files = HadoopFSUtil.getPaths(fstats);
      for (Path p : files) {
        if (SegmentChecker.isIndexable(p,fs)) {
          segments.add(p);
        }
      }     
    }

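    // Individual segments may also be passed explicitly, either as a list
    // of path strings or as a single Path.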
    if(args.containsKey(Nutch.ARG_SEGMENTS)) {
      Object segmentsFromArg = args.get(Nutch.ARG_SEGMENTS);
      ArrayList<String> segmentList = new ArrayList<>();
      if(segmentsFromArg instanceof ArrayList) {
        segmentList = (ArrayList<String>) segmentsFromArg;
      }
      else if(segmentsFromArg instanceof Path) {
        segmentList.add(segmentsFromArg.toString());
      }

      for(String segment : segmentList) {
        segments.add(new Path(segment));
      }
    }

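    // Without a segments directory, fall back to the most recently modified
    // segment under <crawlId>/segments on the local filesystem.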
    if(!isSegment){
      String segment_dir = crawlId+"/segments";
      File segmentsDir = new File(segment_dir);
      // listFiles() returns null if the directory is missing or unreadable
      File[] segmentsList = segmentsDir.listFiles();
      if (segmentsList == null || segmentsList.length == 0) {
        throw new IllegalStateException("No segments found in " + segment_dir);
      }
      // Sort newest first by modification time
      Arrays.sort(segmentsList,
          (f1, f2) -> Long.compare(f2.lastModified(), f1.lastModified()));
      segments.add(new Path(segmentsList[0].getPath()));
    }

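    // Boolean options are switched on simply by being present in the args map.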
    if(args.containsKey("noCommit")){
      noCommit = true;
    }
    if(args.containsKey("deleteGone")){
      deleteGone = true;
    }
    if(args.containsKey("normalize")){
      normalize = true;
    }
    if(args.containsKey("filter")){
      filter = true;
    }
    if (args.containsKey("addBinaryContent")) {
      addBinaryContent = true;
      if (args.containsKey("base64")) {
          base64 = true;
      }
    }
    if(args.containsKey("params")){
      params = (String)args.get("params");
    }
    setConf(conf);
    index(crawlDb, linkdb, segments, noCommit, deleteGone, params, filter,
        normalize, addBinaryContent, base64);
    Map<String, Object> results = new HashMap<>();
    results.put(Nutch.VAL_RESULT, 0);
    return results;
  }
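
A minimal caller sketch, not taken from the Nutch sources: it assumes the
Nutch.ARG_*/VAL_RESULT constants live in org.apache.nutch.metadata.Nutch and
that NutchConfiguration.create() supplies the Hadoop configuration; the
"crawl" directory and the chosen options are placeholders.

  import java.util.HashMap;
  import java.util.Map;

  import org.apache.hadoop.fs.Path;
  import org.apache.nutch.indexer.IndexingJob;
  import org.apache.nutch.metadata.Nutch;      // assumed location of the ARG_* constants
  import org.apache.nutch.util.NutchConfiguration;

  public class IndexingJobRunner {
    public static void main(String[] argv) throws Exception {
      IndexingJob indexer = new IndexingJob();
      indexer.setConf(NutchConfiguration.create());

      // Only the presence of a key matters for the boolean options.
      Map<String, Object> args = new HashMap<>();
      args.put(Nutch.ARG_SEGMENTDIR, new Path("crawl/segments"));
      args.put("deleteGone", true);
      args.put("filter", true);

      // "crawl" is the crawlId used to derive the default crawldb/linkdb paths.
      Map<String, Object> results = indexer.run(args, "crawl");
      System.out.println("result: " + results.get(Nutch.VAL_RESULT)); // 0 on success
    }
  }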