in modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoEntryInputFormat.java [56:131]
public RecordReader<RowColumn, Bytes> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException, InterruptedException {
return new RecordReader<RowColumn, Bytes>() {
private RowColumnValue rowColVal;
private Environment env = null;
private TransactionImpl ti = null;
private Iterator<RowColumnValue> cellIterator;
@Override
public void close() throws IOException {
if (ti != null) {
ti.close();
}
if (env != null) {
env.close();
}
}
@Override
public RowColumn getCurrentKey() throws IOException, InterruptedException {
return rowColVal.getRowColumn();
}
@Override
public Bytes getCurrentValue() throws IOException, InterruptedException {
return rowColVal.getValue();
}
@Override
public float getProgress() throws IOException, InterruptedException {
// TODO Auto-generated method stub
return 0;
}
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
try {
ByteArrayInputStream bais = new ByteArrayInputStream(
context.getConfiguration().get(PROPS_CONF_KEY).getBytes(StandardCharsets.UTF_8));
env = new Environment(new FluoConfiguration(bais));
ti = new TransactionImpl(env, context.getConfiguration().getLong(TIMESTAMP_CONF_KEY, -1));
// TODO this uses non public Accumulo API!
RangeInputSplit ris = (RangeInputSplit) split;
Span span = SpanUtil.toSpan(ris.getRange());
HashSet<Column> columns = new HashSet<>();
for (String fam : context.getConfiguration().getStrings(FAMS_CONF_KEY, new String[0])) {
columns.add(new Column(fam));
}
cellIterator = ti.scanner().over(span).fetch(columns).build().iterator();
} catch (Exception e) {
throw new IOException(e);
}
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (cellIterator.hasNext()) {
rowColVal = cellIterator.next();
return true;
} else {
rowColVal = null;
return false;
}
}
};
}