public static void main()

in spark/src/main/java/org/apache/accumulo/spark/CopyPlus5K.java [114:191]


  public static void main(String[] args) throws Exception {
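    // Expect two arguments: the write method ("batch" or "bulk") and the path to accumulo-client.properties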

    if (args.length != 2 || (!args[0].equals("batch") && !args[0].equals("bulk"))
        || args[1].isEmpty()) {
      System.out.println("Usage: ./run.sh [batch|bulk] /path/to/accumulo-client.properties");
      System.exit(1);
    }

    // Read client properties from file
    final Properties props = Accumulo.newClientProperties().from(args[1]).build();

    cleanupAndCreateTables(props);
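    // (cleanupAndCreateTables, defined elsewhere in this class, resets the input and output tables used below)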

    SparkConf conf = new SparkConf();
    conf.setAppName("CopyPlus5K");
    // KryoSerializer is needed for serializing Accumulo Key when partitioning data for bulk import
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    conf.registerKryoClasses(new Class[] {Key.class, Value.class, Properties.class});

    try (JavaSparkContext sc = new JavaSparkContext(conf)) {

      Job job = Job.getInstance();
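      // (the Job above only carries the Accumulo input/output configuration; it is never submitted as a MapReduce job)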

      // Read input from Accumulo
      AccumuloInputFormat.configure().clientProperties(props).table(inputTable).store(job);
      JavaPairRDD<Key,Value> data = sc.newAPIHadoopRDD(job.getConfiguration(),
          AccumuloInputFormat.class, Key.class, Value.class);

      // Add 5K to all values
      JavaPairRDD<Key,Value> dataPlus5K = data
          .mapValues(v -> new Value("" + (Integer.parseInt(v.toString()) + 5_000)));

      if (args[0].equals("batch")) {
        // Write output using batch writer
        dataPlus5K.foreachPartition(iter -> {
          // Intentionally create an Accumulo client per partition so the client itself never
          // has to be serialized and shipped to each remote executor.
          try (AccumuloClient client = Accumulo.newClient().from(props).build();
              BatchWriter bw = client.createBatchWriter(outputTable)) {
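            // Closing the BatchWriter (via try-with-resources) flushes any buffered mutations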
            iter.forEachRemaining(kv -> {
              Key key = kv._1;
              Value val = kv._2;
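              // Re-create the entry under the same row, column, visibility, and timestamp, with the new value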
              Mutation m = new Mutation(key.getRow());
              m.at().family(key.getColumnFamily()).qualifier(key.getColumnQualifier())
                  .visibility(key.getColumnVisibility()).timestamp(key.getTimestamp()).put(val);
              try {
                bw.addMutation(m);
              } catch (MutationsRejectedException e) {
                e.printStackTrace();
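                // Note: rejected mutations are only logged here; a real job would likely rethrow so the Spark task fails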
              }
            });
          }
        });
      } else if (args[0].equals("bulk")) {
        // Write output using bulk import

        // Create HDFS directory for bulk import
        FileSystem hdfs = FileSystem.get(new Configuration());
        hdfs.mkdirs(rootPath);
        Path outputDir = new Path(rootPath, "output");

        // Write Spark output to HDFS
        AccumuloFileOutputFormat.configure().outputPath(outputDir).store(job);
        Partitioner partitioner = new AccumuloRangePartitioner("3", "7");
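        // repartitionAndSortWithinPartitions groups keys at row boundaries "3" and "7" and sorts each partition,
        // so each RFile written below is sorted and covers a distinct row range, as bulk import expects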
        JavaPairRDD<Key,Value> partData = dataPlus5K
            .repartitionAndSortWithinPartitions(partitioner);
        partData.saveAsNewAPIHadoopFile(outputDir.toString(), Key.class, Value.class,
            AccumuloFileOutputFormat.class);

        // Bulk import into Accumulo
        try (AccumuloClient client = Accumulo.newClient().from(props).build()) {
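          // importDirectory loads the RFiles written to outputDir into the output table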
          client.tableOperations().importDirectory(outputDir.toString()).to(outputTable).load();
        }
      } else {
        System.out.println("Unknown method to write output: " + args[0]);
        System.exit(1);
      }
    }
  }