in src/java/org/apache/nutch/segment/SegmentReader.java [343:466]
/**
 * Looks up the records stored under {@code key} in the enabled
 * subdirectories of {@code segment} and writes them to {@code writer}.
 *
 * <p>Each enabled subdirectory flag ({@code co}, {@code fe}, {@code ge},
 * {@code pa}, {@code pd}, {@code pt}) whose subdirectory exists in the
 * segment gets its own reader thread. Each thread stores its records in
 * {@code results} under that short name. A reader that throws is logged and
 * leaves no entry. After all readers finish, sections are printed in the
 * order of {@code keys}: every record is preceded by {@code keys[i][1]} and
 * followed by a newline.
 *
 * <p>If {@code recodeContent} is set, content records are decoded with the
 * charset derived from the parse metadata at the same index. This happens
 * only when such a parse-data record is available; otherwise the record's
 * plain {@code toString()} is written. NOTE(review): this assumes the k-th
 * content record pairs with the k-th parse-data record, which the original
 * code also assumed. Confirm this.
 *
 * @param segment the segment directory to read from
 * @param key the record key to look up
 * @param writer destination for the textual dump; flushed once at the end
 * @param results receives the raw records per subdirectory name. It is
 *          written to from several threads, so every put synchronizes on
 *          the map itself.
 * @throws InterruptedException if interrupted while waiting for the reader
 *           threads to finish
 * @throws Exception propagated from the subdirectory checks or from
 *           {@code writer}
 */
public void get(final Path segment, final Text key, Writer writer,
    final Map<String, List<Writable>> results) throws Exception {
  LOG.info("SegmentReader: get '{}'", key);
  List<Thread> threads = new ArrayList<>();
  // ge and pa are read with getSeqRecords; the others use getMapRecords.
  if (co && segmSubdirExists(getConf(), segment, Content.DIR_NAME)) {
    threads.add(newSubdirReader("co", new Path(segment, Content.DIR_NAME),
        key, true, results));
  }
  if (fe && segmSubdirExists(getConf(), segment, CrawlDatum.FETCH_DIR_NAME)) {
    threads.add(newSubdirReader("fe",
        new Path(segment, CrawlDatum.FETCH_DIR_NAME), key, true, results));
  }
  if (ge
      && segmSubdirExists(getConf(), segment, CrawlDatum.GENERATE_DIR_NAME)) {
    threads.add(newSubdirReader("ge",
        new Path(segment, CrawlDatum.GENERATE_DIR_NAME), key, false, results));
  }
  if (pa && segmSubdirExists(getConf(), segment, CrawlDatum.PARSE_DIR_NAME)) {
    threads.add(newSubdirReader("pa",
        new Path(segment, CrawlDatum.PARSE_DIR_NAME), key, false, results));
  }
  if (pd && segmSubdirExists(getConf(), segment, ParseData.DIR_NAME)) {
    threads.add(newSubdirReader("pd", new Path(segment, ParseData.DIR_NAME),
        key, true, results));
  }
  if (pt && segmSubdirExists(getConf(), segment, ParseText.DIR_NAME)) {
    threads.add(newSubdirReader("pt", new Path(segment, ParseText.DIR_NAME),
        key, true, results));
  }
  if (threads.isEmpty()) {
    LOG.error("No segment subdirectories specified as input!");
    return;
  }
  for (Thread thread : threads) {
    thread.start();
  }
  // join() returns as soon as each reader is done (no fixed polling delay)
  // and makes that reader's writes to results visible to this thread.
  for (Thread thread : threads) {
    thread.join();
    if (LOG.isDebugEnabled()) {
      int pending = 0;
      for (Thread other : threads) {
        if (other.isAlive()) {
          pending++;
        }
      }
      if (pending > 0) {
        LOG.debug("({} to retrieve)", pending);
      }
    }
  }
  // May be null (pd disabled, missing, or failed) or shorter than the
  // content list; guard before using it to pick a charset.
  List<Writable> parseDatas = results.get("pd");
  for (int i = 0; i < keys.length; i++) {
    List<Writable> res = results.get(keys[i][0]);
    if (res == null || res.isEmpty()) {
      continue;
    }
    boolean recode = recodeContent && keys[i][0].equals("co");
    for (int k = 0; k < res.size(); k++) {
      writer.write(keys[i][1]);
      if (recode && parseDatas != null && k < parseDatas.size()) {
        Charset charset = getCharset(
            ((ParseData) parseDatas.get(k)).getParseMeta());
        writer.write(((Content) res.get(k)).toString(charset));
      } else {
        writer.write(res.get(k).toString());
      }
      writer.write('\n');
    }
  }
  writer.flush();
}

/**
 * Creates, without starting it, a thread that reads the records for
 * {@code key} from {@code dir} and stores them in {@code results} under
 * {@code name}. Any exception is logged and leaves no entry for
 * {@code name}.
 *
 * @param name short subdirectory name used as the key in {@code results}
 * @param dir subdirectory to read
 * @param key record key to look up
 * @param mapFile {@code true} to read with {@code getMapRecords},
 *          {@code false} to read with {@code getSeqRecords}
 * @param results shared result map; puts are synchronized on it
 * @return the unstarted reader thread
 */
private Thread newSubdirReader(final String name, final Path dir,
    final Text key, final boolean mapFile,
    final Map<String, List<Writable>> results) {
  return new Thread() {
    @Override
    public void run() {
      try {
        List<Writable> res = mapFile ? getMapRecords(dir, key)
            : getSeqRecords(dir, key);
        // Several readers run at once and the caller's map may not be
        // thread-safe.
        synchronized (results) {
          results.put(name, res);
        }
      } catch (Exception e) {
        LOG.error("Exception:", e);
      }
    }
  };
}