protected void cleanup()

in community/mahout-mr/mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtDenseOutJob.java [156:303]


    protected void cleanup(Context context) throws IOException,
      InterruptedException {
      try {

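        /*
         * yiCols[s] holds column s of the dense A * B' block (Y_i) for the A
         * rows seen by this split; one blockHeight-tall slice is built per pass.
         */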
        yiCols = new double[kp][];

        for (int i = 0; i < kp; i++) {
          yiCols[i] = new double[Math.min(aRowCount, blockHeight)];
        }

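        // ceiling division: e.g. aRowCount = 25,000 and blockHeight = 10,000 yield 3 passes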
        int numPasses = (aRowCount - 1) / blockHeight + 1;

        String propBtPathStr = context.getConfiguration().get(PROP_BT_PATH);
        Validate.notNull(propBtPathStr, "Bt input is not set");
        Path btPath = new Path(propBtPathStr);
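        // reusable writable that wraps the double[][] block; one dense Y_i block is emitted per pass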
        DenseBlockWritable dbw = new DenseBlockWritable();

        /*
         * It turns out that it may be much more efficient to do a few
         * independent passes over Bt, accumulating the entire block in memory,
         * than to push a huge number of blocks out to the combiner. So the aim,
         * of course, is to fit the entire s x (k+p) dense block in memory,
         * where s is the number of A rows in this split. If A is much sparser
         * than (k+p) average elements per row, the block may exceed the split
         * size; if that happens, and the given blockHeight is not high enough
         * to accommodate it (because of memory constraints), then we split s
         * into several passes. Since the computation is cpu-bound anyway, this
         * should be o.k. for super-sparse inputs. (In that case the projection
         * may well end up thicker than the original input, so there is little
         * reason to use that many k+p anyway.)
         */
        int lastRowIndex = -1;
        for (int pass = 0; pass < numPasses; pass++) {

          if (distributedBt) {

            btInput =
              new SequenceFileDirIterator<>(btLocalPath, true, localFsConfig);

          } else {

            btInput =
              new SequenceFileDirIterator<>(btPath, PathType.GLOB, null, null, true, context.getConfiguration());
          }
          closeables.addFirst(btInput);
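          // registered for cleanup on failure; the reader is deregistered and closed at the end of each pass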
          Validate.isTrue(btInput.hasNext(), "Empty B' input!");

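          // this pass accumulates A rows [aRowBegin, aRowBegin + bh); only the last pass can be shorter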
          int aRowBegin = pass * blockHeight;
          int bh = Math.min(blockHeight, aRowCount - aRowBegin);

          /*
           * Check whether the block allocation needs to be trimmed for this
           * pass. On the shorter final pass the old blockHeight-sized arrays
           * are dropped first so they can be reclaimed before the smaller
           * bh-sized arrays are allocated.
           */
          if (pass > 0) {
            if (bh == blockHeight) {
              for (int i = 0; i < kp; i++) {
                Arrays.fill(yiCols[i], 0.0);
              }
            } else {

              for (int i = 0; i < kp; i++) {
                yiCols[i] = null;
              }
              for (int i = 0; i < kp; i++) {
                yiCols[i] = new double[bh];
              }
            }
          }

          while (btInput.hasNext()) {
            Pair<IntWritable, VectorWritable> btRec = btInput.next();
            int btIndex = btRec.getFirst().get();
            Vector btVec = btRec.getSecond().get();
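            // btIndex is both the row index into B' and the column index into A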
            Vector aCol;
            if (btIndex >= aCols.length || (aCol = aCols[btIndex]) == null
                || aCol.size() == 0) {

              /* the A column in this block is 100% zero, so skip it as sparse */
              continue;
            }
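            // j is the A row index of the non-zero element being processed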
            int j = -1;
            for (Vector.Element aEl : aCol.nonZeroes()) {
              j = aEl.index();

              /*
               * We compute only the swath aRowBegin..aRowBegin+bh (exclusive)
               * here. That may look like a deficiency, but it balances itself
               * out: either A is dense, in which case there should be no more
               * than one pass and the filter conditions below never kick in; or
               * the only situation where the Y_i block cannot fit in memory is
               * when the A input is much sparser than k+p elements per row, and
               * then we are only looking at a few elements without engaging
               * them in any operations, so even that should be ok.
               */
              if (j < aRowBegin) {
                continue;
              }
              if (j >= aRowBegin + bh) {
                break;
              }

              /*
               * assume btVec is dense
               */
              if (xi != null) {
                /*
                 * MAHOUT-817: PCA correction for B'. The whole computation loop
                 * is duplicated so that the PCA-correction check does not have
                 * to happen at the individual element level. It looks bulkier
                 * this way but is perhaps less wasteful on cpu.
                 */
                for (int s = 0; s < kp; s++) {
                  // code defensively against shortened xi
                  double xii = xi.size() > btIndex ? xi.get(btIndex) : 0.0;
                  yiCols[s][j - aRowBegin] +=
                    aEl.get() * (btVec.getQuick(s) - xii * sq.get(s));
                }
              } else {
                /*
                 * no PCA correction
                 */
                for (int s = 0; s < kp; s++) {
                  yiCols[s][j - aRowBegin] += aEl.get() * btVec.getQuick(s);
                }
              }

            }
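            // track the highest A row index encountered so far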
            if (lastRowIndex < j) {
              lastRowIndex = j;
            }
          }

          /*
           * the pass is complete: yiCols now holds the dense Y_i block for rows
           * aRowBegin..aRowBegin+bh, so emit it keyed by the pass ordinal
           */
          dbw.setBlock(yiCols);
          outKey.setTaskItemOrdinal(pass);
          context.write(outKey, dbw);

          closeables.remove(btInput);
          btInput.close();
        }

      } finally {
        IOUtils.close(closeables);
      }
    }
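
For reference, the inner loops above amount to Y_i = A_i * B' accumulated one sparse A column at a time. A minimal standalone sketch of that update, assuming plain double[][] arrays in place of the Mahout Vector/Writable types (hypothetical class name ABtBlockSketch, toy data), looks like this:

    // Sketch only: yiCols[s][j] += a[j][i] * bt[i][s], i.e. Y = A * B' for one pass,
    // with the zero-element skip playing the role of aCol.nonZeroes().
    public class ABtBlockSketch {
      public static void main(String[] args) {
        double[][] a = {{1, 0, 2}, {0, 3, 0}};    // 2 x 3 slice of A rows (one pass of one split)
        double[][] bt = {{1, 4}, {2, 5}, {3, 6}}; // 3 x (k+p) rows of B', here k+p = 2
        int rows = a.length, cols = bt.length, kp = bt[0].length;

        double[][] yiCols = new double[kp][rows]; // column-major Y_i block, as in the job
        for (int i = 0; i < cols; i++) {          // one B' row per A column, like the btInput loop
          for (int j = 0; j < rows; j++) {
            if (a[j][i] == 0.0) {
              continue;                           // skip zero A elements
            }
            for (int s = 0; s < kp; s++) {
              yiCols[s][j] += a[j][i] * bt[i][s];
            }
          }
        }
        // yiCols is now Y_i column by column: {7.0, 6.0} and {16.0, 15.0}
        for (double[] col : yiCols) {
          System.out.println(java.util.Arrays.toString(col));
        }
      }
    }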