public void processInstruction()

in src/main/java/org/apache/sysds/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java [198:620]


	public void processInstruction(ExecutionContext ec) {
		// Executes this parameterized builtin on Spark by dispatching on the instruction opcode
		// (MAPGROUPEDAGG, GROUPEDAGG, RMEMPTY, CONTAINS, REPLACE, LOWERTRI/UPPERTRI, REXPAND,
		// TOKENIZE, TRANSFORMAPPLY, TRANSFORMDECODE). Every branch reads its inputs via the
		// execution context, registers its result (RDD handle, in-memory block, or scalar)
		// under output.getName(), and, where an RDD is produced, records lineage to the inputs.
		//
		// ec: the execution context; it is cast to SparkExecutionContext without a type check.
		// Throws DMLRuntimeException on grouped-aggregate dimension mismatches, on an unknown
		// block size for grouped aggregate, and on an unknown opcode.
		SparkExecutionContext sec = (SparkExecutionContext) ec;
		String opcode = getOpcode();

		// opcode guaranteed to be a valid opcode (see parsing)
		if(opcode.equalsIgnoreCase(Opcodes.MAPGROUPEDAGG.toString())) {
			// get input rdd handle (target as RDD, groups as broadcast)
			String targetVar = params.get(Statement.GAGG_TARGET);
			String groupsVar = params.get(Statement.GAGG_GROUPS);
			JavaPairRDD<MatrixIndexes, MatrixBlock> target = sec.getBinaryMatrixBlockRDDHandleForVariable(targetVar);
			PartitionedBroadcast<MatrixBlock> groups = sec.getBroadcastForVariable(groupsVar);
			DataCharacteristics mc1 = sec.getDataCharacteristics(targetVar);
			DataCharacteristics mcOut = sec.getDataCharacteristics(output.getName());
			// the number of groups is resolved as a scalar operand (narrowed from long to int)
			CPOperand ngrpOp = new CPOperand(params.get(Statement.GAGG_NUM_GROUPS));
			int ngroups = (int) sec.getScalarInput(ngrpOp).getLongValue();

			// single-block aggregation (both output dimensions fit into one block)
			if(ngroups <= mc1.getBlocksize() && mc1.getCols() <= mc1.getBlocksize()) {
				// execute map grouped aggregate
				JavaRDD<MatrixBlock> out = target.map(new RDDMapGroupedAggFunction2(groups, _optr, ngroups));
				MatrixBlock out2 = RDDAggregateUtils.sumStable(out);

				// put output block into symbol table (no lineage because single block)
				// this also includes implicit maintenance of matrix characteristics
				sec.setMatrixOutput(output.getName(), out2);
			}
			// multi-block aggregation
			else {
				// execute map grouped aggregate
				JavaPairRDD<MatrixIndexes, MatrixBlock> out = target
					.flatMapToPair(new RDDMapGroupedAggFunction(groups, _optr, ngroups, mc1.getBlocksize()));

				out = RDDAggregateUtils.sumByKeyStable(out, false);

				// updated characteristics and handle outputs (nnz set to -1, i.e., not computed here)
				mcOut.set(ngroups, mc1.getCols(), mc1.getBlocksize(), -1);
				sec.setRDDHandleForVariable(output.getName(), out);
				sec.addLineageRDD(output.getName(), targetVar);
				sec.addLineageBroadcast(output.getName(), groupsVar);
			}
		}
		else if(opcode.equalsIgnoreCase(Opcodes.GROUPEDAGG.toString())) {
			boolean broadcastGroups = Boolean.parseBoolean(params.get("broadcast"));

			// get input rdd handle (groups RDD is only fetched if groups are not broadcast)
			String groupsVar = params.get(Statement.GAGG_GROUPS);
			JavaPairRDD<MatrixIndexes, MatrixBlock> target = sec
				.getBinaryMatrixBlockRDDHandleForVariable(params.get(Statement.GAGG_TARGET));
			JavaPairRDD<MatrixIndexes, MatrixBlock> groups = broadcastGroups ? null : sec
				.getBinaryMatrixBlockRDDHandleForVariable(groupsVar);
			JavaPairRDD<MatrixIndexes, MatrixBlock> weights = null;

			// validate that groups is a column vector with as many rows as the target
			DataCharacteristics mc1 = sec.getDataCharacteristics(params.get(Statement.GAGG_TARGET));
			DataCharacteristics mc2 = sec.getDataCharacteristics(groupsVar);
			if(mc1.dimsKnown() && mc2.dimsKnown() && (mc1.getRows() != mc2.getRows() || mc2.getCols() != 1)) {
				throw new DMLRuntimeException("Grouped Aggregate dimension mismatch between target and groups.");
			}
			DataCharacteristics mcOut = sec.getDataCharacteristics(output.getName());

			JavaPairRDD<MatrixIndexes, WeightedCell> groupWeightedCells = null;

			// Step 1: First extract groupWeightedCells from group, target and weights
			if(params.get(Statement.GAGG_WEIGHTS) != null) {
				weights = sec.getBinaryMatrixBlockRDDHandleForVariable(params.get(Statement.GAGG_WEIGHTS));

				DataCharacteristics mc3 = sec.getDataCharacteristics(params.get(Statement.GAGG_WEIGHTS));
				if(mc1.dimsKnown() && mc3.dimsKnown() &&
					(mc1.getRows() != mc3.getRows() || mc1.getCols() != mc3.getCols())) {
					throw new DMLRuntimeException(
						"Grouped Aggregate dimension mismatch between target, groups, and weights.");
				}

				// NOTE(review): this path dereferences the groups RDD, which is null when
				// broadcastGroups is true -- presumably weights and broadcast are never combined; confirm
				groupWeightedCells = groups.join(target).join(weights).flatMapToPair(new ExtractGroupNWeights());
			}
			else // input vector or matrix
			{
				// number of groups is parsed directly from the parameter string (-1 if absent)
				String ngroupsStr = params.get(Statement.GAGG_NUM_GROUPS);
				long ngroups = (ngroupsStr != null) ? (long) Double.parseDouble(ngroupsStr) : -1;

				// execute basic grouped aggregate (extract and preagg)
				if(broadcastGroups) {
					PartitionedBroadcast<MatrixBlock> pbm = sec.getBroadcastForVariable(groupsVar);
					groupWeightedCells = target
						.flatMapToPair(new ExtractGroupBroadcast(pbm, mc1.getBlocksize(), ngroups, _optr));
				}
				else { // general case

					// replicate groups if necessary (target spans multiple column blocks)
					if(mc1.getNumColBlocks() > 1) {
						groups = groups.flatMapToPair(new ReplicateVectorFunction(false, mc1.getNumColBlocks()));
					}

					groupWeightedCells = groups.join(target)
						.flatMapToPair(new ExtractGroupJoin(mc1.getBlocksize(), ngroups, _optr));
				}
			}

			// Step 2: Make sure we have blen required while creating <MatrixIndexes, MatrixCell>
			if(mc1.getBlocksize() == -1) {
				throw new DMLRuntimeException("The block sizes are not specified for grouped aggregate");
			}
			int blen = mc1.getBlocksize();

			// Step 3: Now perform grouped aggregate operation (either on combiner side or reducer side)
			JavaPairRDD<MatrixIndexes, MatrixCell> out = null;
			if(_optr instanceof CMOperator && ((CMOperator) _optr).isPartialAggregateOperator() ||
				_optr instanceof AggregateOperator) {
				out = groupWeightedCells.reduceByKey(new PerformGroupByAggInCombiner(_optr))
					.mapValues(new CreateMatrixCell(blen, _optr));
			}
			else {
				// Use groupby key because partial aggregation is not supported
				out = groupWeightedCells.groupByKey().mapValues(new PerformGroupByAggInReducer(_optr))
					.mapValues(new CreateMatrixCell(blen, _optr));
			}

			// Step 4: Set output characteristics and rdd handle
			setOutputCharacteristicsForGroupedAgg(mc1, mcOut, out);

			// store output rdd handle
			sec.setRDDHandleForVariable(output.getName(), out);
			sec.addLineageRDD(output.getName(), params.get(Statement.GAGG_TARGET));
			sec.addLineage(output.getName(), groupsVar, broadcastGroups);
			if(params.get(Statement.GAGG_WEIGHTS) != null) {
				sec.addLineageRDD(output.getName(), params.get(Statement.GAGG_WEIGHTS));
			}
		}
		else if(opcode.equalsIgnoreCase(Opcodes.RMEMPTY.toString())) {
			String rddInVar = params.get("target");
			String rddOffVar = params.get("offset");

			boolean rows = sec.getScalarInput(params.get("margin"), ValueType.STRING, true).getStringValue()
				.equals("rows");
			// Boolean.parseBoolean is case-insensitive, so no explicit lower-casing is needed
			boolean emptyReturn = Boolean.parseBoolean(params.get("empty.return"));
			long maxDim = sec.getScalarInput(params.get("maxdim"), ValueType.FP64, false).getLongValue();
			boolean bRmEmptyBC = Boolean.parseBoolean(params.get("bRmEmptyBC"));
			DataCharacteristics mcIn = sec.getDataCharacteristics(rddInVar);

			if(maxDim > 0) // default case
			{
				// get input rdd handle
				JavaPairRDD<MatrixIndexes, MatrixBlock> in = sec.getBinaryMatrixBlockRDDHandleForVariable(rddInVar);
				JavaPairRDD<MatrixIndexes, MatrixBlock> off;
				PartitionedBroadcast<MatrixBlock> broadcastOff;
				long blen = mcIn.getBlocksize();
				// number of blocks along the dimension that is NOT compacted
				long numRep = (long) Math.ceil(rows ? (double) mcIn.getCols() / blen : (double) mcIn.getRows() / blen);

				// execute remove empty rows/cols operation
				JavaPairRDD<MatrixIndexes, MatrixBlock> out;

				if(bRmEmptyBC) {
					broadcastOff = sec.getBroadcastForVariable(rddOffVar);
					// Broadcast offset vector
					out = in.flatMapToPair(new RDDRemoveEmptyFunctionInMem(rows, maxDim, blen, broadcastOff));
				}
				else {
					// join the input with the offset vector replicated numRep times
					off = sec.getBinaryMatrixBlockRDDHandleForVariable(rddOffVar);
					out = in.join(off.flatMapToPair(new ReplicateVectorFunction(!rows, numRep)))
						.flatMapToPair(new RDDRemoveEmptyFunction(rows, maxDim, blen));
				}

				out = RDDAggregateUtils.mergeByKey(out, false);

				// store output rdd handle
				sec.setRDDHandleForVariable(output.getName(), out);
				sec.addLineageRDD(output.getName(), rddInVar);
				if(bRmEmptyBC)
					sec.addLineageBroadcast(output.getName(), rddOffVar);
				else
					sec.addLineageRDD(output.getName(), rddOffVar);

				// update output statistics (required for correctness)
				DataCharacteristics mcOut = sec.getDataCharacteristics(output.getName());
				mcOut.set(rows ? maxDim : mcIn.getRows(),
					rows ? mcIn.getCols() : maxDim,
					(int) blen,
					mcIn.getNonZeros());
			}
			else // special case: empty output (ensure valid dims)
			{
				// the compacted dimension becomes 1 if emptyReturn is set and 0 otherwise
				int n = emptyReturn ? 1 : 0;
				MatrixBlock out = new MatrixBlock(rows ? n : (int) mcIn.getRows(), rows ? (int) mcIn.getCols() : n,
					true);
				sec.setMatrixOutput(output.getName(), out);
			}
		}
		else if(opcode.equalsIgnoreCase(Opcodes.CONTAINS.toString())) {

			JavaPairRDD<MatrixIndexes, MatrixBlock> in1 = sec
				.getBinaryMatrixBlockRDDHandleForVariable(params.get("target"));

			// the pattern is either a variable in the symbol table or a literal parsed as FP64
			Data pattern = ec.getVariable(params.get("pattern"));
			if( pattern == null ) //literal
				pattern = ScalarObjectFactory.createScalarObject(ValueType.FP64, params.get("pattern"));

			boolean ret = false;
			if( pattern.getDataType().isScalar() ) {
				double dpattern = ((ScalarObject)pattern).getDoubleValue();
				ret = in1.values() //num blocks containing pattern
					.map(new RDDContainsFunction(dpattern))
					.reduce((a,b) -> a+b) > 0;
			}
			else {
				// non-scalar pattern: broadcast it, sum the per-key results, and compare the
				// maximum sum over all keys with the number of column blocks of the target
				PartitionedBroadcast<MatrixBlock> bc = sec.getBroadcastForVariable(params.get("pattern"));
				DataCharacteristics dc = sec.getDataCharacteristics(params.get("target"));
				ret = in1.flatMapToPair(new RDDContainsVectFunction(bc, dc.getBlocksize()))
					.reduceByKey((a,b) -> a+b)
					.values().reduce((a,b) -> Math.max(a,b)) == dc.getNumColBlocks();
			}

			// set the boolean result as scalar output
			ec.setScalarOutput(output.getName(), new BooleanObject(ret));
		}
		else if(opcode.equalsIgnoreCase(Opcodes.REPLACE.toString())) {
			if(sec.isFrameObject(params.get("target"))){
				// frame replace: pattern and replacement are passed on as strings
				JavaPairRDD<Long, FrameBlock> in1 = sec.getFrameBinaryBlockRDDHandleForVariable(params.get("target"));
				DataCharacteristics mcIn = sec.getDataCharacteristics(params.get("target"));
				String pattern = params.get("pattern");
				String replacement = params.get("replacement");
				JavaPairRDD<Long, FrameBlock> out = in1.mapValues(new RDDFrameReplaceFunction(pattern, replacement));
				sec.setRDDHandleForVariable(output.getName(), out);
				sec.addLineageRDD(output.getName(), params.get("target"));
				sec.getDataCharacteristics(output.getName()).set(mcIn.getRows(), mcIn.getCols(), mcIn.getBlocksize(), mcIn.getNonZeros());
			}
			else {
				JavaPairRDD<MatrixIndexes, MatrixBlock> in1 = sec
					.getBinaryMatrixBlockRDDHandleForVariable(params.get("target"));
				DataCharacteristics mcIn = sec.getDataCharacteristics(params.get("target"));

				// execute replace operation
				double pattern = Double.parseDouble(params.get("pattern"));
				double replacement = Double.parseDouble(params.get("replacement"));
				JavaPairRDD<MatrixIndexes, MatrixBlock> out = in1.mapValues(new RDDReplaceFunction(pattern, replacement));

				// store output rdd handle
				sec.setRDDHandleForVariable(output.getName(), out);
				sec.addLineageRDD(output.getName(), params.get("target"));

				// update output statistics (required for correctness); the input nnz is only
				// carried over if neither pattern nor replacement is zero, otherwise unknown (-1)
				sec.getDataCharacteristics(output.getName()).set(mcIn.getRows(),
					mcIn.getCols(),
					mcIn.getBlocksize(),
					(pattern != 0 && replacement != 0) ? mcIn.getNonZeros() : -1);
			}
		}
		else if(opcode.equalsIgnoreCase(Opcodes.LOWERTRI.toString()) || opcode.equalsIgnoreCase(Opcodes.UPPERTRI.toString())) {
			JavaPairRDD<MatrixIndexes, MatrixBlock> in1 = sec
				.getBinaryMatrixBlockRDDHandleForVariable(params.get("target"));
			DataCharacteristics mcIn = sec.getDataCharacteristics(params.get("target"));
			boolean lower = opcode.equalsIgnoreCase(Opcodes.LOWERTRI.toString());
			boolean diag = Boolean.parseBoolean(params.get("diag"));
			boolean values = Boolean.parseBoolean(params.get("values"));

			// per-partition extraction that preserves the input partitioning
			JavaPairRDD<MatrixIndexes, MatrixBlock> out = in1
				.mapPartitionsToPair(new RDDExtractTriangularFunction(lower, diag, values), true);

			// store output rdd handle
			sec.setRDDHandleForVariable(output.getName(), out);
			sec.addLineageRDD(output.getName(), params.get("target"));

			// update output statistics (required for correctness)
			sec.getDataCharacteristics(output.getName()).setDimension(mcIn.getRows(), mcIn.getCols());
		}
		else if(opcode.equalsIgnoreCase(Opcodes.REXPAND.toString())) {
			String rddInVar = params.get("target");

			// get input rdd handle
			JavaPairRDD<MatrixIndexes, MatrixBlock> in = sec.getBinaryMatrixBlockRDDHandleForVariable(rddInVar);
			DataCharacteristics mcIn = sec.getDataCharacteristics(rddInVar);

			// parse untyped parameters, w/ robust handling for 'max'
			// (scalar variable if it carries the scalar variable name prefix, literal otherwise)
			String maxValName = params.get("max");
			long lmaxVal = maxValName.startsWith(Lop.SCALAR_VAR_NAME_PREFIX) ?
				ec.getScalarInput(maxValName, ValueType.FP64, false).getLongValue() :
				UtilFunctions.toLong(Double.parseDouble(maxValName));
			boolean dirRows = params.get("dir").equals("rows");
			boolean cast = Boolean.parseBoolean(params.get("cast"));
			boolean ignore = Boolean.parseBoolean(params.get("ignore"));
			long blen = mcIn.getBlocksize();

			// repartition input vector for higher degree of parallelism
			// (avoid scenarios where few input partitions create huge outputs)
			DataCharacteristics mcTmp = new MatrixCharacteristics(dirRows ? lmaxVal : mcIn.getRows(),
				dirRows ? mcIn.getRows() : lmaxVal, (int) blen, mcIn.getRows());
			int numParts = (int) Math.min(SparkUtils.getNumPreferredPartitions(mcTmp, in), mcIn.getNumBlocks());
			if(numParts > in.getNumPartitions() * 2)
				in = in.repartition(numParts);

			// execute rexpand rows/cols operation (no shuffle required because outputs are
			// block-aligned with the input, i.e., one input block generates n output blocks)
			JavaPairRDD<MatrixIndexes, MatrixBlock> out = in
				.flatMapToPair(new RDDRExpandFunction(lmaxVal, dirRows, cast, ignore, blen));

			// store output rdd handle
			sec.setRDDHandleForVariable(output.getName(), out);
			sec.addLineageRDD(output.getName(), rddInVar);

			// update output statistics (required for correctness, nnz unknown due to cut-off)
			DataCharacteristics mcOut = sec.getDataCharacteristics(output.getName());
			mcOut.set(dirRows ? lmaxVal : mcIn.getRows(), dirRows ? mcIn.getRows() : lmaxVal, (int) blen, -1);
			mcOut.setNonZerosBound(mcIn.getRows());

			// post-processing to obtain sparsity of ultra-sparse outputs
			SparkUtils.postprocessUltraSparseOutput(sec.getMatrixObject(output), mcOut);
		}
		else if(opcode.equalsIgnoreCase(Opcodes.TOKENIZE.toString())) {
			// get input RDD data
			FrameObject fo = sec.getFrameObject(params.get("target"));
			JavaPairRDD<Long, FrameBlock> in = (JavaPairRDD<Long, FrameBlock>) sec.getRDDHandleForFrameObject(fo,
				FileFormat.BINARY);
			DataCharacteristics mc = sec.getDataCharacteristics(params.get("target"));

			// construct tokenizer and tokenize text
			Tokenizer tokenizer = TokenizerFactory.createTokenizer(params.get("spec"),
				Integer.parseInt(params.get("max_tokens")));
			JavaPairRDD<Long, FrameBlock> out = in.mapToPair(new RDDTokenizeFunction(tokenizer, mc.getBlocksize()));

			// set output and maintain lineage/output characteristics
			sec.setRDDHandleForVariable(output.getName(), out);
			sec.addLineageRDD(output.getName(), params.get("target"));

			// get max tokens for row upper bound (input row count is narrowed to int)
			long numRows = tokenizer.getMaxNumRows((int)mc.getRows());
			long numCols = tokenizer.getNumCols();

			sec.getDataCharacteristics(output.getName()).set(numRows, numCols, mc.getBlocksize());
			sec.getFrameObject(output.getName()).setSchema(tokenizer.getSchema());
		}
		else if(opcode.equalsIgnoreCase(Opcodes.TRANSFORMAPPLY.toString())) {
			// get input RDD and meta data (the embedding matrix is optional)
			FrameObject fo = sec.getFrameObject(params.get("target"));
			JavaPairRDD<Long, FrameBlock> in = (JavaPairRDD<Long, FrameBlock>) sec.getRDDHandleForFrameObject(fo,
				FileFormat.BINARY);
			FrameBlock meta = sec.getFrameInput(params.get("meta"));
			MatrixBlock embeddings = params.get("embedding") != null ? ec.getMatrixInput(params.get("embedding")) : null;

			DataCharacteristics mcIn = sec.getDataCharacteristics(params.get("target"));
			DataCharacteristics mcOut = sec.getDataCharacteristics(output.getName());
			// column names are only needed for non-ID specs; they are read from the frame
			// block stored under key 1 via an RDD lookup
			String[] colnames = !TfMetaUtils.isIDSpec(params.get("spec")) ? in.lookup(1L).get(0)
				.getColumnNames() : null;

			// compute omit offset map for block shifts
			TfOffsetMap omap = null;
			if(TfMetaUtils.containsOmitSpec(params.get("spec"), colnames)) {
				omap = new TfOffsetMap(SparkUtils.toIndexedLong(
					in.mapToPair(new RDDTransformApplyOffsetFunction(params.get("spec"), colnames)).collect()));
			}

			// create encoder broadcast (avoiding replication per task)
			MultiColumnEncoder encoder = EncoderFactory
				.createEncoder(params.get("spec"), colnames, fo.getSchema(), (int) fo.getNumColumns(), meta, embeddings);
			encoder.updateAllDCEncoders();
			// output rows exclude the rows removed via omit (if any)
			mcOut.setDimension(mcIn.getRows() - ((omap != null) ? omap.getNumRmRows() : 0), encoder.getNumOutCols());

			long t0 = System.nanoTime();
			Broadcast<MultiColumnEncoder> bmeta = sec.getSparkContext().broadcast(encoder);
			Broadcast<TfOffsetMap> bomap = (omap != null) ? sec.getSparkContext().broadcast(omap) : null;
			if (DMLScript.STATISTICS) {
				SparkStatistics.accBroadCastTime(System.nanoTime() - t0);
				SparkStatistics.incBroadcastCount(1);
			}

			// execute transform apply
			JavaPairRDD<MatrixIndexes, MatrixBlock> out;
			Tuple2<Boolean, Integer> aligned = FrameRDDAggregateUtils.checkRowAlignment(in, -1);
			// NOTE: currently disabled for LegacyEncoders, because OMIT probably results in not aligned
			// blocks and for IMPUTE was an inaccuracy for the "testHomesImputeColnamesSparkCSV" test case.

			// Error in test case: Expected: 8.150349617004395 vs actual: 8.15035 at 0 8 (expected is calculated from transform encode,
			// which currently always uses the else branch: either inaccuracy must come from serialisation of
			// matrixblock or from binaryBlockToBinaryBlock reblock
			if(aligned._1 && mcOut.getCols() <= aligned._2 && !encoder.hasLegacyEncoder() /*&& containsWE*/) {
				//Blocks are aligned & #Col is below Block length (necessary for matrix-matrix reblock)
				JavaPairRDD<Long, MatrixBlock> tmp = in.mapToPair(new RDDTransformApplyFunction2(bmeta, bomap));
				// NOTE: this updates the characteristics object of the input in place
				mcIn.setBlocksize(aligned._2);
				mcIn.setDimension(mcIn.getRows(), mcOut.getCols());
				// convert row-offset keys into block indexes of a single column block
				JavaPairRDD<MatrixIndexes, MatrixBlock> tmp2 = tmp.mapToPair((PairFunction<Tuple2<Long, MatrixBlock>, MatrixIndexes, MatrixBlock>) in12 ->
						new Tuple2<>(new MatrixIndexes(UtilFunctions.computeBlockIndex(in12._1, aligned._2),1), in12._2));
				out = RDDConverterUtils.binaryBlockToBinaryBlock(tmp2, mcIn, mcOut);
				//out = RDDConverterUtils.matrixBlockToAlignedMatrixBlock(tmp, mcOut, mcOut);
			} else {
				JavaPairRDD<Long, FrameBlock> tmp = in.mapToPair(new RDDTransformApplyFunction(bmeta, bomap));
				out = FrameRDDConverterUtils.binaryBlockToMatrixBlock(tmp, mcOut, mcOut);
			}

			// set output and maintain lineage/output characteristics
			sec.setRDDHandleForVariable(output.getName(), out);
			sec.addLineageRDD(output.getName(), params.get("target"));
			ec.releaseFrameInput(params.get("meta"));
			if(params.get("embedding") != null)
				ec.releaseMatrixInput(params.get("embedding"));
		}
		else if(opcode.equalsIgnoreCase(Opcodes.TRANSFORMDECODE.toString())) {
			// get input RDD and meta data
			JavaPairRDD<MatrixIndexes, MatrixBlock> in = sec
				.getBinaryMatrixBlockRDDHandleForVariable(params.get("target"));
			DataCharacteristics mc = sec.getDataCharacteristics(params.get("target"));
			FrameBlock meta = sec.getFrameInput(params.get("meta"));
			String[] colnames = meta.getColumnNames();

			// reblock if necessary (clen > blen); the column count is compared against the
			// block size (not the number of column blocks) so that inputs that already fit
			// into a single column block skip the expand and merge step
			if(mc.getCols() > mc.getBlocksize()) {
				in = in.mapToPair(new RDDTransformDecodeExpandFunction((int) mc.getCols(), mc.getBlocksize()));
				in = RDDAggregateUtils.mergeByKey(in, false);
			}

			// construct decoder and decode individual matrix blocks
			Decoder decoder = DecoderFactory.createDecoder(params.get("spec"), colnames, null, meta);
			JavaPairRDD<Long, FrameBlock> out = in
				.mapToPair(new RDDTransformDecodeFunction(decoder, mc.getBlocksize()));

			// set output and maintain lineage/output characteristics
			sec.setRDDHandleForVariable(output.getName(), out);
			sec.addLineageRDD(output.getName(), params.get("target"));
			ec.releaseFrameInput(params.get("meta"));
			sec.getDataCharacteristics(output.getName()).set(mc.getRows(), meta.getNumColumns(), mc.getBlocksize(), -1);
			sec.getFrameObject(output.getName()).setSchema(decoder.getSchema());
		}
		else {
			throw new DMLRuntimeException("Unknown parameterized builtin opcode: " + opcode);
		}
	}