private static MatrixBlock countDistinctValuesNaive()

in src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixCountDistinct.java [125:353]


	private static MatrixBlock countDistinctValuesNaive(MatrixBlock blkIn, CountDistinctOperator op) {

		if (blkIn.isEmpty()) {
			return new MatrixBlock(1);
		}
		else if(blkIn instanceof CompressedMatrixBlock) {
			throw new NotImplementedException("countDistinct() does not support CompressedMatrixBlock");
		}

		Set<Double> distinct = new HashSet<>();
		MatrixBlock blkOut;
		double[] data;

		if (op.getDirection().isRowCol()) {
			blkOut = new MatrixBlock(1, 1, false);

			long distinctCount = 0;
			long nonZeros = blkIn.getNonZeros();

			// Check if input matrix contains any 0 values for RowCol case.
			// This does not apply to row/col case, where we count nnz per row or col during iteration.
			if(nonZeros != -1 && nonZeros < (long) blkIn.getNumColumns() * blkIn.getNumRows()) {
				distinct.add(0d);
			}

			if(blkIn.getSparseBlock() != null) {
				SparseBlock sb = blkIn.getSparseBlock();
				if(blkIn.getSparseBlock().isContiguous()) {
					// COO, CSR
					data = sb.values(0);
					distinctCount = countDistinctValuesNaive(data, distinct);
				} else {
					// MCSR
					for(int i = 0; i < blkIn.getNumRows(); i++) {
						if(!sb.isEmpty(i)) {
							data = blkIn.getSparseBlock().values(i);
							distinctCount = countDistinctValuesNaive(data, distinct);
						}
					}
				}
			} else if(blkIn.getDenseBlock() != null) {
				DenseBlock db = blkIn.getDenseBlock();
				for (int i = 0; i <= db.numBlocks(); i++) {
					data = db.valuesAt(i);
					distinctCount = countDistinctValuesNaive(data, distinct);
				}
			}

			blkOut.set(0, 0, distinctCount);
		} else if (op.getDirection().isRow()) {
			blkOut = new MatrixBlock(blkIn.getNumRows(), 1, false, blkIn.getNumRows());
			blkOut.allocateBlock();

			if (blkIn.getDenseBlock() != null) {
				// The naive approach would be to iterate through every (i, j) in the input. However, can do better
				// by exploiting the physical layout of dense blocks - contiguous blocks in row-major order - in memory.
				DenseBlock db = blkIn.getDenseBlock();
				for (int bix=0; bix<db.numBlocks(); ++bix) {
					data = db.valuesAt(bix);
					for (int rix=bix * db.blockSize(); rix<blkIn.getNumRows(); rix++) {
						distinct.clear();
						for (int cix=0; cix<blkIn.getNumColumns(); ++cix) {
							distinct.add(data[db.pos(rix, cix)]);
						}
						blkOut.set(rix, 0, distinct.size());
					}
				}
			} else if (blkIn.getSparseBlock() != null) {
				// Each sparse block type - COO, CSR, MCSR - has a different data representation, which we will exploit
				// separately.
				SparseBlock sb = blkIn.getSparseBlock();
				if (SparseBlockFactory.isSparseBlockType(sb, SparseBlock.Type.MCSR)) {
					// Currently, SparseBlockIterator only provides an interface for cell-wise iteration.
					// TODO Explore row-wise and column-wise methods for SparseBlockIterator

					// MCSR enables O(1) access to column values per row
					for (int rix=0; rix<blkIn.getNumRows(); ++rix) {
						if (sb.isEmpty(rix)) {
							continue;
						}
						distinct.clear();
						data = sb.values(rix);
						countDistinctValuesNaive(data, distinct);
						blkOut.set(rix, 0, distinct.size());
					}
				} else if (SparseBlockFactory.isSparseBlockType(sb, SparseBlock.Type.CSR)) {
					// Casting is safe given if-condition
					SparseBlockCSR csrBlock = (SparseBlockCSR) sb;

					// Data lies in one contiguous block in CSR format. We will iterate in row-major using O(1) op
					// size(row) to determine the number of columns per row.
					data = csrBlock.values();
					// We want to iterate through all rows to keep track of the row index for constructing the output
					for (int rix=0; rix<blkIn.getNumRows(); ++rix) {
						if (csrBlock.isEmpty(rix)) {
							continue;
						}
						distinct.clear();
						int rpos = csrBlock.pos(rix);
						int clen = csrBlock.size(rix);
						for (int colOffset=0; colOffset<clen; ++colOffset) {
							distinct.add(data[rpos + colOffset]);
						}
						blkOut.set(rix, 0, distinct.size());
					}
				} else { // COO
					if (!(sb instanceof SparseBlockCOO)) {
						throw new IllegalArgumentException("Input matrix is of unrecognized type: "
								+ sb.getClass().getSimpleName());
					}
					SparseBlockCOO cooBlock = (SparseBlockCOO) sb;

					// For COO, we want to avoid using pos(row) and size(row) as they use binary search, which is a
					// O(log N) op. Also, isEmpty(row) uses pos(row) internally.
					int[] rixs = cooBlock.rowIndexes();
					data = cooBlock.values();
					int i = 0;  // data iterator
					int rix = 0;  // row index
					while (rix < cooBlock.numRows() && i < rixs.length) {
						distinct.clear();
						while (i + 1 < rixs.length && rixs[i] == rixs[i + 1]) {
							distinct.add(data[i]);
							i++;
						}
						if (i + 1 < rixs.length) {  // rixs[i] != rixs[i + 1]
							distinct.add(data[i]);
						}
						blkOut.set(rix, 0, distinct.size());
						rix = (i + 1 < rixs.length)? rixs[i + 1] : rix;
						i++;
					}
				}
			}
		} else {  // Col aggregation
			blkOut = new MatrixBlock(1, blkIn.getNumColumns(), false, blkIn.getNumColumns());
			blkOut.allocateBlock();

			// All dense and sparse formats (COO, CSR, MCSR) are row-major formats, so there is no obvious way to iterate
			// in column-major order besides iterating through every (i, j) pair. getValue() skips over empty cells in CSR
			// and MCSR formats, but not so in COO format. This results in O(log2 R * log2 C) time for every lookup,
			// amounting to O(RC * log2R * log2C) for the whole block (R, C <= 1000 in CP case). We will eschew this
			// approach in favor of one using a hash map M of (column index, distinct values) to obtain a pseudo column-major
			// grouping of distinct values instead. Given this setup, we will simply iterate over the input
			// (according to specific dense/sparse format) in row-major order and populate M. Finally, an O(C) iteration
			// over M will yield the final result.
			Map<Integer, Set<Double>> distinctValuesByCol = new HashMap<>();
			if (blkIn.getDenseBlock() != null) {
				DenseBlock db = blkIn.getDenseBlock();
				for (int bix=0; bix<db.numBlocks(); ++bix) {
					data = db.valuesAt(bix);
					for (int cix=0; cix<blkIn.getNumColumns(); ++cix) {
						Set<Double> distinctValues = distinctValuesByCol.getOrDefault(cix, new HashSet<>());
						for (int rix=bix * db.blockSize(); rix<blkIn.getNumRows(); rix++) {
							double val = data[db.pos(rix, cix)];
							distinctValues.add(val);
						}
						distinctValuesByCol.put(cix, distinctValues);
					}
				}
			} else if (blkIn.getSparseBlock() != null) {
				SparseBlock sb = blkIn.getSparseBlock();
				if (SparseBlockFactory.isSparseBlockType(sb, SparseBlock.Type.MCSR)) {
					for (int rix=0; rix<blkIn.getNumRows(); ++rix) {
						if (sb.isEmpty(rix)) {
							continue;
						}
						int[] cixs = sb.indexes(rix);
						data = sb.values(rix);
						for (int j=0; j<sb.size(rix); ++j) {
							int cix = cixs[j];
							Set<Double> distinctValues = distinctValuesByCol.getOrDefault(cix, new HashSet<>());
							distinctValues.add(data[j]);
							distinctValuesByCol.put(cix, distinctValues);
						}
					}
				} else if (SparseBlockFactory.isSparseBlockType(sb, SparseBlock.Type.CSR)) {
					SparseBlockCSR csrBlock = (SparseBlockCSR) sb;
					data = csrBlock.values();
					for (int rix=0; rix<blkIn.getNumRows(); ++rix) {
						if (csrBlock.isEmpty(rix)) {
							continue;
						}
						int rpos = csrBlock.pos(rix);
						int clen = csrBlock.size(rix);
						int[] cixs = csrBlock.indexes();
						for (int colOffset=0; colOffset<clen; ++colOffset) {
							int cix = cixs[rpos + colOffset];
							Set<Double> distinctValues = distinctValuesByCol.getOrDefault(cix, new HashSet<>());
							distinctValues.add(data[rpos + colOffset]);
							distinctValuesByCol.put(cix, distinctValues);
						}
					}
				} else {  // COO
					if (!(sb instanceof SparseBlockCOO)) {
						throw new IllegalArgumentException("Input matrix is of unrecognized type: "
								+ sb.getClass().getSimpleName());
					}
					SparseBlockCOO cooBlock = (SparseBlockCOO) sb;

					int[] rixs = cooBlock.rowIndexes();
					int[] cixs = cooBlock.indexes();
					data = cooBlock.values();
					int i = 0;  // data iterator
					while (i < rixs.length) {
						while (i + 1 < rixs.length && rixs[i] == rixs[i + 1]) {
							int cix = cixs[i];
							Set<Double> distinctValues = distinctValuesByCol.getOrDefault(cix, new HashSet<>());
							distinctValues.add(data[i]);
							distinctValuesByCol.put(cix, distinctValues);
							i++;
						}
						if (i + 1 < rixs.length) {
							int cix = cixs[i];
							Set<Double> distinctValues = distinctValuesByCol.getOrDefault(cix, new HashSet<>());
							distinctValues.add(data[i]);
							distinctValuesByCol.put(cix, distinctValues);
						}
						i++;
					}
				}
			}
			// Fill in output block with column aggregation results
			for (int cix : distinctValuesByCol.keySet()) {
				blkOut.set(0, cix, distinctValuesByCol.get(cix).size());
			}
		}

		return blkOut;
	}