in src/java/org/apache/nutch/indexer/IndexingJob.java [309:425]
/**
 * Runs an indexing job configured from a generic argument map instead of
 * command-line options.
 *
 * <p>Keys read from {@code args}:
 * <ul>
 * <li>{@code Nutch.ARG_CRAWLDB} - crawl db location, as a {@code Path} or any
 * object whose {@code toString()} is a path; defaults to
 * {@code crawlId + "/crawldb"}.</li>
 * <li>{@code Nutch.ARG_LINKDB} - link db location, same conventions; defaults
 * to {@code crawlId + "/linkdb"}.</li>
 * <li>{@code Nutch.ARG_SEGMENTDIR} - a directory whose sub-directories are
 * added as segments when {@code SegmentChecker.isIndexable} accepts them.</li>
 * <li>{@code Nutch.ARG_SEGMENTS} - a {@code List} of segment paths, or a single
 * segment path.</li>
 * <li>{@code "noCommit"}, {@code "deleteGone"}, {@code "normalize"},
 * {@code "filter"}, {@code "addBinaryContent"} - flags that are enabled by the
 * mere presence of the key; the mapped value is not inspected.</li>
 * <li>{@code "base64"} - presence flag, honoured only together with
 * {@code "addBinaryContent"}.</li>
 * <li>{@code "params"} - a {@code String} forwarded unchanged to
 * {@code index(...)}.</li>
 * </ul>
 *
 * <p>When neither {@code Nutch.ARG_SEGMENTDIR} nor {@code Nutch.ARG_SEGMENTS}
 * is present, the most recently modified entry of
 * {@code crawlId + "/segments"} is indexed.
 *
 * @param args
 *          job arguments, see above
 * @param crawlId
 *          prefix used to build the default crawldb, linkdb and segments
 *          locations
 * @return a map containing {@code Nutch.VAL_RESULT} mapped to {@code 0}
 * @throws IllegalArgumentException
 *           if a path-valued argument is present but {@code null}
 * @throws IllegalStateException
 *           if the default segments directory has to be used but cannot be
 *           listed or is empty
 * @throws Exception
 *           anything propagated by file system access or {@code index(...)}
 */
public Map<String, Object> run(Map<String, Object> args, String crawlId) throws Exception {
  Configuration conf = getConf();

  Path crawlDb = args.containsKey(Nutch.ARG_CRAWLDB)
      ? toPath(args.get(Nutch.ARG_CRAWLDB), Nutch.ARG_CRAWLDB)
      : new Path(crawlId + "/crawldb");

  Path linkdb = args.containsKey(Nutch.ARG_LINKDB)
      ? toPath(args.get(Nutch.ARG_LINKDB), Nutch.ARG_LINKDB)
      : new Path(crawlId + "/linkdb");

  List<Path> segments = new ArrayList<>();
  // True as soon as the caller named segments in any form; the default
  // "latest segment" lookup must then be skipped.
  boolean segmentsSpecified = false;

  if (args.containsKey(Nutch.ARG_SEGMENTDIR)) {
    segmentsSpecified = true;
    Path segmentsDir = toPath(args.get(Nutch.ARG_SEGMENTDIR),
        Nutch.ARG_SEGMENTDIR);
    FileSystem fs = segmentsDir.getFileSystem(conf);
    FileStatus[] fstats = fs.listStatus(segmentsDir,
        HadoopFSUtil.getPassDirectoriesFilter(fs));
    for (Path candidate : HadoopFSUtil.getPaths(fstats)) {
      if (SegmentChecker.isIndexable(candidate, fs)) {
        segments.add(candidate);
      }
    }
  }

  if (args.containsKey(Nutch.ARG_SEGMENTS)) {
    // Previously this branch did not mark segments as specified, so the
    // newest segment below crawlId was indexed in addition to the explicit
    // ones (or the call failed when that directory did not exist).
    segmentsSpecified = true;
    Object segmentsFromArg = args.get(Nutch.ARG_SEGMENTS);
    if (segmentsFromArg instanceof List) {
      for (Object segment : (List<?>) segmentsFromArg) {
        segments.add(toPath(segment, Nutch.ARG_SEGMENTS));
      }
    } else if (segmentsFromArg != null) {
      segments.add(toPath(segmentsFromArg, Nutch.ARG_SEGMENTS));
    }
  }

  if (!segmentsSpecified) {
    segments.add(latestSegment(crawlId));
  }

  // Flags are switched on by key presence only.
  boolean noCommit = args.containsKey("noCommit");
  boolean deleteGone = args.containsKey("deleteGone");
  boolean normalize = args.containsKey("normalize");
  boolean filter = args.containsKey("filter");
  boolean addBinaryContent = args.containsKey("addBinaryContent");
  // base64 is only meaningful when binary content is added.
  boolean base64 = addBinaryContent && args.containsKey("base64");
  String params = (String) args.get("params");

  setConf(conf);
  index(crawlDb, linkdb, segments, noCommit, deleteGone, params, filter,
      normalize, addBinaryContent, base64);

  Map<String, Object> results = new HashMap<>();
  results.put(Nutch.VAL_RESULT, 0);
  return results;
}

/**
 * Converts an argument value to a {@code Path}: a {@code Path} is returned as
 * is, anything else is converted through its {@code toString()}.
 */
private static Path toPath(Object value, String key) {
  if (value == null) {
    throw new IllegalArgumentException(
        "Argument '" + key + "' must not be null");
  }
  if (value instanceof Path) {
    return (Path) value;
  }
  return new Path(value.toString());
}

/**
 * Returns the most recently modified entry of {@code crawlId + "/segments"}.
 */
private static Path latestSegment(String crawlId) {
  // NOTE(review): this lists the directory through java.io.File, i.e. on the
  // local file system, whereas the segment-dir branch of run() goes through
  // the Hadoop FileSystem API - confirm this is intended.
  File segmentsDir = new File(crawlId + "/segments");
  File[] candidates = segmentsDir.listFiles();
  if (candidates == null || candidates.length == 0) {
    throw new IllegalStateException(
        "No segments found in " + segmentsDir.getPath());
  }
  // A single pass for the maximum replaces the former sort, whose comparator
  // never returned a positive value and therefore broke the Comparator
  // contract.
  File latest = candidates[0];
  for (File candidate : candidates) {
    if (candidate.lastModified() > latest.lastModified()) {
      latest = candidate;
    }
  }
  return new Path(latest.getPath());
}