in community/mahout-mr/mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtDenseOutJob.java [156:303]
protected void cleanup(Context context) throws IOException,
InterruptedException {
try {
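/*
* accumulator for one dense Y_i block, stored column-wise: kp = k+p
* columns of at most blockHeight values each
*/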
yiCols = new double[kp][];
for (int i = 0; i < kp; i++) {
yiCols[i] = new double[Math.min(aRowCount, blockHeight)];
}
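/*
* number of passes over B' needed to cover all aRowCount rows of this
* split, blockHeight rows per pass
*/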
int numPasses = (aRowCount - 1) / blockHeight + 1;
String propBtPathStr = context.getConfiguration().get(PROP_BT_PATH);
Validate.notNull(propBtPathStr, "Bt input is not set");
Path btPath = new Path(propBtPathStr);
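/* reusable writable that carries one dense Y_i block per pass */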
DenseBlockWritable dbw = new DenseBlockWritable();
/*
* It turns out that it may be much more efficient to do a few
* independent passes over Bt, accumulating the entire block in memory,
* than to push a huge number of blocks out to the combiner. So the aim
* is, of course, to fit the entire s x (k+p) dense block in memory,
* where s is the number of A rows in this split. If A is much sparser
* than (k+p) average elements per row, then the block may exceed the
* split size. If that happens, and the given blockHeight is not high
* enough to accommodate it (because of memory constraints), we split s
* into several passes. Since the computation is cpu-bound anyway, this
* should be o.k. for super-sparse inputs. (It can hardly be o.k. for
* the projection to be thicker than the original input anyway; why
* would one use that many k+p then?)
*/
int lastRowIndex = -1;
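/*
* each pass re-streams B' and accumulates only the Y_i rows that fall
* into the current swathe [aRowBegin, aRowBegin + bh)
*/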
for (int pass = 0; pass < numPasses; pass++) {
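/*
* (re)open B' for this pass: either the local filesystem copy or the
* glob of B' part files from the job configuration
*/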
if (distributedBt) {
btInput =
new SequenceFileDirIterator<>(btLocalPath, true, localFsConfig);
} else {
btInput =
new SequenceFileDirIterator<>(btPath, PathType.GLOB, null, null, true, context.getConfiguration());
}
closeables.addFirst(btInput);
Validate.isTrue(btInput.hasNext(), "Empty B' input!");
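/* row range of A covered by this pass: [aRowBegin, aRowBegin + bh) */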
int aRowBegin = pass * blockHeight;
int bh = Math.min(blockHeight, aRowCount - aRowBegin);
/*
* reset the block for this pass: zero it if it is still full height,
* or reallocate shorter columns for the final, partial pass
*/
if (pass > 0) {
if (bh == blockHeight) {
for (int i = 0; i < kp; i++) {
Arrays.fill(yiCols[i], 0.0);
}
} else {
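/* release the old, taller columns before allocating shorter ones */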
for (int i = 0; i < kp; i++) {
yiCols[i] = null;
}
for (int i = 0; i < kp; i++) {
yiCols[i] = new double[bh];
}
}
}
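/*
* stream through B'. Each B' row index is a column index of A, so each
* B' row contributes the outer product of that A column (restricted to
* the current swathe) with the B' row into Y_i.
*/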
while (btInput.hasNext()) {
Pair<IntWritable, VectorWritable> btRec = btInput.next();
int btIndex = btRec.getFirst().get();
Vector btVec = btRec.getSecond().get();
Vector aCol;
if (btIndex >= aCols.length || (aCol = aCols[btIndex]) == null
|| aCol.size() == 0) {
/* this column of A is entirely zero (or absent) in the block, skip it */
continue;
}
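/*
* walk the nonzeros of this A column; j is the A row index of the
* element being processed
*/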
int j = -1;
for (Vector.Element aEl : aCol.nonZeroes()) {
j = aEl.index();
/*
* we compute only the swathe aRowBegin..aRowBegin+bh (exclusive). This
* seems like a deficiency, but it should balance itself out: either A
* is dense, in which case there is never more than one pass and the
* filter conditions below never kick in, or the Y_i block does not fit
* in memory only because the A input is much sparser than k+p elements
* per row, in which case the skipped elements are merely iterated over
* without engaging them in any operations, so the extra passes should
* still be cheap.
*/
if (j < aRowBegin) {
continue;
}
if (j >= aRowBegin + bh) {
break;
}
/*
* assume btVec is dense
*/
if (xi != null) {
/*
* MAHOUT-817: PCA correction for B'. The computation loop is written
* out twice so that the PCA check does not have to happen at the
* individual element level. It looks bulkier this way but is probably
* less wasteful on cpu.
*/
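/*
* per element this accumulates
* Y[j][s] += A[j][btIndex] * (B'[btIndex][s] - xi[btIndex] * sq[s])
*/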
// code defensively against a shortened xi
double xii = xi.size() > btIndex ? xi.get(btIndex) : 0.0;
for (int s = 0; s < kp; s++) {
yiCols[s][j - aRowBegin] +=
aEl.get() * (btVec.getQuick(s) - xii * sq.get(s));
}
} else {
/*
* no PCA correction
*/
for (int s = 0; s < kp; s++) {
yiCols[s][j - aRowBegin] += aEl.get() * btVec.getQuick(s);
}
}
}
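/* remember the highest A row index seen so far */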
if (lastRowIndex < j) {
lastRowIndex = j;
}
}
/*
* the Y_i block for this swathe is complete: emit it keyed by the pass
* ordinal
*/
dbw.setBlock(yiCols);
outKey.setTaskItemOrdinal(pass);
context.write(outKey, dbw);
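/*
* done with this pass over B': close the iterator and drop it from the
* cleanup list
*/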
closeables.remove(btInput);
btInput.close();
}
} finally {
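/* close anything still open, e.g. a B' iterator left behind by an exception */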
IOUtils.close(closeables);
}
}