in pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java [192:438]
public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
    Map<ExpressionContext, BlockValSet> blockValSetMap) {
  // Aggregates the first `length` rows of a block into the sketches held by the result holder.
  //
  // Values of every input expression are extracted up front. Expression 0 is the main expression: its values are
  // what gets pushed into the sketches. The complete set of extracted arrays is passed to each filter evaluator,
  // which decides row by row whether that row contributes to the evaluator's sketch.
  //
  // Slot 0 of the per-holder list is the default sketch and is only fed when _includeDefaultSketch is set;
  // slot (f + 1) belongs to filter evaluator f.
  final int expressionCount = _inputExpressions.size();
  final boolean[] singleValues = new boolean[expressionCount];
  final DataType[] valueTypes = new DataType[expressionCount];
  final Object[] valueArrays = new Object[expressionCount];
  extractValues(blockValSetMap, singleValues, valueTypes, valueArrays);

  final int filterCount = _filterEvaluators.size();
  final DataType mainType = valueTypes[0];
  final boolean mainIsSingleValue = singleValues[0];
  final Object mainValues = valueArrays[0];

  if (mainType == DataType.BYTES) {
    // BYTES values are treated as serialized sketches: deserialize them once, then feed the accumulators.
    List<ThetaSketchAccumulator> accumulators = getUnions(aggregationResultHolder);
    Sketch[] deserialized = deserializeSketches((byte[][]) mainValues, length);
    if (_includeDefaultSketch) {
      ThetaSketchAccumulator unfilteredAccumulator = accumulators.get(0);
      for (int idx = 0; idx < deserialized.length; idx++) {
        unfilteredAccumulator.apply(deserialized[idx]);
      }
    }
    for (int f = 0; f < filterCount; f++) {
      FilterEvaluator evaluator = _filterEvaluators.get(f);
      ThetaSketchAccumulator filteredAccumulator = accumulators.get(f + 1);
      for (int row = 0; row < length; row++) {
        if (evaluator.evaluate(singleValues, valueTypes, valueArrays, row)) {
          filteredAccumulator.apply(deserialized[row]);
        }
      }
    }
    return;
  }

  // Raw values: update the sketches directly. Each type keeps its own primitive loop so that the matching
  // UpdateSketch.update(...) overload is selected for the element type.
  List<UpdateSketch> updateSketches = getUpdateSketches(aggregationResultHolder);
  switch (mainType) {
    case INT: {
      if (mainIsSingleValue) {
        int[] ints = (int[]) mainValues;
        if (_includeDefaultSketch) {
          UpdateSketch unfiltered = updateSketches.get(0);
          for (int row = 0; row < length; row++) {
            unfiltered.update(ints[row]);
          }
        }
        for (int f = 0; f < filterCount; f++) {
          FilterEvaluator evaluator = _filterEvaluators.get(f);
          UpdateSketch filtered = updateSketches.get(f + 1);
          for (int row = 0; row < length; row++) {
            if (evaluator.evaluate(singleValues, valueTypes, valueArrays, row)) {
              filtered.update(ints[row]);
            }
          }
        }
      } else {
        int[][] intRows = (int[][]) mainValues;
        if (_includeDefaultSketch) {
          UpdateSketch unfiltered = updateSketches.get(0);
          for (int row = 0; row < length; row++) {
            int[] entries = intRows[row];
            for (int k = 0; k < entries.length; k++) {
              unfiltered.update(entries[k]);
            }
          }
        }
        for (int f = 0; f < filterCount; f++) {
          FilterEvaluator evaluator = _filterEvaluators.get(f);
          UpdateSketch filtered = updateSketches.get(f + 1);
          for (int row = 0; row < length; row++) {
            if (evaluator.evaluate(singleValues, valueTypes, valueArrays, row)) {
              int[] entries = intRows[row];
              for (int k = 0; k < entries.length; k++) {
                filtered.update(entries[k]);
              }
            }
          }
        }
      }
      break;
    }
    case LONG: {
      if (mainIsSingleValue) {
        long[] longs = (long[]) mainValues;
        if (_includeDefaultSketch) {
          UpdateSketch unfiltered = updateSketches.get(0);
          for (int row = 0; row < length; row++) {
            unfiltered.update(longs[row]);
          }
        }
        for (int f = 0; f < filterCount; f++) {
          FilterEvaluator evaluator = _filterEvaluators.get(f);
          UpdateSketch filtered = updateSketches.get(f + 1);
          for (int row = 0; row < length; row++) {
            if (evaluator.evaluate(singleValues, valueTypes, valueArrays, row)) {
              filtered.update(longs[row]);
            }
          }
        }
      } else {
        long[][] longRows = (long[][]) mainValues;
        if (_includeDefaultSketch) {
          UpdateSketch unfiltered = updateSketches.get(0);
          for (int row = 0; row < length; row++) {
            long[] entries = longRows[row];
            for (int k = 0; k < entries.length; k++) {
              unfiltered.update(entries[k]);
            }
          }
        }
        for (int f = 0; f < filterCount; f++) {
          FilterEvaluator evaluator = _filterEvaluators.get(f);
          UpdateSketch filtered = updateSketches.get(f + 1);
          for (int row = 0; row < length; row++) {
            if (evaluator.evaluate(singleValues, valueTypes, valueArrays, row)) {
              long[] entries = longRows[row];
              for (int k = 0; k < entries.length; k++) {
                filtered.update(entries[k]);
              }
            }
          }
        }
      }
      break;
    }
    case FLOAT: {
      if (mainIsSingleValue) {
        float[] floats = (float[]) mainValues;
        if (_includeDefaultSketch) {
          UpdateSketch unfiltered = updateSketches.get(0);
          for (int row = 0; row < length; row++) {
            unfiltered.update(floats[row]);
          }
        }
        for (int f = 0; f < filterCount; f++) {
          FilterEvaluator evaluator = _filterEvaluators.get(f);
          UpdateSketch filtered = updateSketches.get(f + 1);
          for (int row = 0; row < length; row++) {
            if (evaluator.evaluate(singleValues, valueTypes, valueArrays, row)) {
              filtered.update(floats[row]);
            }
          }
        }
      } else {
        float[][] floatRows = (float[][]) mainValues;
        if (_includeDefaultSketch) {
          UpdateSketch unfiltered = updateSketches.get(0);
          for (int row = 0; row < length; row++) {
            float[] entries = floatRows[row];
            for (int k = 0; k < entries.length; k++) {
              unfiltered.update(entries[k]);
            }
          }
        }
        for (int f = 0; f < filterCount; f++) {
          FilterEvaluator evaluator = _filterEvaluators.get(f);
          UpdateSketch filtered = updateSketches.get(f + 1);
          for (int row = 0; row < length; row++) {
            if (evaluator.evaluate(singleValues, valueTypes, valueArrays, row)) {
              float[] entries = floatRows[row];
              for (int k = 0; k < entries.length; k++) {
                filtered.update(entries[k]);
              }
            }
          }
        }
      }
      break;
    }
    case DOUBLE: {
      if (mainIsSingleValue) {
        double[] doubles = (double[]) mainValues;
        if (_includeDefaultSketch) {
          UpdateSketch unfiltered = updateSketches.get(0);
          for (int row = 0; row < length; row++) {
            unfiltered.update(doubles[row]);
          }
        }
        for (int f = 0; f < filterCount; f++) {
          FilterEvaluator evaluator = _filterEvaluators.get(f);
          UpdateSketch filtered = updateSketches.get(f + 1);
          for (int row = 0; row < length; row++) {
            if (evaluator.evaluate(singleValues, valueTypes, valueArrays, row)) {
              filtered.update(doubles[row]);
            }
          }
        }
      } else {
        double[][] doubleRows = (double[][]) mainValues;
        if (_includeDefaultSketch) {
          UpdateSketch unfiltered = updateSketches.get(0);
          for (int row = 0; row < length; row++) {
            double[] entries = doubleRows[row];
            for (int k = 0; k < entries.length; k++) {
              unfiltered.update(entries[k]);
            }
          }
        }
        for (int f = 0; f < filterCount; f++) {
          FilterEvaluator evaluator = _filterEvaluators.get(f);
          UpdateSketch filtered = updateSketches.get(f + 1);
          for (int row = 0; row < length; row++) {
            if (evaluator.evaluate(singleValues, valueTypes, valueArrays, row)) {
              double[] entries = doubleRows[row];
              for (int k = 0; k < entries.length; k++) {
                filtered.update(entries[k]);
              }
            }
          }
        }
      }
      break;
    }
    case STRING: {
      if (mainIsSingleValue) {
        String[] strings = (String[]) mainValues;
        if (_includeDefaultSketch) {
          UpdateSketch unfiltered = updateSketches.get(0);
          for (int row = 0; row < length; row++) {
            unfiltered.update(strings[row]);
          }
        }
        for (int f = 0; f < filterCount; f++) {
          FilterEvaluator evaluator = _filterEvaluators.get(f);
          UpdateSketch filtered = updateSketches.get(f + 1);
          for (int row = 0; row < length; row++) {
            if (evaluator.evaluate(singleValues, valueTypes, valueArrays, row)) {
              filtered.update(strings[row]);
            }
          }
        }
      } else {
        String[][] stringRows = (String[][]) mainValues;
        if (_includeDefaultSketch) {
          UpdateSketch unfiltered = updateSketches.get(0);
          for (int row = 0; row < length; row++) {
            String[] entries = stringRows[row];
            for (int k = 0; k < entries.length; k++) {
              unfiltered.update(entries[k]);
            }
          }
        }
        for (int f = 0; f < filterCount; f++) {
          FilterEvaluator evaluator = _filterEvaluators.get(f);
          UpdateSketch filtered = updateSketches.get(f + 1);
          for (int row = 0; row < length; row++) {
            if (evaluator.evaluate(singleValues, valueTypes, valueArrays, row)) {
              String[] entries = stringRows[row];
              for (int k = 0; k < entries.length; k++) {
                filtered.update(entries[k]);
              }
            }
          }
        }
      }
      break;
    }
    default:
      // Any other type is rejected; the message distinguishes single-value from multi-value input.
      if (mainIsSingleValue) {
        throw new IllegalStateException(
            "Illegal single-value data type for DISTINCT_COUNT_THETA_SKETCH aggregation function: "
                + mainType);
      }
      throw new IllegalStateException(
          "Illegal multi-value data type for DISTINCT_COUNT_THETA_SKETCH aggregation function: " + mainType);
  }
}