in src/java/org/apache/nutch/segment/SegmentReader.java [220:319]
  public void dump(Path segment, Path output) throws IOException,
      InterruptedException, ClassNotFoundException {

    LOG.info("SegmentReader: dump segment: {}", segment);

    Job job = Job.getInstance(getConf(), "Nutch SegmentReader: " + segment);
    Configuration conf = job.getConfiguration();
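
    // Collect the segment subdirectories to dump. The ge/fe/pa/co/pd/pt
    // flags select the crawl_generate, crawl_fetch, crawl_parse, content,
    // parse_data and parse_text data respectively; a subdirectory is only
    // added if it exists in the segment.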
    List<Path> inputDirs = new ArrayList<>();
    if (ge) {
      addSegmSubDirIfExists(inputDirs, conf, segment,
          CrawlDatum.GENERATE_DIR_NAME);
    }
    if (fe) {
      addSegmSubDirIfExists(inputDirs, conf, segment,
          CrawlDatum.FETCH_DIR_NAME);
    }
    if (pa) {
      addSegmSubDirIfExists(inputDirs, conf, segment,
          CrawlDatum.PARSE_DIR_NAME);
    }
    if (co) {
      addSegmSubDirIfExists(inputDirs, conf, segment, Content.DIR_NAME);
    }
    if (pd) {
      addSegmSubDirIfExists(inputDirs, conf, segment, ParseData.DIR_NAME);
    }
    if (pt) {
      addSegmSubDirIfExists(inputDirs, conf, segment, ParseText.DIR_NAME);
    }

    if (inputDirs.isEmpty()) {
      String msg = "No segment subdirectories defined as input";
      LOG.error(msg);
      throw new RuntimeException(msg);
    }
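
    // Register every selected subdirectory as an input path of the job.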
    for (Path p : inputDirs) {
      FileInputFormat.addInputPath(job, p);
    }
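
    // The segment data is stored as SequenceFiles; the compatibility mapper
    // and reducer group all records belonging to a URL and render them as
    // one text entry of the dump.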
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setMapperClass(InputCompatMapper.class);
    job.setReducerClass(InputCompatReducer.class);
    job.setJarByClass(SegmentReader.class);
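
    // Stage the job output in a unique temporary directory under
    // hadoop.tmp.dir; it is merged into the final dump file further below.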
    Path tempDir = new Path(conf.get("hadoop.tmp.dir", "/tmp") + "/segread-"
        + RANDOM.nextInt());
    FileSystem fs = tempDir.getFileSystem(conf);
    fs.delete(tempDir, true);

    FileOutputFormat.setOutputPath(job, tempDir);
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NutchWritable.class);
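
    // Run the job and surface any failure to the caller.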
    try {
      boolean success = job.waitForCompletion(true);
      if (!success) {
        String message = NutchJob.getJobFailureLogMessage("SegmentReader", job);
        LOG.error(message);
        throw new RuntimeException(message);
      }
    } catch (IOException | InterruptedException | ClassNotFoundException e) {
      LOG.error(StringUtils.stringifyException(e));
      throw e;
    }

    // concatenate the output
    Path dumpFile = new Path(output, conf.get("segment.dump.dir", "dump"));
    FileSystem outFs = dumpFile.getFileSystem(conf);

    // remove the old file
    outFs.delete(dumpFile, true);
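
    // Append every part file produced by the reducers to the single dump
    // file, keeping a running record counter across part files. A failure on
    // an individual part file is logged and skipped.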
    FileStatus[] fstats = fs.listStatus(tempDir,
        HadoopFSUtil.getPassAllFilter());
    Path[] files = HadoopFSUtil.getPaths(fstats);

    int currentRecordNumber = 0;
    if (files.length > 0) {
      try (PrintWriter writer = new PrintWriter(
          new BufferedWriter(new OutputStreamWriter(outFs.create(dumpFile),
              StandardCharsets.UTF_8)))) {
        for (int i = 0; i < files.length; i++) {
          Path partFile = files[i];
          try {
            currentRecordNumber = append(fs, conf, partFile, writer,
                currentRecordNumber);
          } catch (IOException exception) {
            if (LOG.isWarnEnabled()) {
              LOG.warn("Couldn't copy the content of " + partFile.toString()
                  + " into " + dumpFile.toString());
              LOG.warn(exception.getMessage());
            }
          }
        }
      }
    }
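
    // Clean up the temporary job output.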
    fs.delete(tempDir, true);
    LOG.info("SegmentReader: done");
  }