public RecordWriter getRecordWriter()

in src/java/org/apache/nutch/segment/SegmentMerger.java [213:352]


    /**
     * Returns a writer that routes merged segment records into the per-part
     * directories of the output segment (or of one of its slices).
     *
     * <p>Each record's part is taken from the {@code SEGMENT_PART_KEY} metadata
     * and its optional slice from {@code SEGMENT_SLICE_KEY}. Records without a
     * slice are written below {@code <output>/<segmentName>/<partDir>/}, sliced
     * records below {@code <output>/<segmentName>-<slice>/<partDir>/}, where
     * {@code segmentName} is read from {@code segment.merger.segmentName}.
     * Writers are created lazily, one per (segment directory, part directory)
     * pair, and are all closed by {@code close(TaskAttemptContext)}.
     *
     * @param context the task attempt context; supplies the configuration,
     *          output path, output compression type and progress reporting
     * @return a record writer accepting {@code <Text, MetaWrapper>} pairs
     * @throws IOException if the output file system cannot be obtained
     */
    public RecordWriter<Text, MetaWrapper> getRecordWriter(TaskAttemptContext context)
            throws IOException {
      Configuration conf = context.getConfiguration();
      String name = getUniqueFile(context, "part", "");
      Path dir = FileOutputFormat.getOutputPath(context);
      FileSystem fs = dir.getFileSystem(context.getConfiguration());

      return new RecordWriter<Text, MetaWrapper>() {
        // Lazily created writers keyed by "<segment dir>/<part dir>"; each value
        // is either a SequenceFile.Writer or a MapFile.Writer.
        private final HashMap<String, Closeable> sliceWriters = new HashMap<>();
        private final String segmentName = conf.get("segment.merger.segmentName");

        @Override
        public void write(Text key, MetaWrapper wrapper) throws IOException {
          // unwrap
          SegmentPart sp = SegmentPart.parse(wrapper.getMeta(SEGMENT_PART_KEY));
          Writable o = wrapper.get();
          String slice = wrapper.getMeta(SEGMENT_SLICE_KEY);
          if (o instanceof CrawlDatum) {
            String partName = sp.partName;
            // constant-first equals: a missing part name reports the error below
            // instead of failing with a NullPointerException
            if (CrawlDatum.GENERATE_DIR_NAME.equals(partName)) {
              ensureSequenceFile(slice, CrawlDatum.GENERATE_DIR_NAME).append(key, o);
            } else if (CrawlDatum.FETCH_DIR_NAME.equals(partName)) {
              ensureMapFile(slice, CrawlDatum.FETCH_DIR_NAME, CrawlDatum.class)
                      .append(key, o);
            } else if (CrawlDatum.PARSE_DIR_NAME.equals(partName)) {
              ensureSequenceFile(slice, CrawlDatum.PARSE_DIR_NAME).append(key, o);
            } else {
              throw new IOException("Cannot determine segment part: "
                      + partName);
            }
          } else if (o instanceof Content) {
            ensureMapFile(slice, Content.DIR_NAME, Content.class).append(key, o);
          } else if (o instanceof ParseData) {
            // update the segment name inside contentMeta - required by Indexer.
            // Uses the same directory name the record is written to.
            ((ParseData) o).getContentMeta().set(Nutch.SEGMENT_NAME_KEY,
                    segmentDirName(slice));
            ensureMapFile(slice, ParseData.DIR_NAME, ParseData.class).append(key, o);
          } else if (o instanceof ParseText) {
            ensureMapFile(slice, ParseText.DIR_NAME, ParseText.class).append(key, o);
          }
          // any other value type is ignored (unchanged behavior)
        }

        /**
         * Name of the output segment directory for a slice: the plain segment
         * name when {@code slice} is null, otherwise {@code segmentName-slice}.
         */
        private String segmentDirName(String slice) {
          return slice == null ? segmentName : segmentName + "-" + slice;
        }

        /** Path of this task's part file: {@code <output>/<segDir>/<dirName>/<name>}. */
        private Path partFile(String segDir, String dirName) {
          return new Path(new Path(new Path(dir, segDir), dirName), name);
        }

        // lazily create SequenceFile-s.
        private SequenceFile.Writer ensureSequenceFile(String slice,
                String dirName) throws IOException {
          String segDir = segmentDirName(slice);
          String writerKey = segDir + "/" + dirName;
          SequenceFile.Writer res = (SequenceFile.Writer) sliceWriters.get(writerKey);
          if (res != null) {
            return res;
          }
          Path wname = partFile(segDir, dirName);
          res = SequenceFile.createWriter(conf, SequenceFile.Writer.file(wname),
                  SequenceFile.Writer.keyClass(Text.class),
                  SequenceFile.Writer.valueClass(CrawlDatum.class),
                  SequenceFile.Writer.bufferSize(fs.getConf().getInt("io.file.buffer.size",4096)),
                  SequenceFile.Writer.replication(fs.getDefaultReplication(wname)),
                  // 1 GiB block size
                  SequenceFile.Writer.blockSize(1073741824),
                  SequenceFile.Writer.compression(SequenceFileOutputFormat.getOutputCompressionType(context), new DefaultCodec()),
                  SequenceFile.Writer.progressable((Progressable)context),
                  SequenceFile.Writer.metadata(new Metadata()));
          sliceWriters.put(writerKey, res);
          return res;
        }

        // lazily create MapFile-s.
        private MapFile.Writer ensureMapFile(String slice, String dirName,
                Class<? extends Writable> clazz) throws IOException {
          String segDir = segmentDirName(slice);
          String writerKey = segDir + "/" + dirName;
          MapFile.Writer res = (MapFile.Writer) sliceWriters.get(writerKey);
          if (res != null) {
            return res;
          }
          CompressionType compType = SequenceFileOutputFormat
                  .getOutputCompressionType(context);
          if (clazz.isAssignableFrom(ParseText.class)) {
            // parse text is always record-compressed, whatever the job setting
            compType = CompressionType.RECORD;
          }
          res = new MapFile.Writer(conf, partFile(segDir, dirName),
                  MapFile.Writer.keyClass(Text.class),
                  SequenceFile.Writer.valueClass(clazz),
                  SequenceFile.Writer.compression(compType),
                  SequenceFile.Writer.progressable((Progressable) context));
          sliceWriters.put(writerKey, res);
          return res;
        }

        /**
         * Closes every writer opened by this record writer. All writers are
         * attempted even if some fail; the first failure is rethrown with any
         * later ones attached as suppressed exceptions.
         */
        @Override
        public void close(TaskAttemptContext context) throws IOException {
          IOException failure = null;
          for (Closeable writer : sliceWriters.values()) {
            try {
              writer.close();
            } catch (IOException e) {
              if (failure == null) {
                failure = e;
              } else {
                failure.addSuppressed(e);
              }
            }
          }
          sliceWriters.clear();
          if (failure != null) {
            throw failure;
          }
        }
      };
    }