in src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixCountDistinct.java [125:353]
/**
 * Counts the exact number of distinct values of a matrix block by collecting them in hash sets.
 * <p>
 * Zeros are counted like any other value: every cell that is not physically stored in a sparse
 * representation contributes the value 0 to the distinct set of its row, its column, or the whole block,
 * exactly as a zero cell of a dense block does.
 *
 * @param blkIn input matrix block
 * @param op    count-distinct operator; only its direction is used here
 * @return a 1 x 1 block (row-col direction), an nrow x 1 block (row direction), or a 1 x ncol block
 *         (column direction) holding the distinct counts
 * @throws NotImplementedException if {@code blkIn} is a non-empty {@link CompressedMatrixBlock}
 */
private static MatrixBlock countDistinctValuesNaive(MatrixBlock blkIn, CountDistinctOperator op) {
	if(blkIn.isEmpty()) {
		// An empty block holds only zeros: exactly one distinct value per row, per column, or overall.
		if(op.getDirection().isRowCol()) {
			return new MatrixBlock(1);
		}
		else if(op.getDirection().isRow()) {
			return naiveCountDistinctOnes(blkIn.getNumRows(), 1);
		}
		else {
			return naiveCountDistinctOnes(1, blkIn.getNumColumns());
		}
	}
	else if(blkIn instanceof CompressedMatrixBlock) {
		throw new NotImplementedException("countDistinct() does not support CompressedMatrixBlock");
	}

	if(op.getDirection().isRowCol()) {
		return naiveCountDistinctAll(blkIn);
	}
	else if(op.getDirection().isRow()) {
		return naiveCountDistinctPerRow(blkIn);
	}
	else { // column aggregation
		return naiveCountDistinctPerCol(blkIn);
	}
}

/**
 * Counts the distinct values over all cells of a non-empty, uncompressed block.
 *
 * @param blkIn non-empty input block
 * @return 1 x 1 block holding the number of distinct values
 */
private static MatrixBlock naiveCountDistinctAll(MatrixBlock blkIn) {
	final int nRows = blkIn.getNumRows();
	final int nCols = blkIn.getNumColumns();
	Set<Double> distinct = new HashSet<>();
	if(blkIn.isInSparseFormat()) {
		SparseBlock sb = blkIn.getSparseBlock();
		if(sb.isContiguous()) {
			// COO, CSR: all stored values live in one array whose first size() entries are valid.
			// The backing array may be longer than size(), so never iterate up to its length.
			double[] vals = sb.values(0);
			int nnz = (int) sb.size();
			for(int k = 0; k < nnz; k++) {
				distinct.add(vals[k]);
			}
		}
		else {
			// MCSR: per-row arrays, likewise bounded by pos(r) and size(r)
			for(int r = 0; r < nRows; r++) {
				if(sb.isEmpty(r)) {
					continue;
				}
				int apos = sb.pos(r);
				int alen = sb.size(r);
				double[] avals = sb.values(r);
				for(int k = apos; k < apos + alen; k++) {
					distinct.add(avals[k]);
				}
			}
		}
		// Every cell that is not stored is an implicit zero. Using the stored-entry count avoids
		// depending on possibly unknown nnz metadata.
		if(sb.size() < (long) nRows * nCols) {
			distinct.add(0d);
		}
	}
	else {
		DenseBlock db = blkIn.getDenseBlock();
		for(int r = 0; r < nRows; r++) {
			// values(r)/pos(r) locate row r even when the dense block is split into several arrays
			double[] avals = db.values(r);
			int apos = db.pos(r);
			for(int c = 0; c < nCols; c++) {
				distinct.add(avals[apos + c]);
			}
		}
	}
	MatrixBlock blkOut = new MatrixBlock(1, 1, false);
	blkOut.set(0, 0, distinct.size());
	return blkOut;
}

/**
 * Counts the distinct values of every row of a non-empty, uncompressed block.
 *
 * @param blkIn non-empty input block
 * @return nrow x 1 block holding the number of distinct values per row
 */
