in webindex/modules/data/src/main/java/webindex/data/LoadHdfs.java [53:117]
/**
 * Loads every file under an HDFS data directory into the Fluo application named in
 * {@link WebIndexConfig}, using a Spark job to process the files in parallel.
 *
 * <p>Files under the directory are listed recursively, and each file becomes its own Spark
 * partition. Each partition does the following:
 * <ul>
 *   <li>builds a Fluo client from {@code fluo-conn.properties} in the executor's working
 *       directory;</li>
 *   <li>reads each of its files as a WARC archive;</li>
 *   <li>submits a {@code PageLoader.updatePage} load for every page that has at least one
 *       outbound link.</li>
 * </ul>
 * When the configured load rate limit is positive, submissions in each partition are throttled
 * by a {@code RateLimiter} created with that limit.
 *
 * @param args exactly one argument: the HDFS directory containing the files to load
 * @throws Exception if configuration loading, HDFS listing, or the Spark job fails
 */
public static void main(String[] args) throws Exception {
  if (args.length != 1) {
    log.error("Usage: LoadHdfs <dataDir>");
    System.exit(1);
  }
  final String dataDir = args[0];
  IndexEnv.validateDataDir(dataDir);
  final String hadoopConfDir = IndexEnv.getHadoopConfDir();
  final WebIndexConfig webIndexConfig = WebIndexConfig.load();
  final int rateLimit = webIndexConfig.getLoadRateLimit();
  final String appName = webIndexConfig.fluoApp;

  List<String> loadPaths = listDataFiles(IndexEnv.getHDFS(), dataDir);
  if (loadPaths.isEmpty()) {
    // parallelize() below uses one partition per file and rejects a partition count of zero,
    // so stop here instead of failing inside Spark.
    log.warn("No files found in {}; nothing to load into Fluo", dataDir);
    return;
  }
  log.info("Loading {} files into Fluo from {}", loadPaths.size(), dataDir);

  SparkConf sparkConf = new SparkConf().setAppName("webindex-load-hdfs");
  try (JavaSparkContext ctx = new JavaSparkContext(sparkConf)) {
    // One partition per file so that files are loaded in parallel.
    JavaRDD<String> paths = ctx.parallelize(loadPaths, loadPaths.size());
    paths.foreachPartition(iter -> {
      final FluoConfiguration fluoConfig =
          new FluoConfiguration(new File("fluo-conn.properties"));
      fluoConfig.setApplicationName(appName);
      // Created per partition, so the limit applies to each partition independently.
      final RateLimiter rateLimiter = rateLimit > 0 ? RateLimiter.create(rateLimit) : null;
      final FileSystem fs = IndexEnv.getHDFS(hadoopConfDir);
      try (FluoClient client = FluoFactory.newClient(fluoConfig);
          LoaderExecutor le = client.newLoaderExecutor()) {
        iter.forEachRemaining(path -> {
          try {
            loadWarcFile(fs, new Path(path), le, rateLimiter);
          } catch (IOException e) {
            // Best effort: a bad file is logged and the rest of the partition keeps loading.
            log.error("Exception while processing {}", path, e);
          }
        });
      }
    });
  }
}

/**
 * Recursively lists the regular files under {@code dir}.
 *
 * @param hdfs file system to list
 * @param dir directory to list recursively
 * @return the string form of every file path found; empty if there are none
 * @throws IOException if the listing fails
 */
private static List<String> listDataFiles(FileSystem hdfs, String dir) throws IOException {
  List<String> files = new ArrayList<>();
  RemoteIterator<LocatedFileStatus> listIter = hdfs.listFiles(new Path(dir), true);
  while (listIter.hasNext()) {
    LocatedFileStatus status = listIter.next();
    if (status.isFile()) {
      files.add(status.getPath().toString());
    }
  }
  return files;
}

/**
 * Reads one WARC file and submits a page update for each record that has outbound links.
 *
 * <p>If the file no longer exists, a warning is logged and nothing is loaded.
 *
 * @param fs file system holding the file
 * @param filePath path of the WARC file
 * @param le executor that the page-update loaders are submitted to
 * @param rateLimiter throttle applied before each submission, or {@code null} for no limit
 * @throws IOException if the file cannot be opened or read
 */
private static void loadWarcFile(FileSystem fs, Path filePath, LoaderExecutor le,
    RateLimiter rateLimiter) throws IOException {
  if (!fs.exists(filePath)) {
    log.warn("File {} no longer exists; skipping", filePath);
    return;
  }
  // Close the HDFS stream once the file is consumed. Previously it leaked for every file.
  // NOTE(review): ArchiveReader presumably just wraps fsin; confirm that it holds no other
  // resources needing its own close().
  try (FSDataInputStream fsin = fs.open(filePath)) {
    ArchiveReader reader = WARCReaderFactory.get(filePath.getName(), fsin, true);
    for (ArchiveRecord record : reader) {
      Page page = ArchiveUtil.buildPageIgnoreErrors(record);
      int linkCount = page.getOutboundLinks().size();
      if (linkCount > 0) {
        log.info("Loading page {} with {} links", page.getUrl(), linkCount);
        if (rateLimiter != null) {
          rateLimiter.acquire();
        }
        le.execute(PageLoader.updatePage(page));
      }
    }
  }
}