// Copyright (c) 2017-2018 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #ifndef QUERY_TIME_SERIES_AGGREGATE_H_ #define QUERY_TIME_SERIES_AGGREGATE_H_ #include #include #include #include "../cgoutils/utils.h" // some macro used for host or device compilation #ifdef RUN_ON_DEVICE #define SET_DEVICE(device) cudaSetDevice(device) #else #define SET_DEVICE(device) #endif // These C-style array limits are used to make the query structure as flat as // reasonably possible. enum { MAX_FOREIGN_TABLES = 7, MAX_COLUMNS_OF_A_TABLE = 32, MAX_DIMENSIONS = 8, MAX_DIMENSION_BYTES = 32, MAX_MEASURES = 32, MAX_INSTRUCTIONS = 1024, HASH_BUCKET_SIZE = 8, HASH_STASH_SIZE = 4, HLL_BITS = 14, HLL_DENSE_SIZE = 1 << HLL_BITS, HLL_DENSE_THRESHOLD = HLL_DENSE_SIZE / 4, // 16-byte, 8-byte, 4-byte, 2-byte, 1-byte NUM_DIM_WIDTH = 5, }; // All aggregate functions supported in time series aggregate queries. enum AggregateFunction { AGGR_SUM_UNSIGNED = 1, AGGR_SUM_SIGNED = 2, AGGR_SUM_FLOAT = 3, AGGR_MIN_UNSIGNED = 4, AGGR_MIN_SIGNED = 5, AGGR_MIN_FLOAT = 6, AGGR_MAX_UNSIGNED = 7, AGGR_MAX_SIGNED = 8, AGGR_MAX_FLOAT = 9, AGGR_HLL = 10, AGGR_AVG_FLOAT = 11, }; // All supported data type to covert golang vector to iterator. enum DataType { Bool, Int8, Uint8, Int16, Uint16, Int32, Uint32, Float32, Int64, Uint64, Float64, GeoPoint, UUID, }; // All supported constant data types. enum ConstDataType { ConstInt, ConstFloat, ConstGeoPoint, ConstUUID, }; // All supported unary functor types. enum UnaryFunctorType { Negate, Not, BitwiseNot, IsNull, IsNotNull, Noop, GetWeekStart, GetMonthStart, GetQuarterStart, GetYearStart, GetDayOfMonth, GetDayOfYear, GetMonthOfYear, GetQuarterOfYear, GetHLLValue, ArrayLength, }; // All supported binary functor types. enum BinaryFunctorType { And, Or, Equal, NotEqual, LessThan, LessThanOrEqual, GreaterThan, GreaterThanOrEqual, Plus, Minus, Multiply, Divide, Mod, BitwiseAnd, BitwiseOr, BitwiseXor, Floor, ArrayContains, ArrayElementAt, }; // RecordID typedef struct { int32_t batchID; uint32_t index; } RecordID; // HashIndex stores the HashIndex // For now we only support dimension table equal join // the hash index will not support event time typedef struct { uint8_t *buckets; uint32_t seeds[4]; int keyBytes; int numHashes; int numBuckets; } CuckooHashIndex; // GeoPointT is the struct to represent a single geography point. typedef struct { float Lat; float Long; } GeoPointT; // UUIDT is the struct to represent a 16-bytes UUID. typedef struct { uint64_t p1; uint64_t p2; } UUIDT; typedef struct { bool HasDefault; union { bool BoolVal; int32_t Int32Val; uint32_t Uint32Val; float FloatVal; int64_t Int64Val; GeoPointT GeoPointVal; UUIDT UUIDVal; } Value; } DefaultValue; // VectorPartySlice stores the slice of each vector relevant to the query. // It should be supplied for leaf nodes of the AST tree. typedef struct { // Pointer points to memory allocated for this vp slice. We store counts, // nulls and values vector consecutively so that we can represent all // 3 pointers in the way like basePtr + 2 offsets. // If it's mode 0 vector, the BasePtr will be null. // If count vector does not present, the NullsOffset will be zero. // If null vector is not present, the ValuesOffset will be zero. uint8_t *BasePtr; uint32_t NullsOffset; uint32_t ValuesOffset; // Because of slicing and alignment, StartingIndex is not always 0. // StartingIndex will range from 0 to 7, with non-zero values only // used for bit-packed boolean values and nulls. uint8_t StartingIndex; // This is for converting the underlying pointer to appropriate pointer // type. enum DataType DataType; DefaultValue DefaultValue; uint32_t Length; } VectorPartySlice; // ScratchSpaceVector is the output vector for non-leaf non-root nodes and // input vector for non-leaf nodes that have at least one non-leaf child. typedef struct { uint8_t *Values; uint32_t NullsOffset; enum DataType DataType; } ScratchSpaceVector; // ConstantVector is the constant value in AST tree. typedef struct { // Value from the AST tree. union { int32_t IntVal; float FloatVal; GeoPointT GeoPointVal; UUIDT UUIDVal; } Value; // Whether this values is valid. bool IsValid; // A constant indicate const type can be used. enum ConstDataType DataType; } ConstantVector; // ForeignColumnVector stores all batches of vectors for the target column // and the record ids from hash lookup. // Note: foreign vector are only from // dimension tables and are unsorted columns from live batches. typedef struct { RecordID *RecordIDs; VectorPartySlice *Batches; int32_t BaseBatchID; int32_t NumBatches; int32_t NumRecordsInLastBatch; int16_t *const TimezoneLookup; int16_t TimezoneLookupSize; enum DataType DataType; DefaultValue DefaultValue; } ForeignColumnVector; // ArrayVectorPartySlice stores the slice of Array column typedef struct { // OffsetLength pointer point to memory of offset-length vector uint8_t *OffsetLengthVector; // value offset adjustment uint32_t ValueOffsetAdj; enum DataType DataType; uint32_t Length; } ArrayVectorPartySlice; // All supported input vector type. enum InputVectorType { VectorPartyInput, ScratchSpaceInput, ConstantInput, ForeignColumnInput, ArrayVectorPartyInput }; // InputVector is the vector used as input to transform and filter. Actual // type of input is decided by the type field. typedef struct { union { ConstantVector Constant; VectorPartySlice VP; ScratchSpaceVector ScratchSpace; ForeignColumnVector ForeignVP; ArrayVectorPartySlice ArrayVP; } Vector; enum InputVectorType Type; } InputVector; // DimensionVector stores the dimension vector byte array. // DimensionVector will be allocated together. // The layout will be 4byte vectors followed by 2 byte vectors followed by 1 // byte vectors and null vector (1 byte) Note: IndexVector is the indexVector of // dimension vector, not the index vector of the batch Sort and reduce will be // first done on IndexVector and than permutated to the output dimension vectors typedef struct { uint8_t *DimValues; uint64_t *HashValues; uint32_t *IndexVector; int VectorCapacity; uint8_t NumDimsPerDimWidth[NUM_DIM_WIDTH]; } DimensionVector; // DimensionOutputVector is used as the output vector of dimension // transformation for each dimension. typedef struct { uint8_t *DimValues; uint8_t *DimNulls; enum DataType DataType; } DimensionOutputVector; // MeasureOutputVector is used as output vector of measure transformation. // Right now we have one measure per query but we may have multiple measures // in future. typedef struct { // Where to write the values uint32_t *Values; enum DataType DataType; enum AggregateFunction AggFunc; } MeasureOutputVector; // All supported output vector type. enum OutputVectorType { ScratchSpaceOutput, MeasureOutput, DimensionOutput, }; // OutputVector is the vector used as output of transform. Actual // type of input is decided by the type field. typedef struct { union { ScratchSpaceVector ScratchSpace; DimensionOutputVector Dimension; MeasureOutputVector Measure; } Vector; enum OutputVectorType Type; } OutputVector; /* // Batch stores all columns of the batch relevant to the query. // // Columns from different batches of the same table share the same order: // 1. Columns not from ArchivingSortColumns. // 2. Columns from ArchivingSortColumns in reverse order. typedef struct { VectorPartySlice Columns[MAX_COLUMNS_OF_A_TABLE]; } Batch; // ForeignTable stores the data and join conditions of a table being joined. // A hash index on the equi-join columns is either maintained or created at // query time before the query executes. typedef struct { // Input data: Batch *batches; int32_t BaseBatchID; int32_t NumBatches; // Column metadata: int32_t NumColumns; uint16_t ValueBitsByColumn[MAX_COLUMNS_OF_A_TABLE]; // TODO(shengyue): hash index CuckooHashIndex HashIndex; // Equi-join conditions: int32_t NumberOfEqualityPairs; // Index of each column in batches[x].Columns used in the equi-join // condition. uint8_t LocalColumns[MAX_COLUMNS_OF_A_TABLE]; // Index of each remote column in the global column array used in the // equi-join condition. All columns from the main table are added to the // global column array, followed by all columns from the first foreign // table, followed by all columns from the second foreign table.. // All remote columns must be present on the global column array by the time // this join happens; the values of which are used as the hash key to locate // the matching record on this table. uint8_t RemoteColumns[MAX_COLUMNS_OF_A_TABLE]; } ForeignTable; // TimeSeriesAggregate defines the query, stores the input data from // foreign tables, and manages output buffers. Input data from the main table // are passed to Cuda separately to allow batch-based pipelining while reusing // the same query instance across multiple batches. typedef struct { // Foreign tables with input data and hash index: ForeignTable ForeignTables[MAX_FOREIGN_TABLES]; int32_t NumForeignTables; // Column metadata for the main table: int32_t NumColumns; uint16_t ValueBitsByColumn[MAX_COLUMNS_OF_A_TABLE]; // Query definition: int32_t NumDimensions; int32_t NumMeasures; // FilterInstsForUnsorted/ArchiveBatches produces 0 or 1 on top of the value // stack to indicate whether the record should be skipped or included for // all measures. uint32_t FilterInstsForLiveBatches[MAX_INSTRUCTIONS]; uint32_t FilterInstsForArchiveBatches[MAX_INSTRUCTIONS]; // DimMeasureInsts produces one uint32_t word for each dimension, followed // by one uint32_t word for each measure on top of the value stack. // It also produces one char for each dimension and measure on top of the // null stack to indicate whether they are valid (1) or null (0). A null // measure skips its aggregation. A null dimension is reported as null. uint32_t DimMeasureInsts[MAX_INSTRUCTIONS]; // Actual word width (in bytes) of each dimension. uint8_t DimensionBytes[MAX_DIMENSIONS]; // Total number of bytes of all dimensions. uint32_t TotalDimensionBytes; // Aggreate functions for each measure. enum AggregateFunction Aggregates[MAX_MEASURES]; } TimeSeriesAggregate; */ // GeoShape is the struct to represent a geofence shape. A single geoshape can // consists of multiple polygons including holes inside the polygons. Lats and // Longs vector stores the latitude and longitude values of each point of those // polygons. typedef struct { // Lats and Longs are stored in the format as // [a1,a2,...an,a1,FLT_MAX,b1,bz,...bn] // where FLT_MAX denotes the beginning of next polygon. // We repeat the first vertex at end of each polygon, so the last line is // an-a1. float *Lats; float *Longs; // Number of coordinates including FLT_MAX placeholders in the vector. uint16_t NumPoints; } GeoShape; // GeoShapeBatch typedef struct { // last uint8_t *LatLongs; // 1. first one byte stores total number of words(uint32_t) needed to // store predicate value (in or out of shape) of each shape per point // each shape will take 1 bit so every 32 shapes will take 1 word // 2. next three bytes stores the total number of points int32_t TotalNumPoints; uint8_t TotalWords; } GeoShapeBatch; // unaryTransform defines the C transform interface for golang to call. #ifdef __cplusplus extern "C" { #endif // InitIndexVector initialize index vector CGoCallResHandle InitIndexVector(uint32_t *indexVector, uint32_t start, int indexVectorLength, void *cudaStream, int device); // HashLookup looks up input values and stores results (record ids) into record // id vector CGoCallResHandle HashLookup(InputVector input, RecordID *output, uint32_t *indexVector, int indexVectorLength, uint32_t *baseCounts, uint32_t startCount, CuckooHashIndex hashIndex, void *cudaStream, int device); // UnaryTransform transforms an InputVector to output // OutputVector by applying UnaryFunctor to each of the element. // Output space should be preallocated by caller. Notice unaryTransform // should not accept a constant, this should be ensured by the query // compiler. CGoCallResHandle UnaryTransform(InputVector input, OutputVector output, // Will only be used at the leaf node and measure output. uint32_t *indexVector, int indexVectorLength, // Will only be used at the leaf node and measure output. uint32_t *baseCounts, uint32_t startCount, enum UnaryFunctorType functorType, void *cudaStream, int device); // UnaryFilter filters the index vector by applying the unary functor // on input and uses the result as the filter predicate. It returns the // index size after filtering. Caller is responsible for allocating // and initializing the index vector. Filter will be in-place so // no extra storage will be used. CGoCallResHandle UnaryFilter(InputVector input, uint32_t *indexVector, uint8_t *predicateVector, int indexVectorLength, // Will only be used for foreign table column input RecordID **recordIDVectors, int numForeignTables, // Will only be used if this is a leaf node. uint32_t *baseCounts, uint32_t startCount, enum UnaryFunctorType functorType, void *cudaStream, int device); // BinaryTransform applies BinaryFunctor on each pair of the elements in // lhs and rhs and writes the output to OutputVector. Output space should // be preallocated by caller. LHS and RHS cannot both be constant. CGoCallResHandle BinaryTransform(InputVector lhs, InputVector rhs, OutputVector output, // Will only be used at the leaf node and measure output. uint32_t *indexVector, int indexVectorLength, // Will only be used at the leaf node and measure output. uint32_t *baseCounts, uint32_t startCount, enum BinaryFunctorType functorType, void *cudaStream, int device); // BinaryFilter is the binary version of unaryFilter. CGoCallResHandle BinaryFilter(InputVector lhs, InputVector rhs, uint32_t *indexVector, uint8_t *predicateVector, int indexVectorLength, // Will only be used for foreign table column input RecordID **recordIDVectors, int numForeignTables, // Will only be used if this is a leaf node. uint32_t *baseCounts, uint32_t startCount, enum BinaryFunctorType functorType, void *cudaStream, int device); // Sort performs a key-value sort to sort elements in keys and values in // ascending key order. Notice we don't need data type of the measure // values here since we only need to move the whole 4 bytes around // without any change. CGoCallResHandle Sort(DimensionVector keys, int length, void *cudaStream, int device); // Reduce reduces inputValues on inputKeys using the AggregateFunction and // write the unique keys to outputKeys and aggregation results to outputValues. // It returns number of unique keys as result also. Notice outputKeys and // outputValues should be preallocated by caller. CGoCallResHandle Reduce(DimensionVector inputKeys, uint8_t *inputValues, DimensionVector outputKeys, uint8_t *outputValues, int valueBytes, int length, enum AggregateFunction aggFunc, void *cudaStream, int device); // HashReduce does the reduction using hash instead of sort and then reduce. // Note for HashReduce we will not use hash vector for keys so since // hash happens during hash map insertion, therefore we can skip allocation for // hash vector. We will not need index vector as well since we no longer sort // the rows. CGoCallResHandle HashReduce(DimensionVector inputKeys, uint8_t *inputValues, DimensionVector outputKeys, uint8_t *outputValues, int valueBytes, int length, enum AggregateFunction aggFunc, void *cudaStream, int device); // Expand function is used to decompress the dimensions which are compressed // through baseCounts, and append to existing outputKeys. // @inputKeys input DimensionVector // @outputKeys output DimensionVector // @baseCounts count vector for first column // @indexVector index vector of the dimension keys // @indexVectorLen length of index vector will be used for the output, it should // be less or equal to length of keys in inputKeys // outputOccupiedLen number of rows already in the outputKeys, as this will // be append operation CGoCallResHandle Expand(DimensionVector inputKeys, DimensionVector outputKeys, uint32_t *baseCounts, uint32_t *indexVector, int indexVectorLen, int outputOccupiedLen, void *cudaStream, int device); // HyperLogLog is the interface to do hyperloglog in one cgo function call. // prevResultSize is to tell start position of keys and values of current batch. // hllVector (dense/sparse) will only be set when we allocate it for the last // batch. If it's the last batch. this function will return number of different // dim value combinations. Otherwise it returns the (previous result size + // number of unique hash values in current batch), // hllVectorSizePtr stores the size of hllVector // hllDimRegIDCount stores the num of reg_id per dim CGoCallResHandle HyperLogLog(DimensionVector prevDimOut, DimensionVector curDimOut, uint32_t *prevValuesOut, uint32_t *curValuesOut, int prevResultSize, int curBatchSize, bool isLastBatch, uint8_t **hllVectorPtr, size_t *hllVectorSizePtr, uint16_t **hllDimRegIDCountPtr, void *cudaStream, int device); // GeoBatchIntersects is the interface to do geography intersections for all // geoshapes altogether. inOrOut represents whether we want to check geo points // are in the shape or not, 1 represents in and 0 represents not in. // outputPredicate will be used for filter. // Returns the filtered size otherwise returns the original size. CGoCallResHandle GeoBatchIntersects( GeoShapeBatch geoShapeBatch, InputVector points, uint32_t *indexVector, int indexVectorLength, uint32_t startCount, RecordID **recordIDVectors, int numForeignTables, uint32_t *outputPredicate, bool inOrOut, void *cudaStream, int device); // WriteGeoShapeDim is the interface to write the shape index to the dimension // output. This method should be called after GeoBatchIntersects removes all // non-intersecting rows. Note we need pass in the index vector length before // geo since outputPredicate has not done compaction yet. // Note: // 1. we assume that the geo join will be many-to-one join // 2. we only support IN operation for geo intersects join CGoCallResHandle WriteGeoShapeDim( int shapeTotalWords, DimensionOutputVector dimOut, int indexVectorLengthBeforeGeo, uint32_t *outputPredicate, void *cudaStream, int device); // BoostrapDevice will bootstrap the all gpu devices with approriate actions. // For now it will just initialize the constant memory. CGoCallResHandle BootstrapDevice(); #ifdef __cplusplus } #endif #endif // QUERY_TIME_SERIES_AGGREGATE_H_