public static void main()

in src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java [101:230]


  /**
   * Entry point of the continuous ingest client.
   *
   * <p>
   * Builds a {@link ContinuousEnv} from {@code args}, verifies that the target table exists and
   * then writes mutations in rounds until at least {@code CI_INGEST_CLIENT_ENTRIES} entries have
   * been written. A round consists of {@code maxDepth} (25) batches of {@code flushInterval}
   * mutations; the mutation at {@code [depth][index]} references the row written at
   * {@code [depth - 1][index]}, and the batch writer is flushed after every batch so that a
   * referenced row has always been flushed before the row pointing at it is written.
   *
   * <p>
   * After each round, with the configured delete probability, every entry written in that round is
   * deleted again (in reverse write order). Otherwise the entries of the first batch, which so far
   * reference nothing, are rewritten to reference entries of the last batch.
   *
   * @param args
   *          command line arguments handed to {@link ContinuousEnv}
   * @throws Exception
   *           if the table does not exist, a precondition on the configuration fails, a required
   *           numeric property cannot be parsed, or writing to Accumulo fails
   */
  public static void main(String[] args) throws Exception {

    try (ContinuousEnv env = new ContinuousEnv(args)) {

      AccumuloClient client = env.getAccumuloClient();

      final long rowMin = env.getRowMin();
      final long rowMax = env.getRowMax();
      Preconditions.checkState(0 <= rowMin && rowMin <= rowMax,
          "Bad rowMin/rowMax, must conform to: 0 <= rowMin <= rowMax");

      // ingest never creates the table itself; fail fast when it is missing
      String tableName = env.getAccumuloTableName();
      if (!client.tableOperations().exists(tableName)) {
        throw new TableNotFoundException(null, tableName,
            "Consult the README and create the table before starting ingest.");
      }

      // random identifier handed to genMutation for every entry written by this run
      byte[] ingestInstanceId = UUID.randomUUID().toString().getBytes(UTF_8);
      log.info("Ingest instance ID: {} current time: {}ms", new String(ingestInstanceId, UTF_8),
          System.currentTimeMillis());

      Properties testProps = env.getTestProperties();

      long entriesWritten = 0L;
      long entriesDeleted = 0L;
      final int flushInterval = getFlushEntries(testProps);
      log.info("A flush will occur after every {} entries written", flushInterval);
      final int maxDepth = 25;

      // always want to point back to flushed data. This way the previous item should
      // always exist in accumulo when verifying data. To do this make insert N point
      // back to the row from insert (N - flushInterval). The array below is used to keep
      // track of all inserts.
      MutationInfo[][] nodeMap = new MutationInfo[maxDepth][flushInterval];

      long lastFlushTime = System.currentTimeMillis();

      final int maxColF = env.getMaxColF();
      final int maxColQ = env.getMaxColQ();
      final boolean checksum = Boolean
          .parseBoolean(testProps.getProperty(TestProps.CI_INGEST_CHECKSUM));
      final long numEntries = Long
          .parseLong(testProps.getProperty(TestProps.CI_INGEST_CLIENT_ENTRIES));
      log.info("Total entries to be written: {}", numEntries);

      visibilities = parseVisibilities(testProps.getProperty(TestProps.CI_INGEST_VISIBILITIES));

      pauseEnabled = pauseEnabled(testProps);

      // the pause bounds are parsed and validated regardless of whether pausing is enabled
      pauseMin = Integer.parseInt(testProps.getProperty(TestProps.CI_INGEST_PAUSE_WAIT_MIN));
      pauseMax = Integer.parseInt(testProps.getProperty(TestProps.CI_INGEST_PAUSE_WAIT_MAX));
      Preconditions.checkState(0 < pauseMin && pauseMin <= pauseMax,
          "Bad pause wait min/max, must conform to: 0 < min <= max");

      if (pauseEnabled) {
        lastPauseNs = System.nanoTime();
        pauseWaitSec = getPause(env.getRandom());
        log.info("PAUSING enabled");
        log.info("INGESTING for {}s", pauseWaitSec);
      }

      final float deleteProbability = getDeleteProbability(testProps);
      log.info("DELETES will occur with a probability of {}",
          String.format("%.02f", deleteProbability));

      try (BatchWriter bw = client.createBatchWriter(tableName)) {
        out: while (true) {
          // one visibility is chosen per round and used for every entry of that round
          ColumnVisibility cv = getVisibility(env.getRandom());

          // generate sets nodes that link to previous set of nodes
          for (int depth = 0; depth < maxDepth; depth++) {
            for (int index = 0; index < flushInterval; index++) {
              long rowLong = genLong(rowMin, rowMax, env.getRandom());

              // the first batch has nothing to point back to yet
              byte[] prevRow = depth == 0 ? null : genRow(nodeMap[depth - 1][index].row);

              int cfInt = env.getRandom().nextInt(maxColF);
              int cqInt = env.getRandom().nextInt(maxColQ);

              nodeMap[depth][index] = new MutationInfo(rowLong, cfInt, cqInt);
              Mutation m = genMutation(rowLong, cfInt, cqInt, cv, ingestInstanceId, entriesWritten,
                  prevRow, checksum);
              entriesWritten++;
              bw.addMutation(m);
            }

            lastFlushTime = flush(bw, entriesWritten, entriesDeleted, lastFlushTime);
            if (entriesWritten >= numEntries) {
              break out;
            }
            pauseCheck(env.getRandom());
          }

          // random chance that the entries will be deleted
          final boolean delete = env.getRandom().nextFloat() < deleteProbability;

          // if the previously written entries are scheduled to be deleted
          if (delete) {
            log.info("Deleting last portion of written entries");
            // add delete mutations in the reverse order in which they were written
            for (int depth = nodeMap.length - 1; depth >= 0; depth--) {
              for (int index = nodeMap[depth].length - 1; index >= 0; index--) {
                MutationInfo currentNode = nodeMap[depth][index];
                Mutation m = new Mutation(genRow(currentNode.row));
                // Delete with the visibility handed to genMutation for this round. Column
                // visibility is part of an Accumulo key, so a delete issued with the default
                // (empty) visibility would not mask entries written under a non-empty one.
                m.putDelete(genCol(currentNode.cf), genCol(currentNode.cq), cv);
                entriesDeleted++;
                bw.addMutation(m);
              }
              lastFlushTime = flush(bw, entriesWritten, entriesDeleted, lastFlushTime);
              pauseCheck(env.getRandom());
            }
          } else {
            // create one big linked list, this makes all the first inserts point to something
            for (int index = 0; index < flushInterval - 1; index++) {
              MutationInfo firstEntry = nodeMap[0][index];
              MutationInfo lastEntry = nodeMap[maxDepth - 1][index + 1];
              Mutation m = genMutation(firstEntry.row, firstEntry.cf, firstEntry.cq, cv,
                  ingestInstanceId, entriesWritten, genRow(lastEntry.row), checksum);
              entriesWritten++;
              bw.addMutation(m);
            }
            lastFlushTime = flush(bw, entriesWritten, entriesDeleted, lastFlushTime);
          }

          if (entriesWritten >= numEntries) {
            break out;
          }
          pauseCheck(env.getRandom());
        }
      }
    }
  }