memstore/merge.go (405 lines of code) (raw):

// Copyright (c) 2017-2018 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package memstore import ( "github.com/uber/aresdb/memstore/common" "github.com/uber/aresdb/memstore/list" "github.com/uber/aresdb/memstore/vectors" "github.com/uber/aresdb/utils" "sync" ) // mergeContext carries all context information used during merge type mergeContext struct { base *ArchiveBatch patch *archivingPatch merged *ArchiveBatch // Number of total columns (including deleted columns). numColumns int // Number of records in base patch. If base is nil, size is 0. baseSize int // Number of records in base patch and patch. totalSize int // Iterators for sorted base. One iterator per sort column. baseIters []sortedColumnIterator // Iterators for archiving patch. One iterator per sort column. patchIters []sortedColumnIterator // Following fields will only be used during preallocate stage // length is len(sortColumns). mergedLengths []int // Following fields will be only used during merge stage stores the indexes to // write to the final merged archive batch for each column length is len(all columns). outputBegins []int // It stores the accumulated counts to write to the final merged archive batch for // each sorted column, length is len(all columns). outputCounts []uint32 // Buffer the unsorted column for writing final uncompressed values. 
unsortedColumns []int // Stores list of rows that have been marked as deleted baseRowDeleted []int // This is used to short circuit merge for deleted columns. columnDeletions []bool // Needed during merging. dataTypes []common.DataType defaultValues []*common.DataValue // Keep track of total unmanaged memory space this merge process uses. unmanagedMemoryBytes int64 } // newMergeContext creates a new context for merge existing batch and archive batch // into a new batch. It's shared by pre-allocate stage and actual merge stage. func newMergeContext(base *ArchiveBatch, patch *archivingPatch, columnDeletions []bool, dataTypes []common.DataType, defaultValues []*common.DataValue, baseRowDeleted []int) *mergeContext { numColumns := len(dataTypes) baseSize := 0 if base != nil { baseSize = base.Size } ctx := &mergeContext{ base: base, patch: patch, numColumns: numColumns, baseSize: baseSize, totalSize: len(patch.recordIDs) + baseSize - len(baseRowDeleted), mergedLengths: make([]int, len(patch.sortColumns)), outputBegins: make([]int, numColumns), outputCounts: make([]uint32, len(patch.sortColumns)), unsortedColumns: make([]int, 0, numColumns-len(patch.sortColumns)), baseRowDeleted: baseRowDeleted, columnDeletions: columnDeletions, dataTypes: dataTypes, defaultValues: defaultValues, } ctx.initIters() return ctx } // initIters initialize patch and base iterators. func (ctx *mergeContext) initIters() { ctx.patchIters = make([]sortedColumnIterator, len(ctx.patch.sortColumns)) ctx.baseIters = make([]sortedColumnIterator, len(ctx.patch.sortColumns)) for i, columnID := range ctx.patch.sortColumns { ctx.patchIters[i] = newArchivingPatchColumnIterator(ctx.patch, columnID) // archive batch iterator ctx.baseIters[i] = newArchiveBatchColumnIterator(ctx.base, columnID, ctx.baseRowDeleted) } } // sortedColumnIterator is the common interface to merge two sorted columns type sortedColumnIterator interface { // Read values from current idx. read() // Advance the iterator. 
next() // Tells whether the iteration has been finished. done() bool // Returns current data value the iterator points to value() common.DataValue // Tells what's the current index of iterator in a vector index() int // Tells iterator where to start and stop in next sorted column. // For base iterator, it's current count. // For patch iterator, it's current index. // To make the interface consistent, we choose return uint32. // So for patch iterator, we need to convert it between int and uint32. currentPosition() uint32 // For base iterator, it's next count. // For patch iterator, it's next index. nextPosition() uint32 // Count of current value. count() uint32 // For base iterator, it should be next end count. For patch iterator, // it should be next end index. This must be called before iterating // each slice setEndPosition(pos uint32) // list of rows that should be skipped for current value currentSkipRows() []int } type archiveBatchColumnIterator struct { // Column being iterated. vp common.ArchiveVectorParty // Iterator position. idx int // # currentCount uint32 // rows from beginning to current value nextCount uint32 // total rows of this vp endCount uint32 val common.DataValue // rows deleted rowsDeleted []int // rowsDeleted start position for current value currentRowsDeletedStart int // rowsDeleted end position(exclusive) for current value currentRowsDeletedEnd int } // newArchiveBatchColumnIterator creates a new iterator that iterates through // a specific range of archive batch column. 
func newArchiveBatchColumnIterator(base *ArchiveBatch, columnID int, rowsDeleted []int) sortedColumnIterator { var baseVP common.ArchiveVectorParty if base != nil && columnID < len(base.Columns) { baseVP = base.Columns[columnID].(common.ArchiveVectorParty) } return &archiveBatchColumnIterator{ vp: baseVP, rowsDeleted: rowsDeleted, } } func (itr *archiveBatchColumnIterator) done() bool { if itr.vp == nil { return true } return itr.idx >= itr.vp.GetLength() || itr.currentCount >= itr.endCount } func (itr *archiveBatchColumnIterator) read() { if itr.done() { return } // Sort columns of archive batch should be either mode 3 or mode 0. mode := itr.vp.(common.CVectorParty).GetMode() itr.currentRowsDeletedStart = itr.currentRowsDeletedEnd if mode == common.HasCountVector { itr.nextCount = itr.vp.GetCount(itr.idx) } else { itr.nextCount = itr.endCount } for itr.currentRowsDeletedEnd < len(itr.rowsDeleted) && uint32(itr.rowsDeleted[itr.currentRowsDeletedEnd]) < itr.nextCount { itr.currentRowsDeletedEnd++ } itr.val = itr.vp.GetDataValue(itr.idx) } func (itr *archiveBatchColumnIterator) next() { // move to next value itr.idx++ itr.currentCount = itr.nextCount itr.read() } func (itr *archiveBatchColumnIterator) index() int { return itr.idx } func (itr *archiveBatchColumnIterator) value() common.DataValue { return itr.val } func (itr *archiveBatchColumnIterator) currentPosition() uint32 { return itr.currentCount } func (itr *archiveBatchColumnIterator) nextPosition() uint32 { return itr.nextCount } func (itr *archiveBatchColumnIterator) count() uint32 { return itr.nextCount - itr.currentCount - uint32(itr.currentRowsDeletedEnd-itr.currentRowsDeletedStart) } func (itr *archiveBatchColumnIterator) currentSkipRows() []int { return itr.rowsDeleted[itr.currentRowsDeletedStart:itr.currentRowsDeletedEnd] } func (itr *archiveBatchColumnIterator) setEndPosition(pos uint32) { itr.endCount = pos // see to the first valid value itr.read() } type archivingPatchColumnIterator struct { patch 
*archivingPatch columnID int endIdx int // Iterator position. idx int nextIdx int val common.DataValue } // newArchivingPatchColumnIterator creates a new iterator that iterates through a specific // range of archive batch column. func newArchivingPatchColumnIterator(patch *archivingPatch, columnID int) sortedColumnIterator { itr := &archivingPatchColumnIterator{ patch: patch, columnID: columnID, } return itr } func (itr *archivingPatchColumnIterator) done() bool { return itr.idx >= itr.endIdx } func (itr *archivingPatchColumnIterator) read() { if itr.done() { return } // Read current value. itr.val = itr.patch.GetDataValue(itr.idx, itr.columnID) // Find next value != current value. for itr.nextIdx++; itr.nextIdx < itr.endIdx; itr.nextIdx++ { val := itr.patch.GetDataValue(itr.nextIdx, itr.columnID) if itr.val.Compare(val) != 0 { break } } } func (itr *archivingPatchColumnIterator) next() { // Jump directly to index of next different value. itr.idx = itr.nextIdx itr.read() } func (itr *archivingPatchColumnIterator) value() common.DataValue { return itr.val } func (itr *archivingPatchColumnIterator) index() int { return itr.idx } func (itr *archivingPatchColumnIterator) currentPosition() uint32 { return uint32(itr.idx) } func (itr *archivingPatchColumnIterator) nextPosition() uint32 { return uint32(itr.nextIdx) } func (itr *archivingPatchColumnIterator) count() uint32 { return uint32(itr.nextIdx - itr.idx) } func (itr *archivingPatchColumnIterator) currentSkipRows() []int { return nil } func (itr *archivingPatchColumnIterator) setEndPosition(pos uint32) { itr.endIdx = int(pos) itr.read() } // merge an live batch with a archive batch and store the merged data into a new archive batch. // It has two stages: // 1. preallocate: attempt to merge two batches but only calculate how much space merged data needs. // 2. merge: based on the calculated size, allocate space and do actual merge. 
// This algorithm will do merge on sorted columns first and based on the positions of last sorted column, // it will copy non-sorted columns data into final result. // The parameters will be used as cutoff and seqNum for the merged batch. func (ctx *mergeContext) merge(cutoff uint32, seqNum uint32) { // We preallocate space in 1st pass to avoid allocate unnecessary memory. ctx.mergeRecursive(0, uint32(ctx.baseSize), len(ctx.patch.recordIDs), ctx.preAllocate) // Allocate space for merged archive batch. ctx.allocate(cutoff, seqNum) // Reset iterators to begin 2nd pass. ctx.initIters() // Do actual merge and write to output vector party. ctx.mergeRecursive(0, uint32(ctx.baseSize), len(ctx.patch.recordIDs), ctx.writeOutput) // If sort columns is empty, we need to dump all values in batch first and then dump patch values. if len(ctx.patch.sortColumns) == 0 { // Write base. ctx.writeUnsortedColumns(0, ctx.baseSize, ctx.base, ctx.baseRowDeleted) // Write patch. ctx.writeUnsortedColumns(0, len(ctx.patch.recordIDs), ctx.patch, nil) } // Scan through all columns for mode 0 and 1 columns and remove unnecessary vectors. for columnID := 0; columnID < len(ctx.merged.Columns); columnID++ { column := ctx.merged.Columns[columnID] column.(common.ArchiveVectorParty).Prune() } } // allocate space for merged archive batch based on calculated mergedLengths. func (ctx *mergeContext) allocate(cutoff uint32, seqNum uint32) { columns := make([]common.VectorParty, ctx.numColumns) // Need to create batch in advance otherwise vector party's allUsersDone will have nil value. ctx.merged = &ArchiveBatch{ Version: cutoff, SeqNum: seqNum, Size: ctx.totalSize, BatchID: ctx.base.BatchID, Shard: ctx.base.Shard, Batch: common.Batch{RWMutex: &sync.RWMutex{}}, } for columnID := 0; columnID < ctx.numColumns; columnID++ { dataType := ctx.dataTypes[columnID] defaultValue := *ctx.defaultValues[columnID] var bytes int64 if i := utils.IndexOfInt(ctx.patch.sortColumns, columnID); i >= 0 { // Sort columns. 
bytes = int64(vectors.CalculateVectorPartyBytes(dataType, ctx.mergedLengths[i], true, true)) columns[columnID] = newArchiveVectorParty(ctx.mergedLengths[i], dataType, defaultValue, ctx.merged.RWMutex) columns[columnID].Allocate(true) } else { // Non-sort columns. if common.IsArrayType(dataType) { // array will never appear in sort columns offsetBytes := int64(vectors.CalculateVectorBytes(common.Uint32, ctx.totalSize*2)) valueBytes := ctx.calculateArrayVectorPartyBytes(columnID, dataType) bytes = offsetBytes + valueBytes columns[columnID] = list.NewArchiveVectorParty(ctx.totalSize, dataType, valueBytes, ctx.merged.RWMutex) } else { bytes = int64(vectors.CalculateVectorPartyBytes(dataType, ctx.totalSize, true, false)) columns[columnID] = newArchiveVectorParty(ctx.totalSize, dataType, defaultValue, ctx.merged.RWMutex) } if !ctx.columnDeletions[columnID] { columns[columnID].Allocate(false) } ctx.unsortedColumns = append(ctx.unsortedColumns, columnID) } if !ctx.columnDeletions[columnID] { ctx.base.Shard.HostMemoryManager.ReportUnmanagedSpaceUsageChange(bytes) ctx.unmanagedMemoryBytes += bytes } } ctx.merged.Columns = columns } // calculate value vector bytes needed for Array ArchiveParty func (ctx *mergeContext) calculateArrayVectorPartyBytes(columnID int, dataType common.DataType) int64 { var totalBytes int64 // bytes for patch for i := 0; i < len(ctx.patch.recordIDs); i++ { totalBytes += int64(common.CalculateListElementBytes(dataType, ctx.patch.GetCount(i, columnID))) } // bytes for original archive patch vp := ctx.base.GetVectorParty(columnID) if vp == nil { return totalBytes } if !vp.IsList() { return totalBytes } rowsDeletedIndex := 0 listVP := vp.AsList() for i := 0; i < ctx.baseSize; i++ { if rowsDeletedIndex < len(ctx.baseRowDeleted) && ctx.baseRowDeleted[rowsDeletedIndex] == i { // skip the deleted row rowsDeletedIndex++ continue } totalBytes += int64(common.CalculateListElementBytes(dataType, int(listVP.GetElemCount(i)))) } return totalBytes } // common 
// mergeAction is the common function signature for both the preallocate and
// the merge stage. mergeRecursive calls it once per merged entry of the sort
// column at sortColIdx. compareRes is the result of comparing the patch value
// with the base value:
//   - < 0: the entry comes from the patch only;
//   - 0: both sides hold the same value;
//   - > 0: the entry comes from the base only.
//
// mergedVP is the merged vector party of that sort column, or nil during
// preallocate.
type mergeAction func(baseIter, patchIter sortedColumnIterator, sortColIdx, compareRes int, mergedVP common.ArchiveVectorParty)

// preAllocate is the mergeAction called during the first pass. It only counts
// one more merged entry for the sort column at sortColIdx; allocate later
// sizes the sort columns from these counts.
func (ctx *mergeContext) preAllocate(baseIter, patchIter sortedColumnIterator, sortColIdx, compareRes int, mergedVP common.ArchiveVectorParty) {
	ctx.mergedLengths[sortColIdx]++
}

// writeUnsortedColumns copies rows [start, end) of reader into every
// non-deleted non-sort column of the merged batch, appending each value at
// that column's outputBegins position.
//
// skipRows lists rows to leave out. Only its head is compared with the
// current row, so skipRows must be in ascending order and should not contain
// rows below start.
func (ctx *mergeContext) writeUnsortedColumns(start, end int, reader common.BatchReader, skipRows []int) {
	// Base batch is possible to be nil for a particular day.
	// NOTE(review): callers pass ctx.base, a *ArchiveBatch. A nil pointer
	// converted to common.BatchReader is a non-nil interface value, so this
	// check does not catch a nil base. The visible callers only reach here
	// with an empty [start, end) range in that case — confirm before relying
	// on this guard.
	if reader != nil {
		for i := start; i < end; i++ {
			// Deleted row: drop it from the output.
			if len(skipRows) > 0 && skipRows[0] == i {
				skipRows = skipRows[1:]
				continue
			}
			for _, columnID := range ctx.unsortedColumns {
				// We will skip writing to deleted columns so that it will have all null values.
				if !ctx.columnDeletions[columnID] {
					mergedVP := ctx.merged.Columns[columnID]
					val := reader.GetDataValueWithDefault(int(i), columnID, *ctx.defaultValues[columnID])
					mergedVP.SetDataValue(ctx.outputBegins[columnID], val, common.IncrementCount)
					ctx.outputBegins[columnID]++
				}
			}
		}
	}
}

// writeOutput is the mergeAction called during the second pass. It appends
// one entry to the merged sort column at sortColIdx. The entry holds the
// value, its row count (base and patch counts added together when
// compareRes == 0), and the cumulative row count of that sort column so far.
//
// While processing the last sort column it also copies the non-sort columns
// for the rows covered by the entry, base rows (minus deleted ones) before
// patch rows.
func (ctx *mergeContext) writeOutput(baseIter, patchIter sortedColumnIterator, sortColIdx, compareRes int, mergedVP common.ArchiveVectorParty) {
	var val common.DataValue
	var count uint32
	// Non-sort columns are only written while processing the last sort column.
	ifWriteUnsortedColumns := sortColIdx == len(ctx.patch.sortColumns)-1
	// if patch value is less, we read from patch
	if compareRes < 0 {
		val = patchIter.value()
		count = patchIter.count()
		ctx.outputCounts[sortColIdx] += count
		if ifWriteUnsortedColumns {
			ctx.writeUnsortedColumns(int(patchIter.currentPosition()), int(patchIter.nextPosition()), ctx.patch, nil)
		}
	} else if compareRes == 0 {
		// Same value on both sides: one entry covering the rows of both.
		val = baseIter.value()
		count = patchIter.count() + baseIter.count()
		ctx.outputCounts[sortColIdx] += count
		if ifWriteUnsortedColumns {
			ctx.writeUnsortedColumns(int(baseIter.currentPosition()), int(baseIter.nextPosition()), ctx.base, baseIter.currentSkipRows())
			ctx.writeUnsortedColumns(int(patchIter.currentPosition()), int(patchIter.nextPosition()), ctx.patch, nil)
		}
	} else {
		// Base value is less: read from base.
		val = baseIter.value()
		count = baseIter.count()
		ctx.outputCounts[sortColIdx] += count
		if ifWriteUnsortedColumns {
			ctx.writeUnsortedColumns(int(baseIter.currentPosition()), int(baseIter.nextPosition()), ctx.base, baseIter.currentSkipRows())
		}
	}
	columnID := ctx.patch.sortColumns[sortColIdx]
	// Set value on the mergedVP, then record the cumulative count at this entry.
	mergedVP.SetDataValue(ctx.outputBegins[columnID], val, common.IncrementCount, count)
	mergedVP.SetCount(ctx.outputBegins[columnID], ctx.outputCounts[sortColIdx])
	ctx.outputBegins[columnID]++
}

// mergeRecursive does merge on the base and patch iterators of the sort
// column at sortColIdx.
//
// baseEndPos is the end count (exclusive, cumulative rows) for the base
// iterator to stop at. patchEndPos is the end index (exclusive row index) for
// the patch iterator to stop at. An end position of 0 means an empty range
// for that iterator.
//
// For each merged entry it calls f and then recurses into the next sort
// column, restricted to the rows of that entry, so entries are visited in
// sorted order, depth first. Both passes of merge must produce the same
// sequence of f calls, because the counts gathered by preAllocate size the
// vector parties that writeOutput fills.
func (ctx *mergeContext) mergeRecursive(sortColIdx int, baseEndPos uint32, patchEndPos int, f mergeAction) {
	if sortColIdx >= len(ctx.patch.sortColumns) {
		return
	}
	columnID := ctx.patch.sortColumns[sortColIdx]
	var mergedVP common.ArchiveVectorParty
	// Only used by merge stage during preallocate stage ctx.merged will be nil
	if ctx.merged != nil {
		mergedVP = ctx.merged.Columns[columnID].(common.ArchiveVectorParty)
	}
	baseIter := ctx.baseIters[sortColIdx]
	baseIter.setEndPosition(baseEndPos)
	patchIter := ctx.patchIters[sortColIdx]
	patchIter.setEndPosition(uint32(patchEndPos))
	for !baseIter.done() && !patchIter.done() {
		// ignore values if the count is 0 (e.g. base values whose rows are all deleted).
		// NOTE(review): no recursion happens for a skipped base value, so the
		// deeper base iterators stay inside its rows. This presumably relies on
		// every deeper value there also having count 0 and being skipped on the
		// next call — confirm.
		if baseIter.count() == 0 {
			baseIter.next()
			continue
		}
		if patchIter.count() == 0 {
			patchIter.next()
			continue
		}
		res := patchIter.value().Compare(baseIter.value())
		f(baseIter, patchIter, sortColIdx, res, mergedVP)
		if res < 0 {
			// New value from patch.
			ctx.mergeRecursive(
				sortColIdx+1,
				0,
				int(patchIter.nextPosition()),
				f,
			)
			patchIter.next()
		} else if res == 0 {
			// Same value on both sides: recurse over both ranges.
			ctx.mergeRecursive(
				sortColIdx+1,
				baseIter.nextPosition(),
				int(patchIter.nextPosition()),
				f,
			)
			baseIter.next()
			patchIter.next()
		} else {
			// Value only present in base.
			ctx.mergeRecursive(
				sortColIdx+1,
				baseIter.nextPosition(),
				0,
				f,
			)
			baseIter.next()
		}
	}
	// The loop above stops once either iterator is done, so at most one of
	// the two loops below has values left to flush.
	for !patchIter.done() {
		if patchIter.count() > 0 {
			// This is equal to compareRes < 0.
			f(baseIter, patchIter, sortColIdx, -1, mergedVP)
			ctx.mergeRecursive(
				sortColIdx+1,
				0,
				int(patchIter.nextPosition()),
				f,
			)
		}
		patchIter.next()
	}
	for !baseIter.done() {
		if baseIter.count() > 0 {
			// This is equal to compareRes > 0.
			f(baseIter, patchIter, sortColIdx, 1, mergedVP)
			ctx.mergeRecursive(
				sortColIdx+1,
				baseIter.nextPosition(),
				0,
				f,
			)
		}
		baseIter.next()
	}
}