in java/dataproc-wordcount/src/main/java/com/example/bigtable/sample/CellCounter.java [114:168]
public void map(ImmutableBytesWritable row, Result values,
    Context context)
    throws IOException {
  // Purpose: for one input row, bump the job counters and emit (label, 1)
  // records describing the row, its column families, its qualifiers and the
  // number of cells seen per row/qualifier.
  //
  //   row     - key handed in by the framework; not read here (the row key is
  //             taken from each cell instead).
  //   values  - the cells of the row; must not be null.
  //   context - supplies the configuration, the counters and the output sink.
  //
  // Throws IllegalStateException (via Preconditions.checkState) when values is
  // null, and IOException when writing fails or the thread is interrupted
  // while writing.
  Preconditions.checkState(values != null,
      "values passed to the map is null");

  // Family / qualifier of the previously visited cell; used to detect when
  // the iteration moves on to a new family or a new qualifier.
  String currentFamilyName = null;
  String currentQualifierName = null;

  Configuration config = context.getConfiguration();
  // Separator placed between row key, family and qualifier in emitted labels.
  String separator = config.get("ReportSeparator", ":");

  try {
    context.getCounter(Counters.ROWS).increment(1);
    context.write(new Text("Total ROWS"), new IntWritable(1));
    // NOTE(review): WARN level looks like leftover debugging output; kept
    // unchanged so the job's log output stays the same.
    LOG.warn("Mapping, FTW!");

    // Result.listCells() is documented to return null for an empty Result,
    // so bail out after counting the row instead of iterating over null.
    if (values.isEmpty()) {
      return;
    }

    for (Cell value : values.listCells()) {
      String currentRowKey = Bytes.toStringBinary(CellUtil.cloneRow(value));
      LOG.warn("Mapping, FTW! " + currentRowKey);

      String thisRowFamilyName = Bytes.toStringBinary(CellUtil.cloneFamily(value));
      if (!thisRowFamilyName.equals(currentFamilyName)) {
        currentFamilyName = thisRowFamilyName;
        context.getCounter("CF", thisRowFamilyName).increment(1);
        // Emit the family records only when the counter reads exactly 1
        // right after the increment.
        if (1 == context.getCounter("CF", thisRowFamilyName).getValue()) {
          context.write(new Text("Total Families Across all Rows"), new IntWritable(1));
          context.write(new Text(thisRowFamilyName), new IntWritable(1));
        }
      }

      String thisRowQualifierName = thisRowFamilyName + separator
          + Bytes.toStringBinary(CellUtil.cloneQualifier(value));
      if (!thisRowQualifierName.equals(currentQualifierName)) {
        // First cell of a new family+qualifier within this call.
        currentQualifierName = thisRowQualifierName;
        context.getCounter("CFQL", thisRowQualifierName).increment(1);
        context.write(new Text("Total Qualifiers across all Rows"),
            new IntWritable(1));
        context.write(new Text(thisRowQualifierName), new IntWritable(1));
      }

      // Every cell counts as one version of its row/qualifier, whether it is
      // the first cell of that qualifier or a subsequent one.
      String versionKey = currentRowKey + separator + thisRowQualifierName;
      context.getCounter("QL_VERSIONS", versionKey).increment(1);
      context.write(new Text(versionKey + "_Versions"), new IntWritable(1));
    }
  } catch (InterruptedException e) {
    // Do not swallow the interruption: restore the flag for the framework
    // and surface the failure through the declared IOException.
    Thread.currentThread().interrupt();
    throw new IOException("Interrupted while emitting cell counts", e);
  }
}