public RecordReader createRecordReader()

in src/main/java/org/apache/accumulo/testing/continuous/ContinuousInputFormat.java [103:199]


  /**
   * Creates a record reader that synthesizes key/value pairs rather than reading them from the
   * split. All settings are taken from the job configuration when the framework calls
   * {@code initialize}; the reader then emits up to {@code PROP_MAP_NODES} entries (default
   * 1,000,000). Each value is built from the ingest UUID, the running node count and the row of the
   * previously generated key.
   *
   * @param inputSplit
   *          not used by this method; the returned reader does not read it either
   * @param taskAttemptContext
   *          not used by this method; the reader reads its configuration from the context passed
   *          to {@code initialize}
   * @return a new reader that must be initialized before use
   */
  public RecordReader<Key,Value> createRecordReader(InputSplit inputSplit,
      TaskAttemptContext taskAttemptContext) {
    return new RecordReader<Key,Value>() {
      // Total number of entries to generate, and how many have been generated so far.
      long numNodes;
      long nodeCount;
      private Random random;

      // UTF-8 bytes of the PROP_UUID setting; passed to every generated value.
      private byte[] uuid;

      // Bounds handed to the row / column generators.
      long minRow;
      long maxRow;
      int maxFam;
      int maxQual;
      List<ColumnVisibility> visibilities;
      // When true, a fresh CRC32 is computed per entry and handed to the value builder.
      boolean checksum;

      // prevKey is the key returned by the previous nextKeyValue() call (null before the second).
      Key prevKey;
      Key currKey;
      Value currValue;

      /**
       * Reads all generation settings from the job configuration and resets the counter.
       *
       * @throws IllegalStateException
       *           if {@code PROP_UUID} is not present in the configuration
       */
      @Override
      public void initialize(InputSplit inputSplit, TaskAttemptContext job) {
        numNodes = job.getConfiguration().getLong(PROP_MAP_NODES, 1000000);

        // The UUID has no default; fail with a message naming the property instead of a bare
        // NullPointerException from getBytes().
        String ingestUuid = job.getConfiguration().get(PROP_UUID);
        if (ingestUuid == null) {
          throw new IllegalStateException(PROP_UUID + " is not set in the job configuration");
        }
        uuid = ingestUuid.getBytes(StandardCharsets.UTF_8);

        minRow = job.getConfiguration().getLong(PROP_ROW_MIN, 0);
        maxRow = job.getConfiguration().getLong(PROP_ROW_MAX, Long.MAX_VALUE);
        maxFam = job.getConfiguration().getInt(PROP_FAM_MAX, Short.MAX_VALUE);
        maxQual = job.getConfiguration().getInt(PROP_QUAL_MAX, Short.MAX_VALUE);
        checksum = job.getConfiguration().getBoolean(PROP_CHECKSUM, false);
        visibilities = ContinuousIngest.parseVisibilities(job.getConfiguration().get(PROP_VIS));

        // Each reader gets its own generator, seeded from SecureRandom.
        random = new Random(new SecureRandom().nextLong());

        nodeCount = 0;
      }

      /**
       * Generates a random key and, when a checksum is supplied, feeds the key's row, family,
       * qualifier and visibility bytes into it (in that order).
       *
       * @param cksum
       *          checksum to update, or null when checksumming is disabled
       * @return the newly generated key
       */
      private Key genKey(CRC32 cksum) {

        byte[] row = genRow(genLong(minRow, maxRow, random));

        byte[] fam = genCol(random.nextInt(maxFam));
        byte[] qual = genCol(random.nextInt(maxQual));
        // Pick one of the configured visibilities at random.
        byte[] cv = visibilities.get(random.nextInt(visibilities.size())).flatten();

        if (cksum != null) {
          cksum.update(row);
          cksum.update(fam);
          cksum.update(qual);
          cksum.update(cv);
        }

        return new Key(row, fam, qual, cv);
      }

      /**
       * Delegates to {@code ContinuousIngest.createValue}, supplying the current node count.
       */
      private byte[] createValue(byte[] ingestInstanceId, byte[] prevRow, Checksum cksum) {
        return ContinuousIngest.createValue(ingestInstanceId, nodeCount, prevRow, cksum);
      }

      /**
       * Generates the next key/value pair.
       *
       * @return true if a pair was generated, false once {@code numNodes} pairs have been produced
       */
      @Override
      public boolean nextKeyValue() {

        if (nodeCount < numNodes) {
          CRC32 cksum = checksum ? new CRC32() : null;
          prevKey = currKey;
          // The first entry has no predecessor, so its previous row is null.
          byte[] prevRow = prevKey != null ? prevKey.getRowData().toArray() : null;
          // genKey must run before createValue: it feeds the key bytes into the same checksum
          // instance that is then passed to the value builder.
          currKey = genKey(cksum);
          currValue = new Value(createValue(uuid, prevRow, cksum));

          // Incremented only after the value is built, so the value sees the pre-increment count.
          nodeCount++;
          return true;
        } else {
          return false;
        }
      }

      @Override
      public Key getCurrentKey() {
        return currKey;
      }

      @Override
      public Value getCurrentValue() {
        return currValue;
      }

      /**
       * @return the fraction of entries generated so far; 1.0 when there is nothing to generate
       */
      @Override
      public float getProgress() {
        // With no entries configured, 0.0f / 0.0f would be NaN; report completion instead.
        if (numNodes <= 0) {
          return 1.0f;
        }
        return nodeCount * 1.0f / numNodes;
      }

      @Override
      public void close() throws IOException {
        // Nothing to release: the reader holds no external resources.
      }
    };
  }