public void map()

in java/dataproc-wordcount/src/main/java/com/example/bigtable/sample/CellCounter.java [114:168]


    public void map(ImmutableBytesWritable row, Result values,
                    Context context)
        throws IOException {
      Preconditions.checkState(values != null,
          "values passed to the map is null");
      String currentFamilyName = null;
      String currentQualifierName = null;
      String currentRowKey = null;
      Configuration config = context.getConfiguration();
      String separator = config.get("ReportSeparator",":");
      try {
        context.getCounter(Counters.ROWS).increment(1);
        context.write(new Text("Total ROWS"), new IntWritable(1));

        LOG.warn("Mapping, FTW!");

        for (Cell value : values.listCells()) {
          currentRowKey = Bytes.toStringBinary(CellUtil.cloneRow(value));
          LOG.warn("Mapping, FTW! " + currentRowKey);
          String thisRowFamilyName = Bytes.toStringBinary(CellUtil.cloneFamily(value));
          if (!thisRowFamilyName.equals(currentFamilyName)) {
            currentFamilyName = thisRowFamilyName;
            context.getCounter("CF", thisRowFamilyName).increment(1);
            if (1 == context.getCounter("CF", thisRowFamilyName).getValue()) {
              context.write(new Text("Total Families Across all Rows"), new IntWritable(1));
              context.write(new Text(thisRowFamilyName), new IntWritable(1));
            }
          }
          String thisRowQualifierName = thisRowFamilyName + separator
              + Bytes.toStringBinary(CellUtil.cloneQualifier(value));
          if (!thisRowQualifierName.equals(currentQualifierName)) {
            currentQualifierName = thisRowQualifierName;
            context.getCounter("CFQL", thisRowQualifierName).increment(1);
            context.write(new Text("Total Qualifiers across all Rows"),
              new IntWritable(1));
            context.write(new Text(thisRowQualifierName), new IntWritable(1));
            // Intialize versions
            context.getCounter("QL_VERSIONS", currentRowKey + separator +
              thisRowQualifierName).increment(1);
            context.write(new Text(currentRowKey + separator
                + thisRowQualifierName + "_Versions"), new IntWritable(1));

          } else {
            // Increment versions
            currentQualifierName = thisRowQualifierName;
            context.getCounter("QL_VERSIONS", currentRowKey + separator +
              thisRowQualifierName).increment(1);
            context.write(new Text(currentRowKey + separator
                + thisRowQualifierName + "_Versions"), new IntWritable(1));
          }
        }
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }