public static void ingest()

in src/main/java/org/apache/accumulo/testing/ingest/TestIngest.java [197:331]


  public static void ingest(AccumuloClient client, FileSystem fs, Opts opts, Configuration conf)
      throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException,
      TableExistsException {
    long stopTime;

    byte[][] bytevals = generateValues(opts.dataSize);

    byte[] randomValue = new byte[opts.dataSize];
    Random random = new Random();

    long bytesWritten = 0;

    createTable(client, opts);

    BatchWriter bw = null;
    RFileWriter writer = null;

    if (opts.outputFile != null) {
      writer = RFile.newWriter().to(opts.outputFile + ".rf").withFileSystem(fs).build();
      writer.startDefaultLocalityGroup();

    } else {
      bw = client.createBatchWriter(opts.tableName);
      String principal = ClientProperty.AUTH_PRINCIPAL.getValue(opts.getClientProps());
      client.securityOperations().changeUserAuthorizations(principal, AUTHS);
    }
    Text labBA = new Text(opts.columnVisibility.getExpression());

    long startTime = System.currentTimeMillis();
    for (int i = 0; i < opts.rows; i++) {
      int rowid;
      if (opts.stride > 0) {
        rowid = ((i % opts.stride) * (opts.rows / opts.stride)) + (i / opts.stride);
      } else {
        rowid = i;
      }

      Text row = generateRow(rowid, opts.startRow);
      Mutation m = new Mutation(row);
      for (int j = 0; j < opts.cols; j++) {
        Text colf = new Text(opts.columnFamily);
        Text colq = new Text(FastFormat.toZeroPaddedString(j, 7, 10, COL_PREFIX));

        if (writer != null) {
          Key key = new Key(row, colf, colq, labBA);
          if (opts.timestamp >= 0) {
            key.setTimestamp(opts.timestamp);
          } else {
            key.setTimestamp(startTime);
          }

          key.setDeleted(opts.delete);

          bytesWritten += key.getSize();

          if (opts.delete) {
            writer.append(key, new Value(new byte[0]));
          } else {
            byte[] value;
            if (opts.random != null) {
              value = genRandomValue(random, randomValue, opts.random.intValue(),
                  rowid + opts.startRow, j);
            } else {
              value = bytevals[j % bytevals.length];
            }

            Value v = new Value(value);
            writer.append(key, v);
            bytesWritten += v.getSize();
          }

        } else {
          Key key = new Key(row, colf, colq, labBA);
          bytesWritten += key.getSize();

          if (opts.delete) {
            if (opts.timestamp >= 0)
              m.putDelete(colf, colq, opts.columnVisibility, opts.timestamp);
            else
              m.putDelete(colf, colq, opts.columnVisibility);
          } else {
            byte[] value;
            if (opts.random != null) {
              value = genRandomValue(random, randomValue, opts.random.intValue(),
                  rowid + opts.startRow, j);
            } else {
              value = bytevals[j % bytevals.length];
            }
            bytesWritten += value.length;

            if (opts.timestamp >= 0) {
              m.put(colf, colq, opts.columnVisibility, opts.timestamp, new Value(value, true));
            } else {
              m.put(colf, colq, opts.columnVisibility, new Value(value, true));
            }
          }
        }
      }
      if (bw != null)
        bw.addMutation(m);
    }

    if (writer != null) {
      writer.close();
    } else if (bw != null) {
      try {
        bw.close();
      } catch (MutationsRejectedException e) {
        if (e.getSecurityErrorCodes().size() > 0) {
          for (Entry<TabletId,Set<SecurityErrorCode>> entry : e.getSecurityErrorCodes()
              .entrySet()) {
            System.err.println("ERROR : Not authorized to write to : " + entry.getKey() + " due to "
                + entry.getValue());
          }
        }

        if (e.getConstraintViolationSummaries().size() > 0) {
          for (ConstraintViolationSummary cvs : e.getConstraintViolationSummaries()) {
            System.err.println("ERROR : Constraint violates : " + cvs);
          }
        }
        throw e;
      }
    }

    stopTime = System.currentTimeMillis();

    int totalValues = opts.rows * opts.cols;
    double elapsed = (stopTime - startTime) / 1000.0;

    System.out.printf(
        "%,12d records written | %,8d records/sec | %,12d bytes written | %,8d bytes/sec | %6.3f secs   %n",
        totalValues, (int) (totalValues / elapsed), bytesWritten, (int) (bytesWritten / elapsed),
        elapsed);
  }