public static void main(String[] args)

in webindex/modules/data/src/main/java/webindex/data/LoadHdfs.java [53:117]


  public static void main(String[] args) throws Exception {

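    // Expect a single argument: the HDFS directory containing the data to load.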
    if (args.length != 1) {
      log.error("Usage: LoadHdfs <dataDir>");
      System.exit(1);
    }
    final String dataDir = args[0];
    IndexEnv.validateDataDir(dataDir);

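    // Read configuration: Hadoop conf dir, optional load rate limit, and Fluo application name.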
    final String hadoopConfDir = IndexEnv.getHadoopConfDir();
    final WebIndexConfig webIndexConfig = WebIndexConfig.load();
    final int rateLimit = webIndexConfig.getLoadRateLimit();
    final String appName = webIndexConfig.fluoApp;

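    // Recursively list every file under dataDir; these are the files to load.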
    List<String> loadPaths = new ArrayList<>();
    FileSystem hdfs = IndexEnv.getHDFS();
    RemoteIterator<LocatedFileStatus> listIter = hdfs.listFiles(new Path(dataDir), true);
    while (listIter.hasNext()) {
      LocatedFileStatus status = listIter.next();
      if (status.isFile()) {
        loadPaths.add(status.getPath().toString());
      }
    }

    log.info("Loading {} files into Fluo from {}", loadPaths.size(), dataDir);

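    // Distribute the file paths across a Spark job so they are loaded in parallel.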
    SparkConf sparkConf = new SparkConf().setAppName("webindex-load-hdfs");
    try (JavaSparkContext ctx = new JavaSparkContext(sparkConf)) {

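      // One partition per file so each Spark task loads exactly one WARC file.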
      JavaRDD<String> paths = ctx.parallelize(loadPaths, loadPaths.size());

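      // FluoClient is not serializable, so each partition creates its own client;
      // fluo-conn.properties is read from the executor task's working directory.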
      paths.foreachPartition(iter -> {
        final FluoConfiguration fluoConfig =
            new FluoConfiguration(new File("fluo-conn.properties"));
        fluoConfig.setApplicationName(appName);
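        // Optional limiter caps how many pages per second this task submits to Fluo.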
        final RateLimiter rateLimiter = rateLimit > 0 ? RateLimiter.create(rateLimit) : null;
        FileSystem fs = IndexEnv.getHDFS(hadoopConfDir);
        try (FluoClient client = FluoFactory.newClient(fluoConfig);
            LoaderExecutor le = client.newLoaderExecutor()) {
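          // Process each file path assigned to this partition.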
          iter.forEachRemaining(path -> {
            Path filePath = new Path(path);
            try {
              if (fs.exists(filePath)) {
                // Ensure the HDFS input stream is closed once the file is processed.
                try (FSDataInputStream fsin = fs.open(filePath)) {
                  ArchiveReader reader = WARCReaderFactory.get(filePath.getName(), fsin, true);
                  for (ArchiveRecord record : reader) {
                    Page page = ArchiveUtil.buildPageIgnoreErrors(record);
                    // Only pages with outbound links generate index updates.
                    if (!page.getOutboundLinks().isEmpty()) {
                      log.info("Loading page {} with {} links", page.getUrl(),
                          page.getOutboundLinks().size());
                      if (rateLimiter != null) {
                        rateLimiter.acquire();
                      }
                      // Queue an asynchronous Fluo transaction for this page's updates.
                      le.execute(PageLoader.updatePage(page));
                    }
                  }
                }
              }
            } catch (IOException e) {
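              // Log and continue with the next file rather than failing the whole partition.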
              log.error("Exception while processing {}", path, e);
            }
          });
        }
      });
    }
  }