private static MatrixBlock naiveCountDistinctPerRow(MatrixBlock blkIn) {
	final int nRows = blkIn.getNumRows();
	final int nCols = blkIn.getNumColumns();
	Set<Double> distinct = new HashSet<>();
	if(!blkIn.isInSparseFormat()) {
		MatrixBlock blkOut = new MatrixBlock(nRows, 1, false, nRows);
		blkOut.allocateBlock();
		DenseBlock db = blkIn.getDenseBlock();
		for(int r = 0; r < nRows; r++) {
			// Access row r through its own physical block (multi-block safe), one pass per row.
			double[] avals = db.values(r);
			int apos = db.pos(r);
			distinct.clear();
			for(int c = 0; c < nCols; c++) {
				distinct.add(avals[apos + c]);
			}
			blkOut.set(r, 0, distinct.size());
		}
		return blkOut;
	}

	// Rows without stored entries contain only zeros and keep the pre-filled count of 1.
	MatrixBlock blkOut = naiveCountDistinctOnes(nRows, 1);
	SparseBlock sb = blkIn.getSparseBlock();
	if(sb instanceof SparseBlockCOO) {
		// Scan the COO arrays linearly, because pos(r), size(r) and isEmpty(r) use binary search in COO.
		// The grouping below relies on the entries of a row being stored consecutively (row-major order).
		SparseBlockCOO cooBlock = (SparseBlockCOO) sb;
		int[] rixs = cooBlock.rowIndexes();
		double[] vals = cooBlock.values();
		int nnz = (int) cooBlock.size(); // valid prefix of the (possibly longer) backing arrays
		int k = 0;
		while(k < nnz) {
			int r = rixs[k];
			int start = k;
			distinct.clear();
			for(; k < nnz && rixs[k] == r; k++) {
				distinct.add(vals[k]);
			}
			if(k - start < nCols) {
				distinct.add(0d); // row has implicit zeros
			}
			blkOut.set(r, 0, distinct.size());
		}
	}
	else {
		// MCSR, CSR and other formats through the generic per-row accessors (O(1) for MCSR/CSR)
		for(int r = 0; r < nRows; r++) {
			if(sb.isEmpty(r)) {
				continue;
			}
			int apos = sb.pos(r);
			int alen = sb.size(r);
			double[] avals = sb.values(r);
			distinct.clear();
			for(int k = apos; k < apos + alen; k++) {
				distinct.add(avals[k]);
			}
			if(alen < nCols) {
				distinct.add(0d); // row has implicit zeros
			}
			blkOut.set(r, 0, distinct.size());
		}
	}
	return blkOut;
}

/**
 * Counts the distinct values of every column of a non-empty, uncompressed block.
 *
 * @param blkIn non-empty input block
 * @return 1 x ncol block holding the number of distinct values per column
 */
private static MatrixBlock naiveCountDistinctPerCol(MatrixBlock blkIn) {
	final int nRows = blkIn.getNumRows();
	final int nCols = blkIn.getNumColumns();
	if(!blkIn.isInSparseFormat()) {
		MatrixBlock blkOut = new MatrixBlock(1, nCols, false, nCols);
		blkOut.allocateBlock();
		DenseBlock db = blkIn.getDenseBlock();
		Set<Double> distinct = new HashSet<>();
		for(int c = 0; c < nCols; c++) {
			distinct.clear();
			for(int r = 0; r < nRows; r++) {
				// resolve the physical block of row r for every cell (multi-block safe)
				distinct.add(db.values(r)[db.pos(r, c)]);
			}
			blkOut.set(0, c, distinct.size());
		}
		return blkOut;
	}

	// Sparse formats are row-major: gather the distinct values of all columns in a single pass and
	// count the stored cells per column so that implicit zeros can be detected afterwards.
	Map<Integer, Set<Double>> distinctByCol = new HashMap<>();
	int[] storedPerCol = new int[nCols];
	SparseBlock sb = blkIn.getSparseBlock();
	if(sb instanceof SparseBlockCOO) {
		// Linear scan over the valid prefix of the COO arrays; row grouping is irrelevant here.
		SparseBlockCOO cooBlock = (SparseBlockCOO) sb;
		int[] cixs = cooBlock.indexes();
		double[] vals = cooBlock.values();
		int nnz = (int) cooBlock.size();
		for(int k = 0; k < nnz; k++) {
			distinctByCol.computeIfAbsent(cixs[k], key -> new HashSet<>()).add(vals[k]);
			storedPerCol[cixs[k]]++;
		}
	}
	else {
		for(int r = 0; r < nRows; r++) {
			if(sb.isEmpty(r)) {
				continue;
			}
			int apos = sb.pos(r);
			int alen = sb.size(r);
			int[] aix = sb.indexes(r);
			double[] avals = sb.values(r);
			for(int k = apos; k < apos + alen; k++) {
				distinctByCol.computeIfAbsent(aix[k], key -> new HashSet<>()).add(avals[k]);
				storedPerCol[aix[k]]++;
			}
		}
	}

	// Columns without stored entries contain only zeros and keep the pre-filled count of 1.
	MatrixBlock blkOut = naiveCountDistinctOnes(1, nCols);
	for(Map.Entry<Integer, Set<Double>> entry : distinctByCol.entrySet()) {
		int c = entry.getKey();
		Set<Double> distinct = entry.getValue();
		if(storedPerCol[c] < nRows) {
			distinct.add(0d); // column has implicit zeros
		}
		blkOut.set(0, c, distinct.size());
	}
	return blkOut;
}

/**
 * Creates a rows x cols output block in which every cell holds 1, the distinct count of an all-zero
 * row, column, or block.
 *
 * @param rows number of output rows
 * @param cols number of output columns
 * @return the filled block (no cells for a zero-sized shape)
 */
private static MatrixBlock naiveCountDistinctOnes(int rows, int cols) {
	MatrixBlock ret = new MatrixBlock(rows, cols, false, (long) rows * cols);
	if(rows > 0 && cols > 0) {
		ret.allocateBlock();
		for(int r = 0; r < rows; r++) {
			for(int c = 0; c < cols; c++) {
				ret.set(r, c, 1);
			}
		}
	}
	return ret;
}