public RecordReader createRecordReader()

in modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoEntryInputFormat.java [56:131]


  /**
   * Creates a {@link RecordReader} that emits one {@link RowColumn} / {@link Bytes} pair per
   * entry returned by a Fluo scanner.
   *
   * <p>
   * The returned reader does its real setup in {@code initialize}: it builds an
   * {@code Environment} from the properties stored under {@code PROPS_CONF_KEY}, opens a
   * {@code TransactionImpl} with the timestamp stored under {@code TIMESTAMP_CONF_KEY}
   * ({@code -1} when absent), and scans the span derived from the split's range, fetching the
   * column families listed under {@code FAMS_CONF_KEY}.
   *
   * @param split not used by this method; the reader receives its split through
   *        {@code initialize}
   * @param context not used by this method; the reader receives its context through
   *        {@code initialize}
   * @return a new, uninitialized record reader
   * @throws IOException declared by the overridden signature; not thrown by this method itself
   * @throws InterruptedException declared by the overridden signature; not thrown by this method
   *         itself
   */
  public RecordReader<RowColumn, Bytes> createRecordReader(InputSplit split,
      TaskAttemptContext context) throws IOException, InterruptedException {

    return new RecordReader<RowColumn, Bytes>() {

      // Entry most recently pulled from cellIterator; null before the first successful
      // nextKeyValue() call and again once the iterator is exhausted.
      private RowColumnValue rowColVal;
      private Environment env = null;
      private TransactionImpl ti = null;
      private Iterator<RowColumnValue> cellIterator;

      /**
       * Closes the transaction and then the environment. Safe to call more than once, and the
       * environment is closed even when closing the transaction throws.
       */
      @Override
      public void close() throws IOException {
        // Detach the resources before closing them so that a repeated close() is a no-op.
        TransactionImpl openTx = ti;
        Environment openEnv = env;
        ti = null;
        env = null;

        try {
          if (openTx != null) {
            openTx.close();
          }
        } finally {
          // Runs even if closing the transaction failed, so the environment is not leaked.
          if (openEnv != null) {
            openEnv.close();
          }
        }
      }

      /**
       * @return the row/column of the current entry, or null when there is no current entry
       */
      @Override
      public RowColumn getCurrentKey() throws IOException, InterruptedException {
        return rowColVal == null ? null : rowColVal.getRowColumn();
      }

      /**
       * @return the value of the current entry, or null when there is no current entry
       */
      @Override
      public Bytes getCurrentValue() throws IOException, InterruptedException {
        return rowColVal == null ? null : rowColVal.getValue();
      }

      /**
       * @return always 0; progress tracking is not implemented
       */
      @Override
      public float getProgress() throws IOException, InterruptedException {
        // TODO report real progress; the scanner iterator exposes no position to derive it from
        return 0;
      }

      /**
       * Opens the Fluo environment and transaction described by the job configuration and
       * starts a scan over the split's range.
       *
       * @param split must be a {@code RangeInputSplit}; its range defines the scanned span
       * @param context supplies the job configuration
       * @throws IOException wrapping any failure during setup; resources opened before the
       *         failure are closed first
       */
      @Override
      public void initialize(InputSplit split, TaskAttemptContext context)
          throws IOException, InterruptedException {
        try {
          ByteArrayInputStream bais = new ByteArrayInputStream(
              context.getConfiguration().get(PROPS_CONF_KEY).getBytes(StandardCharsets.UTF_8));

          env = new Environment(new FluoConfiguration(bais));

          ti = new TransactionImpl(env, context.getConfiguration().getLong(TIMESTAMP_CONF_KEY, -1));

          // TODO this uses non public Accumulo API!
          RangeInputSplit ris = (RangeInputSplit) split;
          Span span = SpanUtil.toSpan(ris.getRange());

          HashSet<Column> columns = new HashSet<>();

          for (String fam : context.getConfiguration().getStrings(FAMS_CONF_KEY, new String[0])) {
            columns.add(new Column(fam));
          }

          cellIterator = ti.scanner().over(span).fetch(columns).build().iterator();
        } catch (Exception e) {
          // Release whatever was opened before the failure rather than relying on the caller
          // to invoke close() on a reader that never finished initializing.
          try {
            close();
          } catch (Exception closeFailure) {
            e.addSuppressed(closeFailure);
          }
          throw new IOException(e);
        }
      }

      /**
       * Advances to the next scanned entry.
       *
       * @return true if an entry was read, false once the scan is exhausted
       */
      @Override
      public boolean nextKeyValue() throws IOException, InterruptedException {
        if (cellIterator.hasNext()) {
          rowColVal = cellIterator.next();
          return true;
        } else {
          rowColVal = null;
          return false;
        }
      }
    };
  }