in spark/src/main/java/org/apache/accumulo/spark/CopyPlus5K.java [114:191]
public static void main(String[] args) throws Exception {
  // Validate arguments; check args.length first to avoid an ArrayIndexOutOfBoundsException
  // when arguments are missing
  if (args.length != 2 || (!args[0].equals("batch") && !args[0].equals("bulk"))
      || args[1].isEmpty()) {
    System.out.println("Usage: ./run.sh [batch|bulk] /path/to/accumulo-client.properties");
    System.exit(1);
  }

  // Read client properties from file
  final Properties props = Accumulo.newClientProperties().from(args[1]).build();
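
  // cleanupAndCreateTables (defined earlier in this class) is expected to drop any existing
  // input/output tables and recreate them so each run starts from a clean state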
  cleanupAndCreateTables(props);

  SparkConf conf = new SparkConf();
  conf.setAppName("CopyPlus5K");
  // KryoSerializer is needed for serializing Accumulo Key when partitioning data for bulk import
  conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
  conf.registerKryoClasses(new Class[] {Key.class, Value.class, Properties.class});
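
  // try-with-resources ensures the SparkContext is stopped when the job finishes or fails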
  try (JavaSparkContext sc = new JavaSparkContext(conf)) {
    Job job = Job.getInstance();

    // Read input from Accumulo
    AccumuloInputFormat.configure().clientProperties(props).table(inputTable).store(job);
    JavaPairRDD<Key,Value> data = sc.newAPIHadoopRDD(job.getConfiguration(),
        AccumuloInputFormat.class, Key.class, Value.class);
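    // Each RDD element is one Accumulo entry: the full Key (row, column, visibility, timestamp)
    // paired with its Value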

    // Add 5K to all values
    JavaPairRDD<Key,Value> dataPlus5K = data
        .mapValues(v -> new Value("" + (Integer.parseInt(v.toString()) + 5_000)));
if (args[0].equals("batch")) {
// Write output using batch writer
dataPlus5K.foreachPartition(iter -> {
// Intentionally created an Accumulo client for each partition to avoid attempting to
// serialize it and send it to each remote process.
try (AccumuloClient client = Accumulo.newClient().from(props).build();
BatchWriter bw = client.createBatchWriter(outputTable)) {
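          // No BatchWriterConfig is given, so default buffer and latency settings are used;
          // closing the writer flushes any remaining buffered mutations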
          iter.forEachRemaining(kv -> {
            Key key = kv._1;
            Value val = kv._2;
            Mutation m = new Mutation(key.getRow());
            m.at().family(key.getColumnFamily()).qualifier(key.getColumnQualifier())
                .visibility(key.getColumnVisibility()).timestamp(key.getTimestamp()).put(val);
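            // Every part of the original key (row, family, qualifier, visibility, timestamp)
            // is preserved; only the value changes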
            try {
              bw.addMutation(m);
            } catch (MutationsRejectedException e) {
              e.printStackTrace();
            }
          });
        }
      });
} else if (args[0].equals("bulk")) {
// Write output using bulk import
// Create HDFS directory for bulk import
FileSystem hdfs = FileSystem.get(new Configuration());
hdfs.mkdirs(rootPath);
Path outputDir = new Path(rootPath.toString() + "/output");
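      // Spark writes Accumulo RFiles into this directory; they are bulk imported below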

      // Write Spark output to HDFS
      AccumuloFileOutputFormat.configure().outputPath(outputDir).store(job);
      Partitioner partitioner = new AccumuloRangePartitioner("3", "7");
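      // The split points "3" and "7" are assumed to match the splits created for the output
      // table elsewhere in this example, so each partition's file falls within a single tablet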
      JavaPairRDD<Key,Value> partData = dataPlus5K
          .repartitionAndSortWithinPartitions(partitioner);
      partData.saveAsNewAPIHadoopFile(outputDir.toString(), Key.class, Value.class,
          AccumuloFileOutputFormat.class);
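      // One RFile is written per partition; sorting within each partition is required because
      // bulk-imported files must contain entries in sorted order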

      // Bulk import into Accumulo
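      // importDirectory(...).to(...).load() is the fluent bulk import API; it loads the
      // generated RFiles directly into the output table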
      try (AccumuloClient client = Accumulo.newClient().from(props).build()) {
        client.tableOperations().importDirectory(outputDir.toString()).to(outputTable).load();
      }
    } else {
      System.out.println("Unknown method to write output: " + args[0]);
      System.exit(1);
    }
  }
}