in src/java/org/apache/nutch/segment/SegmentMerger.java [213:352]
/**
 * Returns a {@link RecordWriter} that demultiplexes merged segment records
 * back into their per-part output files (crawl_generate, crawl_fetch,
 * crawl_parse, content, parse_data, parse_text), optionally split into
 * named slices. Writers are created lazily on first use and cached per
 * (slice, part) pair.
 *
 * @param context the task attempt context supplying configuration and
 *                the job output directory
 * @return a record writer that routes each {@link MetaWrapper} to the
 *         appropriate per-part MapFile or SequenceFile
 * @throws IOException if the output filesystem cannot be resolved
 */
public RecordWriter<Text, MetaWrapper> getRecordWriter(TaskAttemptContext context)
throws IOException {
Configuration conf = context.getConfiguration();
String name = getUniqueFile(context, "part", "");
Path dir = FileOutputFormat.getOutputPath(context);
FileSystem fs = dir.getFileSystem(context.getConfiguration());
return new RecordWriter<Text, MetaWrapper>() {
  // Lazily-created writers keyed by slice + dirName. Values are either
  // SequenceFile.Writer or MapFile.Writer; both implement Closeable.
  private final HashMap<String, Closeable> sliceWriters = new HashMap<>();
  private final String segmentName = conf.get("segment.merger.segmentName");

  @Override
  public void write(Text key, MetaWrapper wrapper) throws IOException {
    // Unwrap the record and determine which segment part / slice it
    // belongs to.
    SegmentPart sp = SegmentPart.parse(wrapper.getMeta(SEGMENT_PART_KEY));
    Writable o = wrapper.get();
    String slice = wrapper.getMeta(SEGMENT_SLICE_KEY);
    if (o instanceof CrawlDatum) {
      if (sp.partName.equals(CrawlDatum.GENERATE_DIR_NAME)) {
        ensureSequenceFile(slice, CrawlDatum.GENERATE_DIR_NAME).append(key, o);
      } else if (sp.partName.equals(CrawlDatum.FETCH_DIR_NAME)) {
        ensureMapFile(slice, CrawlDatum.FETCH_DIR_NAME, CrawlDatum.class)
            .append(key, o);
      } else if (sp.partName.equals(CrawlDatum.PARSE_DIR_NAME)) {
        ensureSequenceFile(slice, CrawlDatum.PARSE_DIR_NAME).append(key, o);
      } else {
        throw new IOException("Cannot determine segment part: "
            + sp.partName);
      }
    } else if (o instanceof Content) {
      ensureMapFile(slice, Content.DIR_NAME, Content.class).append(key, o);
    } else if (o instanceof ParseData) {
      // update the segment name inside contentMeta - required by Indexer
      if (slice == null) {
        ((ParseData) o).getContentMeta().set(Nutch.SEGMENT_NAME_KEY,
            segmentName);
      } else {
        ((ParseData) o).getContentMeta().set(Nutch.SEGMENT_NAME_KEY,
            segmentName + "-" + slice);
      }
      ensureMapFile(slice, ParseData.DIR_NAME, ParseData.class)
          .append(key, o);
    } else if (o instanceof ParseText) {
      ensureMapFile(slice, ParseText.DIR_NAME, ParseText.class)
          .append(key, o);
    }
    // NOTE(review): records of any other type are silently dropped —
    // presumably intentional; confirm against the merger's input contract.
  }

  /**
   * Builds the output path for a (slice, part) pair:
   * {@code <out>/<segmentName>/<dirName>/<name>} for the default slice, or
   * {@code <out>/<segmentName>-<slice>/<dirName>/<name>} otherwise.
   */
  private Path slicePath(String slice, String dirName) {
    Path out = FileOutputFormat.getOutputPath(context);
    // Compare by value, not reference: a metadata slice textually equal
    // to DEFAULT_SLICE must be treated as the default slice too.
    String segment = DEFAULT_SLICE.equals(slice) ? segmentName
        : segmentName + "-" + slice;
    return new Path(new Path(new Path(out, segment), dirName), name);
  }

  /** Lazily creates (and caches) the SequenceFile writer for a slice/part. */
  private SequenceFile.Writer ensureSequenceFile(String slice,
      String dirName) throws IOException {
    if (slice == null)
      slice = DEFAULT_SLICE;
    SequenceFile.Writer res = (SequenceFile.Writer) sliceWriters
        .get(slice + dirName);
    if (res != null)
      return res;
    Path wname = slicePath(slice, dirName);
    // Both SequenceFile parts (crawl_generate, crawl_parse) store
    // CrawlDatum values, so the value class is fixed here.
    res = SequenceFile.createWriter(conf, SequenceFile.Writer.file(wname),
        SequenceFile.Writer.keyClass(Text.class),
        SequenceFile.Writer.valueClass(CrawlDatum.class),
        SequenceFile.Writer.bufferSize(fs.getConf().getInt("io.file.buffer.size",4096)),
        SequenceFile.Writer.replication(fs.getDefaultReplication(wname)),
        SequenceFile.Writer.blockSize(1073741824),
        SequenceFile.Writer.compression(SequenceFileOutputFormat.getOutputCompressionType(context), new DefaultCodec()),
        SequenceFile.Writer.progressable((Progressable)context),
        SequenceFile.Writer.metadata(new Metadata()));
    sliceWriters.put(slice + dirName, res);
    return res;
  }

  /** Lazily creates (and caches) the MapFile writer for a slice/part. */
  private MapFile.Writer ensureMapFile(String slice, String dirName,
      Class<? extends Writable> clazz) throws IOException {
    if (slice == null)
      slice = DEFAULT_SLICE;
    MapFile.Writer res = (MapFile.Writer) sliceWriters.get(slice
        + dirName);
    if (res != null)
      return res;
    Path wname = slicePath(slice, dirName);
    CompressionType compType = SequenceFileOutputFormat
        .getOutputCompressionType(context);
    // ParseText must stay RECORD-compressed so random lookups by key
    // remain efficient.
    if (clazz.isAssignableFrom(ParseText.class)) {
      compType = CompressionType.RECORD;
    }
    Option rKeyClassOpt = MapFile.Writer.keyClass(Text.class);
    org.apache.hadoop.io.SequenceFile.Writer.Option rValClassOpt = SequenceFile.Writer.valueClass(clazz);
    org.apache.hadoop.io.SequenceFile.Writer.Option rProgressOpt = SequenceFile.Writer.progressable((Progressable)context);
    org.apache.hadoop.io.SequenceFile.Writer.Option rCompOpt = SequenceFile.Writer.compression(compType);
    res = new MapFile.Writer(conf, wname, rKeyClassOpt,
        rValClassOpt, rCompOpt, rProgressOpt);
    sliceWriters.put(slice + dirName, res);
    return res;
  }

  @Override
  public void close(TaskAttemptContext context) throws IOException {
    // Both writer types implement Closeable, so no instanceof dispatch
    // is needed.
    for (Closeable writer : sliceWriters.values()) {
      writer.close();
    }
  }
};
}