public void client()

in src/main/java/org/apache/accumulo/testing/scalability/Ingest.java [73:134]


  /**
   * Runs the ingest workload: writes {@code NUM_ENTRIES} randomly generated mutations to the table
   * named by the {@code TABLE} test property through a single {@code BatchWriter}, and reports
   * the number of entries and bytes written to {@code stopTimer}.
   *
   * <p>
   * The batch writer is configured from the {@code MAX_MEMORY}, {@code MAX_LATENCY} (interpreted
   * as milliseconds) and {@code NUM_THREADS} test properties. If the table does not exist, or if
   * mutations are rejected while adding them or while closing the writer, the error is logged and
   * the JVM exits with status -1.
   *
   * @throws NumberFormatException
   *           if one of the numeric test properties cannot be parsed
   */
  public void client() {

    AccumuloClient client = getClient();
    String tableName = getTestProperty("TABLE");

    // get batch writer configuration
    long maxMemory = Long.parseLong(getTestProperty("MAX_MEMORY"));
    long maxLatency = Long.parseLong(getTestProperty("MAX_LATENCY"));
    int maxWriteThreads = Integer.parseInt(getTestProperty("NUM_THREADS"));

    // parse the workload size together with the other settings so that a malformed value fails
    // before a batch writer has been opened
    long numIngestEntries = Long.parseLong(getTestProperty("NUM_ENTRIES"));

    // create batch writer; initialised to null only because the compiler cannot know that
    // System.exit() in the catch block never returns
    BatchWriter bw = null;
    try {
      bw = client.createBatchWriter(tableName, new BatchWriterConfig().setMaxMemory(maxMemory)
          .setMaxLatency(maxLatency, TimeUnit.MILLISECONDS).setMaxWriteThreads(maxWriteThreads));
    } catch (TableNotFoundException e) {
      log.error("Table '" + tableName + "' not found.", e);
      System.exit(-1);
    }

    // configure writing
    Random r = new Random();
    // The instance id is the same for every mutation, so encode it once here instead of
    // re-encoding (and allocating) inside the timed loop.
    // NOTE(review): assumes genMutation does not modify the array it is given - confirm.
    byte[] ingestInstanceId = UUID.randomUUID().toString().getBytes(UTF_8);
    long minRow = 0L;
    long maxRow = Long.MAX_VALUE;
    // exclusive upper bounds handed to Random.nextInt for the column family / qualifier
    int maxColF = Short.MAX_VALUE;
    int maxColQ = Short.MAX_VALUE;
    long count = 0;
    long totalBytes = 0;

    ColumnVisibility cv = new ColumnVisibility();

    // start timer
    startTimer();

    // write specified number of entries
    while (count < numIngestEntries) {
      // count is incremented first, so the value passed to genMutation runs from 1 to NUM_ENTRIES
      count++;
      long rowId = ContinuousIngest.genLong(minRow, maxRow, r);
      Mutation m = ContinuousIngest.genMutation(rowId, r.nextInt(maxColF), r.nextInt(maxColQ), cv,
          ingestInstanceId, count, null, false);
      totalBytes += m.numBytes();
      try {
        bw.addMutation(m);
      } catch (MutationsRejectedException e) {
        log.error("Mutations rejected.", e);
        System.exit(-1);
      }
    }

    // close writer before stopping the timer, so the close call is included in the measured time
    try {
      bw.close();
    } catch (MutationsRejectedException e) {
      log.error("Could not close BatchWriter due to mutations being rejected.", e);
      System.exit(-1);
    }

    // stop timer
    stopTimer(count, totalBytes);
  }