protected static void doIngest()

in src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java [281:409]


  protected static void doIngest(AccumuloClient client, RandomGeneratorFactory randomFactory,
      BatchWriterFactory batchWriterFactory, String tableName, Properties testProps, int maxColF,
      int maxColQ, long numEntries, boolean checksum, Random random)
      throws TableNotFoundException, MutationsRejectedException, InterruptedException {

    if (!client.tableOperations().exists(tableName)) {
      throw new TableNotFoundException(null, tableName,
          "Consult the README and create the table before starting ingest.");
    }

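    // a unique id for this ingest run; it is passed to genMutation below so the
    // entries written can be traced back to this process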
    byte[] ingestInstanceId = UUID.randomUUID().toString().getBytes(UTF_8);
    log.info("Ingest instance ID: {} current time: {}ms", new String(ingestInstanceId, UTF_8),
        System.currentTimeMillis());

    long entriesWritten = 0L;
    long entriesDeleted = 0L;
    final int flushInterval = getFlushEntries(testProps);
    log.info("A flush will occur after every {} entries written", flushInterval);
    final int maxDepth = 25;

    // We always want to point back to flushed data so that the previous item already
    // exists in Accumulo when the data is verified. To do this, insert N points back
    // to the row from insert (N - flushInterval). The array below keeps track of all
    // inserts.
    MutationInfo[][] nodeMap = new MutationInfo[maxDepth][flushInterval];

    long lastFlushTime = System.currentTimeMillis();

    log.info("Total entries to be written: {}", numEntries);

    visibilities = parseVisibilities(testProps.getProperty(TestProps.CI_INGEST_VISIBILITIES));

    pauseEnabled = pauseEnabled(testProps);

    pauseMin = Integer.parseInt(testProps.getProperty(TestProps.CI_INGEST_PAUSE_WAIT_MIN));
    pauseMax = Integer.parseInt(testProps.getProperty(TestProps.CI_INGEST_PAUSE_WAIT_MAX));
    Preconditions.checkState(0 < pauseMin && pauseMin <= pauseMax,
        "Bad pause wait min/max, must conform to: 0 < min <= max");

    if (pauseEnabled) {
      lastPauseNs = System.nanoTime();
      pauseWaitSec = getPause(random);
      log.info("PAUSING enabled");
      log.info("INGESTING for {}s", pauseWaitSec);
    }

    final float deleteProbability = getDeleteProbability(testProps);
    log.info("DELETES will occur with a probability of {}",
        String.format("%.02f", deleteProbability));

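    // try-with-resources guarantees the batch writer is closed, flushing any
    // buffered mutations, even if ingest fails part way through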
    try (BatchWriter bw = batchWriterFactory.create(tableName)) {
      zipfianEnabled =
          Boolean.parseBoolean(testProps.getProperty("test.ci.ingest.zipfian.enabled"));

      if (zipfianEnabled) {
        minSize = Integer.parseInt(testProps.getProperty("test.ci.ingest.zipfian.min.size"));
        maxSize = Integer.parseInt(testProps.getProperty("test.ci.ingest.zipfian.max.size"));
        exponent = Double.parseDouble(testProps.getProperty("test.ci.ingest.zipfian.exponent"));
        rnd = new RandomDataGenerator();

        log.info("Zipfian distribution enabled with min size: {}, max size: {}, exponent: {}",
            minSize, maxSize, exponent);
      }

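      // the labeled loop lets the depth loop below exit as soon as numEntries
      // have been written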
      out: while (true) {
        ColumnVisibility cv = getVisibility(random);

        // generate a set of nodes that link to the previous set of nodes
        for (int depth = 0; depth < maxDepth; depth++) {
          // use the same random generator for each flush interval
          LongSupplier randomRowGenerator = randomFactory.get();
          for (int index = 0; index < flushInterval; index++) {
            long rowLong = randomRowGenerator.getAsLong();

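            // link this node to the node written at the same index in the
            // previous depth; the first depth has nothing to point back to yet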
            byte[] prevRow = depth == 0 ? null : genRow(nodeMap[depth - 1][index].row);

            int cfInt = random.nextInt(maxColF);
            int cqInt = random.nextInt(maxColQ);

            nodeMap[depth][index] = new MutationInfo(rowLong, cfInt, cqInt);
            Mutation m = genMutation(rowLong, cfInt, cqInt, cv, ingestInstanceId, entriesWritten,
                prevRow, checksum);
            entriesWritten++;
            bw.addMutation(m);
          }

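          // flush after each depth so every row referenced by the next depth is
          // already persisted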
          lastFlushTime = flush(bw, entriesWritten, entriesDeleted, lastFlushTime);
          if (entriesWritten >= numEntries)
            break out;
          pauseCheck(random);
        }

        // random chance that the entries will be deleted
        final boolean delete = random.nextFloat() < deleteProbability;

        // if the previously written entries are scheduled to be deleted
        if (delete) {
          log.info("Deleting last portion of written entries");
          // add delete mutations in the reverse order in which they were written
          for (int depth = nodeMap.length - 1; depth >= 0; depth--) {
            for (int index = nodeMap[depth].length - 1; index >= 0; index--) {
              MutationInfo currentNode = nodeMap[depth][index];
              Mutation m = new Mutation(genRow(currentNode.row));
              m.putDelete(genCol(currentNode.cf), genCol(currentNode.cq));
              entriesDeleted++;
              bw.addMutation(m);
            }
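            // flush the deletes for this depth before moving on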
            lastFlushTime = flush(bw, entriesWritten, entriesDeleted, lastFlushTime);
            pauseCheck(random);
          }
        } else {
          // create one big linked list; this makes all the first inserts point to something
          for (int index = 0; index < flushInterval - 1; index++) {
            MutationInfo firstEntry = nodeMap[0][index];
            MutationInfo lastEntry = nodeMap[maxDepth - 1][index + 1];
            Mutation m = genMutation(firstEntry.row, firstEntry.cf, firstEntry.cq, cv,
                ingestInstanceId, entriesWritten, genRow(lastEntry.row), checksum);
            entriesWritten++;
            bw.addMutation(m);
          }
          lastFlushTime = flush(bw, entriesWritten, entriesDeleted, lastFlushTime);
        }

        if (entriesWritten >= numEntries)
          break out;
        pauseCheck(random);
      }
    }
  }
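
The linking scheme above is easier to see in isolation. Below is a minimal,
self-contained sketch (not part of ContinuousIngest; the class name, the tiny
sizes, and the printed output are purely illustrative) showing that insert N
points back to insert (N - flushInterval), and that the final pass re-writes the
first row of each chain so it points to the deepest row of the adjacent chain.

import java.util.Random;

// Illustrative only: mirrors the nodeMap[maxDepth][flushInterval] indexing used
// by doIngest, with plain longs standing in for MutationInfo and a print
// standing in for BatchWriter.addMutation.
public class LinkingSketch {
  public static void main(String[] args) {
    final int maxDepth = 3;       // 25 in doIngest
    final int flushInterval = 4;  // getFlushEntries(testProps) in doIngest
    long[][] nodeMap = new long[maxDepth][flushInterval];
    Random random = new Random(42);

    long entriesWritten = 0;
    for (int depth = 0; depth < maxDepth; depth++) {
      for (int index = 0; index < flushInterval; index++) {
        long row = random.nextLong() >>> 1; // non-negative row, like genRow's input
        Long prevRow = depth == 0 ? null : nodeMap[depth - 1][index];
        nodeMap[depth][index] = row;
        // insert N points back to insert (N - flushInterval)
        System.out.printf("insert %d: row %016x -> prev %s%n", entriesWritten++, row,
            prevRow == null ? "null" : String.format("%016x", prevRow));
      }
      // doIngest flushes here, so every row referenced above is already durable
    }

    // close the chains: re-write the first (depth 0) rows so that they, too,
    // point to something, namely the deepest row of the adjacent chain
    for (int index = 0; index < flushInterval - 1; index++) {
      System.out.printf("re-insert row %016x -> prev %016x%n", nodeMap[0][index],
          nodeMap[maxDepth - 1][index + 1]);
    }
  }
}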