query/time_series_aggregate.h (342 lines of code) (raw):
// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef QUERY_TIME_SERIES_AGGREGATE_H_
#define QUERY_TIME_SERIES_AGGREGATE_H_
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include "../cgoutils/utils.h"
// SET_DEVICE(dev): expands to cudaSetDevice(dev) when compiled with
// RUN_ON_DEVICE (device build). In host-only builds it expands to nothing,
// so the argument is not evaluated at all.
#if defined(RUN_ON_DEVICE)
#define SET_DEVICE(dev) cudaSetDevice(dev)
#else
#define SET_DEVICE(dev)
#endif
// Fixed-size limits. They are expressed as C-style array bounds so that the
// query structures stay as flat as reasonably possible.
enum {
  // Table / join shape.
  MAX_FOREIGN_TABLES = 7,
  MAX_COLUMNS_OF_A_TABLE = 32,
  // Dimensions and measures.
  MAX_DIMENSIONS = 8,
  MAX_DIMENSION_BYTES = 32,
  MAX_MEASURES = 32,
  // Instruction stream length.
  MAX_INSTRUCTIONS = 1024,
  // Hash table geometry (presumably for CuckooHashIndex — confirm).
  HASH_BUCKET_SIZE = 8,
  HASH_STASH_SIZE = 4,
  // HyperLogLog sizing: HLL_DENSE_SIZE == 2^HLL_BITS (16384) and
  // HLL_DENSE_THRESHOLD is a quarter of that (4096).
  HLL_BITS = 14,
  HLL_DENSE_SIZE = 1 << HLL_BITS,
  HLL_DENSE_THRESHOLD = HLL_DENSE_SIZE >> 2,
  // Number of distinct dimension widths: 16, 8, 4, 2 and 1 bytes.
  NUM_DIM_WIDTH = 5,
};
// Aggregate functions supported by time series aggregate queries.
// Values start at 1 and are grouped by family (sum, min, max), each family
// covering unsigned, signed and float operands.
enum AggregateFunction {
  AGGR_SUM_UNSIGNED = 1, AGGR_SUM_SIGNED = 2, AGGR_SUM_FLOAT = 3,
  AGGR_MIN_UNSIGNED = 4, AGGR_MIN_SIGNED = 5, AGGR_MIN_FLOAT = 6,
  AGGR_MAX_UNSIGNED = 7, AGGR_MAX_SIGNED = 8, AGGR_MAX_FLOAT = 9,
  AGGR_HLL = 10,
  AGGR_AVG_FLOAT = 11,
};
// Data types supported when converting a Go vector to an iterator.
// Values are spelled out explicitly because they are shared across the
// C/Go boundary.
enum DataType {
  Bool = 0,
  Int8 = 1,
  Uint8 = 2,
  Int16 = 3,
  Uint16 = 4,
  Int32 = 5,
  Uint32 = 6,
  Float32 = 7,
  Int64 = 8,
  Uint64 = 9,
  Float64 = 10,
  GeoPoint = 11,
  UUID = 12,
};
// Data types a ConstantVector can hold.
enum ConstDataType {
  ConstInt = 0,
  ConstFloat = 1,
  ConstGeoPoint = 2,
  ConstUUID = 3,
};
// Unary functors accepted by UnaryTransform / UnaryFilter.
enum UnaryFunctorType {
  // Logical / arithmetic.
  Negate = 0,
  Not = 1,
  BitwiseNot = 2,
  // Null checks.
  IsNull = 3,
  IsNotNull = 4,
  Noop = 5,
  // Date helpers.
  GetWeekStart = 6,
  GetMonthStart = 7,
  GetQuarterStart = 8,
  GetYearStart = 9,
  GetDayOfMonth = 10,
  GetDayOfYear = 11,
  GetMonthOfYear = 12,
  GetQuarterOfYear = 13,
  // Misc.
  GetHLLValue = 14,
  ArrayLength = 15,
};
// Binary functors accepted by BinaryTransform / BinaryFilter.
enum BinaryFunctorType {
  // Logical.
  And = 0,
  Or = 1,
  // Comparison.
  Equal = 2,
  NotEqual = 3,
  LessThan = 4,
  LessThanOrEqual = 5,
  GreaterThan = 6,
  GreaterThanOrEqual = 7,
  // Arithmetic.
  Plus = 8,
  Minus = 9,
  Multiply = 10,
  Divide = 11,
  Mod = 12,
  // Bitwise.
  BitwiseAnd = 13,
  BitwiseOr = 14,
  BitwiseXor = 15,
  // Misc.
  Floor = 16,
  ArrayContains = 17,
  ArrayElementAt = 18,
};
// RecordID identifies a record by the batch that holds it (batchID) and,
// presumably, its position within that batch (index) — confirm.
typedef struct {
int32_t batchID;
uint32_t index;
} RecordID;
// CuckooHashIndex describes a hash index used for joins.
// For now only equi-joins against dimension tables are supported, and the
// hash index does not support event time.
typedef struct {
  // Bucket storage.
  uint8_t *buckets;
  // Hash seeds; presumably one per hash function (numHashes) — confirm.
  uint32_t seeds[4];
  // Key width in bytes, number of hash functions, number of buckets.
  int keyBytes, numHashes, numBuckets;
} CuckooHashIndex;
// GeoPointT is a single geographic point: latitude followed by longitude,
// both single-precision floats.
typedef struct {
  float Lat, Long;
} GeoPointT;
// UUIDT holds a 16-byte UUID as two 64-bit halves (p1 first, then p2).
typedef struct {
  uint64_t p1, p2;
} UUIDT;
// DefaultValue holds an optional default value.
typedef struct {
// Whether a default value is present.
bool HasDefault;
// The default value. NOTE(review): presumably only meaningful when
// HasDefault is true, with the active member chosen by the owning vector's
// DataType — confirm.
union {
bool BoolVal;
int32_t Int32Val;
uint32_t Uint32Val;
float FloatVal;
int64_t Int64Val;
GeoPointT GeoPointVal;
UUIDT UUIDVal;
} Value;
} DefaultValue;
// VectorPartySlice stores the slice of each vector relevant to the query.
// It should be supplied for leaf nodes of the AST tree.
typedef struct {
// Pointer to the memory allocated for this vp slice. The counts, nulls and
// values vectors are stored consecutively, so all 3 pointers can be
// represented as BasePtr plus 2 offsets.
// If it's a mode 0 vector, BasePtr will be null.
// If the count vector is not present, NullsOffset will be zero.
// If the null vector is not present, ValuesOffset will be zero.
uint8_t *BasePtr;
uint32_t NullsOffset;
uint32_t ValuesOffset;
// Because of slicing and alignment, StartingIndex is not always 0.
// StartingIndex ranges from 0 to 7; non-zero values are only used for
// bit-packed boolean values and nulls.
uint8_t StartingIndex;
// Element type, used to convert the underlying pointer to the appropriate
// pointer type.
enum DataType DataType;
// NOTE(review): presumably the default value used for this column — confirm.
DefaultValue DefaultValue;
// NOTE(review): presumably the number of elements in this slice — confirm.
uint32_t Length;
} VectorPartySlice;
// ScratchSpaceVector is the output vector for non-leaf, non-root nodes and
// the input vector for non-leaf nodes that have at least one non-leaf child.
typedef struct {
// Start of the scratch space buffer.
uint8_t *Values;
// NOTE(review): presumably the byte offset of the null vector relative to
// Values — confirm.
uint32_t NullsOffset;
// Element type stored in Values.
enum DataType DataType;
} ScratchSpaceVector;
// ConstantVector is a constant value from the AST tree.
typedef struct {
// Value from the AST tree. NOTE(review): presumably DataType tells which
// member is set — confirm.
union {
int32_t IntVal;
float FloatVal;
GeoPointT GeoPointVal;
UUIDT UUIDVal;
} Value;
// Whether this value is valid.
bool IsValid;
// Data type of the constant.
enum ConstDataType DataType;
} ConstantVector;
// ForeignColumnVector stores all batches of vectors for the target column
// and the record ids from the hash lookup.
// Note: foreign vectors only come from dimension tables and are unsorted
// columns from live batches.
typedef struct {
  // Record ids produced by the hash lookup (see HashLookup).
  RecordID *RecordIDs;
  // Vector party slices of the target column, presumably one per batch.
  VectorPartySlice *Batches;
  int32_t BaseBatchID;
  int32_t NumBatches;
  int32_t NumRecordsInLastBatch;
  // NOTE(review): presumably an optional timezone lookup table and its
  // length — confirm.
  // Deliberately NOT `int16_t *const`. A const-qualified member makes this
  // struct non-assignable in C and C++, and InputVector embeds it in a union,
  // which makes InputVector non-assignable as well. In C++ it also deletes
  // the default constructor. Dropping the top-level const is strictly more
  // permissive, so existing code keeps compiling.
  int16_t *TimezoneLookup;
  int16_t TimezoneLookupSize;
  enum DataType DataType;
  DefaultValue DefaultValue;
} ForeignColumnVector;
// ArrayVectorPartySlice stores the slice of an array column.
typedef struct {
// Pointer to the memory of the offset-length vector.
uint8_t *OffsetLengthVector;
// Value offset adjustment. NOTE(review): presumably compensates for
// slicing — confirm.
uint32_t ValueOffsetAdj;
// NOTE(review): presumably the element type of the array values — confirm.
enum DataType DataType;
// NOTE(review): presumably the number of rows in this slice — confirm.
uint32_t Length;
} ArrayVectorPartySlice;
// Kinds of InputVector; selects the active member of InputVector.Vector.
enum InputVectorType {
  VectorPartyInput = 0,
  ScratchSpaceInput = 1,
  ConstantInput = 2,
  ForeignColumnInput = 3,
  ArrayVectorPartyInput = 4
};
// InputVector is the vector used as input to transforms and filters. The
// active member of the Vector union is decided by the Type field.
// NOTE(review): presumably VectorPartyInput -> VP, ScratchSpaceInput ->
// ScratchSpace, ConstantInput -> Constant, ForeignColumnInput -> ForeignVP,
// ArrayVectorPartyInput -> ArrayVP — confirm.
typedef struct {
union {
ConstantVector Constant;
VectorPartySlice VP;
ScratchSpaceVector ScratchSpace;
ForeignColumnVector ForeignVP;
ArrayVectorPartySlice ArrayVP;
} Vector;
enum InputVectorType Type;
} InputVector;
// DimensionVector stores the dimension vector byte array.
// All of its vectors are allocated together.
// DimValues layout: the value vectors grouped by dimension width, wider
// widths first, followed by the 1-byte null vector.
// NOTE(review): the widths listed for NUM_DIM_WIDTH (16, 8, 4, 2, 1 bytes)
// suggest 16- and 8-byte vectors precede the 4-, 2- and 1-byte ones —
// confirm the full layout against the implementation.
// Note: IndexVector is the index vector of the dimension vector, not the
// index vector of the batch. Sort and reduce are first done on IndexVector,
// and the result is then permuted into the output dimension vectors.
typedef struct {
uint8_t *DimValues;
uint64_t *HashValues;
uint32_t *IndexVector;
int VectorCapacity;
// Number of dimensions of each width. NOTE(review): presumably indexed in
// the order listed for NUM_DIM_WIDTH (16, 8, 4, 2, 1 bytes) — confirm.
uint8_t NumDimsPerDimWidth[NUM_DIM_WIDTH];
} DimensionVector;
// DimensionOutputVector is the output vector of the dimension transformation
// for each dimension.
typedef struct {
// Where to write dimension values.
uint8_t *DimValues;
// Where to write dimension null flags.
uint8_t *DimNulls;
// Type of the dimension values.
enum DataType DataType;
} DimensionOutputVector;
// MeasureOutputVector is the output vector of the measure transformation.
// Right now there is one measure per query, but there may be multiple
// measures in the future.
typedef struct {
// Where to write the values
uint32_t *Values;
// Type of the measure values.
enum DataType DataType;
// Aggregate function that will be applied to this measure.
enum AggregateFunction AggFunc;
} MeasureOutputVector;
// Kinds of OutputVector; selects the active member of OutputVector.Vector.
enum OutputVectorType {
  ScratchSpaceOutput = 0,
  MeasureOutput = 1,
  DimensionOutput = 2,
};
// OutputVector is the vector used as the output of a transform. The active
// member of the Vector union is decided by the Type field.
typedef struct {
union {
ScratchSpaceVector ScratchSpace;
DimensionOutputVector Dimension;
MeasureOutputVector Measure;
} Vector;
enum OutputVectorType Type;
} OutputVector;
/*
// Batch stores all columns of the batch relevant to the query.
//
// Columns from different batches of the same table share the same order:
// 1. Columns not from ArchivingSortColumns.
// 2. Columns from ArchivingSortColumns in reverse order.
typedef struct {
VectorPartySlice Columns[MAX_COLUMNS_OF_A_TABLE];
} Batch;
// ForeignTable stores the data and join conditions of a table being joined.
// A hash index on the equi-join columns is either maintained or created at
// query time before the query executes.
typedef struct {
// Input data:
Batch *batches;
int32_t BaseBatchID;
int32_t NumBatches;
// Column metadata:
int32_t NumColumns;
uint16_t ValueBitsByColumn[MAX_COLUMNS_OF_A_TABLE];
// TODO(shengyue): hash index
CuckooHashIndex HashIndex;
// Equi-join conditions:
int32_t NumberOfEqualityPairs;
// Index of each column in batches[x].Columns used in the equi-join
// condition.
uint8_t LocalColumns[MAX_COLUMNS_OF_A_TABLE];
// Index of each remote column in the global column array used in the
// equi-join condition. All columns from the main table are added to the
// global column array, followed by all columns from the first foreign
// table, followed by all columns from the second foreign table..
// All remote columns must be present on the global column array by the time
// this join happens; the values of which are used as the hash key to locate
// the matching record on this table.
uint8_t RemoteColumns[MAX_COLUMNS_OF_A_TABLE];
} ForeignTable;
// TimeSeriesAggregate defines the query, stores the input data from
// foreign tables, and manages output buffers. Input data from the main table
// are passed to Cuda separately to allow batch-based pipelining while reusing
// the same query instance across multiple batches.
typedef struct {
// Foreign tables with input data and hash index:
ForeignTable ForeignTables[MAX_FOREIGN_TABLES];
int32_t NumForeignTables;
// Column metadata for the main table:
int32_t NumColumns;
uint16_t ValueBitsByColumn[MAX_COLUMNS_OF_A_TABLE];
// Query definition:
int32_t NumDimensions;
int32_t NumMeasures;
// FilterInstsForLive/ArchiveBatches produces 0 or 1 on top of the value
// stack to indicate whether the record should be skipped or included for
// all measures.
uint32_t FilterInstsForLiveBatches[MAX_INSTRUCTIONS];
uint32_t FilterInstsForArchiveBatches[MAX_INSTRUCTIONS];
// DimMeasureInsts produces one uint32_t word for each dimension, followed
// by one uint32_t word for each measure on top of the value stack.
// It also produces one char for each dimension and measure on top of the
// null stack to indicate whether they are valid (1) or null (0). A null
// measure skips its aggregation. A null dimension is reported as null.
uint32_t DimMeasureInsts[MAX_INSTRUCTIONS];
// Actual word width (in bytes) of each dimension.
uint8_t DimensionBytes[MAX_DIMENSIONS];
// Total number of bytes of all dimensions.
uint32_t TotalDimensionBytes;
// Aggregate functions for each measure.
enum AggregateFunction Aggregates[MAX_MEASURES];
} TimeSeriesAggregate;
*/
// GeoShape represents a geofence shape. A single geoshape can consist of
// multiple polygons, including holes inside the polygons. The Lats and Longs
// vectors store the latitude and longitude of each point of those polygons.
typedef struct {
// Lats and Longs are stored in the format
// [a1,a2,...,an,a1,FLT_MAX,b1,b2,...,bm,b1]
// where FLT_MAX marks the beginning of the next polygon.
// The first vertex is repeated at the end of each polygon, so the last edge
// of polygon a is an-a1.
float *Lats;
float *Longs;
// Number of coordinates in each vector, including FLT_MAX placeholders.
uint16_t NumPoints;
} GeoShape;
// GeoShapeBatch is the batch of geoshapes passed to GeoBatchIntersects.
typedef struct {
// NOTE(review): presumably the lat/long data of all shapes in the batch —
// confirm the exact layout against the implementation.
uint8_t *LatLongs;
// TotalNumPoints: total number of points.
// TotalWords: number of uint32_t words needed to store the predicate value
// (in or out of shape) of every shape for one point. Each shape takes 1 bit,
// so every 32 shapes take 1 word.
// NOTE(review): an older description packed both values into one 32-bit
// value (1 byte of word count followed by 3 bytes of point count); they are
// now separate fields — confirm no code still relies on the packed form.
int32_t TotalNumPoints;
uint8_t TotalWords;
} GeoShapeBatch;
// The functions below define the C interface that Go calls through cgo.
#ifdef __cplusplus
extern "C" {
#endif
// InitIndexVector initializes the index vector (indexVectorLength entries)
// on the given CUDA stream and device.
// NOTE(review): presumably writes indexVector[i] = start + i — confirm.
CGoCallResHandle InitIndexVector(uint32_t *indexVector,
uint32_t start,
int indexVectorLength,
void *cudaStream,
int device);
// HashLookup looks up the values of `input` in `hashIndex` and stores the
// results (record ids) into the `output` record id vector.
// indexVector/indexVectorLength select the rows to look up. baseCounts and
// startCount are presumably used the same way as in UnaryTransform (leaf-node
// count vector) — confirm.
CGoCallResHandle HashLookup(InputVector input,
RecordID *output,
uint32_t *indexVector,
int indexVectorLength,
uint32_t *baseCounts,
uint32_t startCount,
CuckooHashIndex hashIndex,
void *cudaStream,
int device);
// UnaryTransform transforms an InputVector into an OutputVector by applying
// the unary functor selected by functorType to each element.
// Output space must be preallocated by the caller. UnaryTransform must not be
// given a constant input; the query compiler is responsible for ensuring this.
CGoCallResHandle UnaryTransform(InputVector input,
OutputVector output,
// Will only be used at the leaf node and measure output.
uint32_t *indexVector,
int indexVectorLength,
// Will only be used at the leaf node and measure output.
uint32_t *baseCounts,
uint32_t startCount,
enum UnaryFunctorType functorType,
void *cudaStream,
int device);
// UnaryFilter filters the index vector by applying the unary functor to the
// input and using the result as the filter predicate. It returns the index
// size after filtering (presumably through the returned CGoCallResHandle).
// The caller is responsible for allocating and initializing the index vector.
// Filtering is done in place, so no extra storage is used for the index
// vector.
CGoCallResHandle UnaryFilter(InputVector input,
uint32_t *indexVector,
uint8_t *predicateVector,
int indexVectorLength,
// Will only be used for foreign table column input
RecordID **recordIDVectors,
int numForeignTables,
// Will only be used if this is a leaf node.
uint32_t *baseCounts,
uint32_t startCount,
enum UnaryFunctorType functorType,
void *cudaStream,
int device);
// BinaryTransform applies the binary functor selected by functorType to each
// pair of elements in lhs and rhs and writes the result to output. Output
// space must be preallocated by the caller. lhs and rhs cannot both be
// constants.
CGoCallResHandle BinaryTransform(InputVector lhs,
InputVector rhs,
OutputVector output,
// Will only be used at the leaf node and measure output.
uint32_t *indexVector,
int indexVectorLength,
// Will only be used at the leaf node and measure output.
uint32_t *baseCounts,
uint32_t startCount,
enum BinaryFunctorType functorType,
void *cudaStream,
int device);
// BinaryFilter is the binary version of UnaryFilter. It filters the index
// vector in place, using the result of the binary functor applied to lhs and
// rhs as the predicate.
CGoCallResHandle BinaryFilter(InputVector lhs,
InputVector rhs,
uint32_t *indexVector,
uint8_t *predicateVector,
int indexVectorLength,
// Will only be used for foreign table column input
RecordID **recordIDVectors,
int numForeignTables,
// Will only be used if this is a leaf node.
uint32_t *baseCounts,
uint32_t startCount,
enum BinaryFunctorType functorType,
void *cudaStream,
int device);
// Sort performs a key-value sort of the first `length` rows of `keys` in
// ascending key order.
// The data type of the measure values is not needed, since only whole
// 4-byte words are moved around, unchanged.
// NOTE(review): Sort takes no separate values parameter. This presumably
// refers to data carried inside the DimensionVector (e.g. IndexVector) —
// confirm.
CGoCallResHandle Sort(DimensionVector keys,
int length,
void *cudaStream,
int device);
// Reduce reduces inputValues on inputKeys using aggFunc. It writes the
// unique keys to outputKeys and the aggregation results to outputValues.
// It also returns the number of unique keys as its result. outputKeys and
// outputValues must be preallocated by the caller.
// valueBytes is the width of each value in bytes; length is the number of
// input rows.
CGoCallResHandle Reduce(DimensionVector inputKeys,
uint8_t *inputValues,
DimensionVector outputKeys,
uint8_t *outputValues,
int valueBytes,
int length,
enum AggregateFunction aggFunc,
void *cudaStream,
int device);
// HashReduce performs the reduction with a hash map instead of sort-then-
// reduce. The parameters match Reduce.
// HashReduce does not use the hash vector of the keys, because hashing
// happens during hash map insertion, so that vector need not be allocated.
// The index vector is not needed either, since the rows are no longer sorted.
CGoCallResHandle HashReduce(DimensionVector inputKeys,
uint8_t *inputValues,
DimensionVector outputKeys,
uint8_t *outputValues,
int valueBytes,
int length,
enum AggregateFunction aggFunc,
void *cudaStream,
int device);
// Expand decompresses the dimensions that are compressed through baseCounts
// and appends them to the existing outputKeys.
// @inputKeys input DimensionVector
// @outputKeys output DimensionVector
// @baseCounts count vector for the first column
// @indexVector index vector of the dimension keys
// @indexVectorLen length of the index vector used for the output; it should
// be less than or equal to the number of keys in inputKeys
// @outputOccupiedLen number of rows already in outputKeys, since this is an
// append operation
CGoCallResHandle Expand(DimensionVector inputKeys,
DimensionVector outputKeys,
uint32_t *baseCounts,
uint32_t *indexVector,
int indexVectorLen,
int outputOccupiedLen,
void *cudaStream,
int device);
// HyperLogLog runs HyperLogLog in a single cgo function call.
// @prevResultSize start position of the keys and values of the current batch.
// @hllVectorPtr receives hllVector (dense/sparse), which is only set when it
// is allocated for the last batch.
// @hllVectorSizePtr receives the size of hllVector.
// @hllDimRegIDCountPtr receives the number of reg_ids per dim.
// Returns: if this is the last batch (isLastBatch), the number of distinct
// dim value combinations. Otherwise, prevResultSize plus the number of unique
// hash values in the current batch.
CGoCallResHandle HyperLogLog(DimensionVector prevDimOut,
DimensionVector curDimOut,
uint32_t *prevValuesOut,
uint32_t *curValuesOut,
int prevResultSize,
int curBatchSize,
bool isLastBatch,
uint8_t **hllVectorPtr,
size_t *hllVectorSizePtr,
uint16_t **hllDimRegIDCountPtr,
void *cudaStream,
int device);
// GeoBatchIntersects computes geography intersections for all geoshapes in
// geoShapeBatch at once. inOrOut selects whether points should be checked for
// being in the shape (true / 1) or not in the shape (false / 0).
// outputPredicate is written for later use as a filter.
// NOTE(review): the original note reads "Returns the filtered size otherwise
// returns the original size" — presumably the filtered size when filtering
// applies and indexVectorLength otherwise; confirm the exact condition.
CGoCallResHandle GeoBatchIntersects(
GeoShapeBatch geoShapeBatch, InputVector points, uint32_t *indexVector,
int indexVectorLength, uint32_t startCount, RecordID **recordIDVectors,
int numForeignTables, uint32_t *outputPredicate, bool inOrOut,
void *cudaStream, int device);
// WriteGeoShapeDim writes the shape index into the dimension output dimOut.
// It should be called after GeoBatchIntersects has removed all
// non-intersecting rows. The index vector length from before the geo filter
// must be passed in (indexVectorLengthBeforeGeo), because outputPredicate has
// not been compacted yet.
// Note:
// 1. the geo join is assumed to be a many-to-one join
// 2. only the IN operation is supported for geo intersects joins
CGoCallResHandle WriteGeoShapeDim(
int shapeTotalWords, DimensionOutputVector dimOut,
int indexVectorLengthBeforeGeo, uint32_t *outputPredicate,
void *cudaStream, int device);
// BootstrapDevice bootstraps all GPU devices with the appropriate actions.
// For now it only initializes the constant memory.
// The explicit (void) matters for C translation units (and cgo): in C, an
// empty parameter list `()` declares a function with unspecified parameters
// rather than a prototype. In C++ the two spellings are equivalent, so the
// existing definition still matches.
CGoCallResHandle BootstrapDevice(void);
#ifdef __cplusplus
}
#endif
#endif // QUERY_TIME_SERIES_AGGREGATE_H_