public void get()

in src/java/org/apache/nutch/segment/SegmentReader.java [343:466]


  public void get(final Path segment, final Text key, Writer writer,
      final Map<String, List<Writable>> results) throws Exception {
    LOG.info("SegmentReader: get '{}'", key);
    ArrayList<Thread> threads = new ArrayList<>();
    if (co && segmSubdirExists(getConf(), segment, Content.DIR_NAME))
      threads.add(new Thread() {
        @Override
        public void run() {
          try {
            List<Writable> res = getMapRecords(new Path(segment,
                Content.DIR_NAME), key);
            results.put("co", res);
          } catch (Exception e) {
            LOG.error("Exception:", e);
          }
        }
      });
    if (fe && segmSubdirExists(getConf(), segment, CrawlDatum.FETCH_DIR_NAME))
      threads.add(new Thread() {
        @Override
        public void run() {
          try {
            List<Writable> res = getMapRecords(new Path(segment,
                CrawlDatum.FETCH_DIR_NAME), key);
            results.put("fe", res);
          } catch (Exception e) {
            LOG.error("Exception:", e);
          }
        }
      });
    if (ge
        && segmSubdirExists(getConf(), segment, CrawlDatum.GENERATE_DIR_NAME))
      threads.add(new Thread() {
        @Override
        public void run() {
          try {
            List<Writable> res = getSeqRecords(new Path(segment,
                CrawlDatum.GENERATE_DIR_NAME), key);
            results.put("ge", res);
          } catch (Exception e) {
            LOG.error("Exception:", e);
          }
        }
      });
    if (pa && segmSubdirExists(getConf(), segment, CrawlDatum.PARSE_DIR_NAME))
      threads.add(new Thread() {
        @Override
        public void run() {
          try {
            List<Writable> res = getSeqRecords(new Path(segment,
                CrawlDatum.PARSE_DIR_NAME), key);
            results.put("pa", res);
          } catch (Exception e) {
            LOG.error("Exception:", e);
          }
        }
      });
    if (pd && segmSubdirExists(getConf(), segment, ParseData.DIR_NAME))
      threads.add(new Thread() {
        @Override
        public void run() {
          try {
            List<Writable> res = getMapRecords(new Path(segment,
                ParseData.DIR_NAME), key);
            results.put("pd", res);
          } catch (Exception e) {
            LOG.error("Exception:", e);
          }
        }
      });
    if (pt && segmSubdirExists(getConf(), segment, ParseText.DIR_NAME))
      threads.add(new Thread() {
        @Override
        public void run() {
          try {
            List<Writable> res = getMapRecords(new Path(segment,
                ParseText.DIR_NAME), key);
            results.put("pt", res);
          } catch (Exception e) {
            LOG.error("Exception:", e);
          }
        }
      });
    Iterator<Thread> it = threads.iterator();
    if (!it.hasNext()) {
      LOG.error("No segment subdirectories specified as input!");
      return;
    }
    while (it.hasNext())
      it.next().start();
    int cnt;
    do {
      cnt = 0;
      try {
        Thread.sleep(5000);
      } catch (Exception e) {
      }
      ;
      it = threads.iterator();
      while (it.hasNext()) {
        if (it.next().isAlive())
          cnt++;
      }
      if ((cnt > 0) && (LOG.isDebugEnabled())) {
        LOG.debug("(" + cnt + " to retrieve)");
      }
    } while (cnt > 0);
    for (int i = 0; i < keys.length; i++) {
      List<Writable> res = results.get(keys[i][0]);
      if (res != null && res.size() > 0) {
        for (int k = 0; k < res.size(); k++) {
          writer.write(keys[i][1]);
          if (recodeContent && keys[i][0].equals("co")) {
            Charset charset = getCharset(((ParseData) results.get("pd").get(k)).getParseMeta());
            writer.write(((Content) res.get(k)).toString(charset));
          } else {
            writer.write(res.get(k).toString());
          }
          writer.write('\n');
        }
      }
      writer.flush();
    }
  }