in modules/spark/src/main/java/org/apache/fluo/recipes/spark/FluoSparkHelper.java [199:244]
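  /**
   * Writes an RDD of Accumulo key/values to RFiles in a temporary HDFS directory and bulk
   * imports those files into an Accumulo table, deleting the temp directory on success.
   *
   * @param data RDD of Accumulo key/values to import
   * @param accumuloTable Accumulo table to import into
   * @param opts bulk import options
   */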
  public void bulkImportKvToAccumulo(JavaPairRDD<Key, Value> data, String accumuloTable,
      BulkImportOptions opts) {

    Path tempDir = getTempDir(opts);
    Connector conn = chooseConnector(opts);

    try {
      if (hdfs.exists(tempDir)) {
        throw new IllegalArgumentException("HDFS temp dir already exists: " + tempDir.toString());
      }
      hdfs.mkdirs(tempDir);
      Path dataDir = new Path(tempDir.toString() + "/data");
      Path failDir = new Path(tempDir.toString() + "/fail");
      hdfs.mkdirs(failDir);

      // save data to HDFS as Accumulo RFiles
      Job job = Job.getInstance(hadoopConfig);
      AccumuloFileOutputFormat.setOutputPath(job, dataDir);
      // must use the new Hadoop API here as saveAsHadoopFile throws an exception
      data.saveAsNewAPIHadoopFile(dataDir.toString(), Key.class, Value.class,
          AccumuloFileOutputFormat.class, job.getConfiguration());

      // bulk import data to Accumulo
      log.info("Wrote data for bulk import to HDFS temp directory: {}", dataDir);
      conn.tableOperations().importDirectory(accumuloTable, dataDir.toString(),
          failDir.toString(), false);

      // throw exception if the failures directory contains files
      if (hdfs.listFiles(failDir, true).hasNext()) {
        throw new IllegalStateException("Bulk import failed! Found files that failed to import "
            + "in failures directory: " + failDir);
      }
      log.info("Successfully bulk imported data in {} to '{}' Accumulo table", dataDir,
          accumuloTable);

      // delete temp directory
      hdfs.delete(tempDir, true);
      log.info("Deleted HDFS temp directory created for bulk import: {}", tempDir);
      // @formatter:off
    } catch (IOException | TableNotFoundException | AccumuloException
        | AccumuloSecurityException e) {
      // @formatter:on
      throw new IllegalStateException(e);
    }
  }
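
For context, a minimal sketch of how a Spark job might drive this method. The FluoSparkHelper constructor signature, the no-arg BulkImportOptions, and the table name, properties file, and paths used here are illustrative assumptions, not taken from the excerpt above.

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.recipes.spark.FluoSparkHelper;
import org.apache.fluo.recipes.spark.FluoSparkHelper.BulkImportOptions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class BulkImportExample {
  public static void main(String[] args) {
    JavaSparkContext sc =
        new JavaSparkContext(new SparkConf().setAppName("bulk-import-example").setMaster("local[2]"));

    // assumed helper setup: Fluo connection props, Hadoop config, and a base dir for temp files
    FluoConfiguration fluoConfig = new FluoConfiguration(new File("fluo.properties"));
    FluoSparkHelper fsh =
        new FluoSparkHelper(fluoConfig, new Configuration(), new Path("/tmp/fluo-bulk"));

    // build key/values; keys must be in sorted order within each written file, so the input
    // list here is already sorted and no shuffle is needed
    JavaPairRDD<Key, Value> kvData = sc.parallelize(Arrays.asList("a", "b", "c"))
        .mapToPair(s -> new Tuple2<>(new Key(new Text(s), new Text("f"), new Text("q")),
            new Value(s.getBytes(StandardCharsets.UTF_8))));

    // write RFiles to HDFS and bulk import them into the 'mytable' Accumulo table
    fsh.bulkImportKvToAccumulo(kvData, "mytable", new BulkImportOptions());

    sc.stop();
  }
}

Because the method checks the failures directory and throws IllegalStateException if anything could not be imported, the caller does not need to inspect an import result separately.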