public void bulkImportKvToAccumulo()

in modules/spark/src/main/java/org/apache/fluo/recipes/spark/FluoSparkHelper.java [199:244]


  /**
   * Bulk imports the given key/value pairs into an Accumulo table by first writing them to HDFS
   * with {@code AccumuloFileOutputFormat} and then importing the resulting directory.
   *
   * <p>A temporary HDFS directory obtained from {@code getTempDir(opts)} is used as a staging
   * area and must not already exist. Data is written to its {@code data} subdirectory, and its
   * {@code fail} subdirectory is passed to Accumulo as the failures directory. The temporary
   * directory is deleted only after a successful import. On any failure it is left in place, so a
   * later call that resolves to the same directory is rejected until it is removed.
   *
   * @param data key/value pairs to import
   * @param accumuloTable name of the destination Accumulo table
   * @param opts bulk import options, used to pick the temp dir and the Accumulo connector
   * @throws IllegalArgumentException if the HDFS temp directory already exists
   * @throws IllegalStateException if any file ends up in the failures directory, or if an HDFS or
   *     Accumulo error occurs (the original exception is kept as the cause)
   */
  public void bulkImportKvToAccumulo(JavaPairRDD<Key, Value> data, String accumuloTable,
      BulkImportOptions opts) {

    Path tempDir = getTempDir(opts);

    try (AccumuloClient client = getAccumuloClient(fluoConfig)) {
      // The staging dir must be fresh. A leftover one (failed runs are not cleaned up below) is
      // rejected.
      if (hdfs.exists(tempDir)) {
        throw new IllegalArgumentException("HDFS temp dir already exists: " + tempDir.toString());
      }
      hdfs.mkdirs(tempDir);
      Path dataDir = new Path(tempDir, "data");
      Path failDir = new Path(tempDir, "fail");
      // dataDir is deliberately not created here. Presumably the output format must create it
      // itself, since Hadoop FileOutputFormat rejects existing output dirs (confirm).
      hdfs.mkdirs(failDir);

      // save data to HDFS
      Job job = Job.getInstance(hadoopConfig);
      AccumuloFileOutputFormat.setOutputPath(job, dataDir);
      // must use new API here as saveAsHadoopFile throws exception
      // NOTE(review): `data` is not sorted in this method. Accumulo files need keys in sorted
      // order, so callers presumably sort/partition beforehand (confirm).
      data.saveAsNewAPIHadoopFile(dataDir.toString(), Key.class, Value.class,
          AccumuloFileOutputFormat.class, job.getConfiguration());

      // bulk import data to Accumulo
      log.info("Wrote data for bulk import to HDFS temp directory: {}", dataDir);
      Connector conn = chooseConnector(client, opts);
      conn.tableOperations().importDirectory(accumuloTable, dataDir.toString(), failDir.toString(),
          false);

      // Throw if the failures directory contains files. The temp dir is not deleted on this path,
      // so the failed files remain available for inspection.
      if (hdfs.listFiles(failDir, true).hasNext()) {
        throw new IllegalStateException("Bulk import failed!  Found files that failed to import "
            + "in failures directory: " + failDir);
      }
      log.info("Successfully bulk imported data in {} to '{}' Accumulo table", dataDir,
          accumuloTable);

      // Delete the whole temp directory. FileSystem.delete reports failure via its return value
      // rather than always throwing, so only claim success when it actually happened.
      if (hdfs.delete(tempDir, true)) {
        log.info("Deleted HDFS temp directory created for bulk import: {}", tempDir);
      } else {
        log.warn("Failed to delete HDFS temp directory created for bulk import: {}", tempDir);
      }
      // @formatter:off
    } catch (IOException | TableNotFoundException | AccumuloException
        | AccumuloSecurityException e) {
      // @formatter:on
      throw new IllegalStateException(
          "Failed to bulk import data into Accumulo table '" + accumuloTable + "'", e);
    }
  }