memstore/ingestion.go
// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package memstore

import (
"github.com/uber/aresdb/memstore/common"
"github.com/uber/aresdb/utils"
"math"
"strconv"
)

// HandleIngestion logs an upsert batch and applies it to the in-memory store.
func (m *memStoreImpl) HandleIngestion(table string, shardID int, upsertBatch *common.UpsertBatch) error {
shard, err := m.GetTableShard(table, shardID)
if err != nil {
return utils.StackError(nil, "Failed to get shard %d for table %s for upsert batch", shardID, table)
}
// Release the wait group that protects the shard from being deleted while it is in use.
defer shard.Users.Done()
if !shard.LiveStore.RedoLogManager.IsAppendEnabled() {
return utils.StackError(nil, "appending not enabled on redolog manager for table %s", table)
}
return shard.saveUpsertBatch(upsertBatch, 0, 0, false, false)
}
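
// A hedged caller sketch (not part of this file; the function name below is made up, and we
// assume the package-level MemStore interface exposes HandleIngestion as implemented above):
// an ingestion endpoint that has already decoded the request body into a *common.UpsertBatch
// would simply forward it here and surface any returned error:
//
//	func ingest(m MemStore, table string, shardID int, batch *common.UpsertBatch) error {
//		return m.HandleIngestion(table, shardID, batch)
//	}
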
// saveUpsertBatch handles data ingestion from both redo log recovery and live (HTTP) ingestion.
func (shard *TableShard) saveUpsertBatch(upsertBatch *common.UpsertBatch, redoLogFile int64, offset uint32, recovery, skipBackFillRows bool) error {
tableName := shard.Schema.Schema.Name
shardID := shard.ShardID
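// The writer lock is held across both the redo log append and ApplyUpsertBatch below, so the
// order of batches in the redo log matches the order in which they reach the live store and
// the backfill queue.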
shard.LiveStore.WriterLock.Lock()
if recovery {
utils.GetReporter(tableName, shardID).GetCounter(utils.IngestedRecoveryBatches).Inc(1)
utils.GetReporter(tableName, shardID).GetGauge(utils.RecoveryUpsertBatchSize).Update(float64(len(upsertBatch.GetBuffer())))
// Put a 0 in maxEventTimePerFile in case this redo log is full of backfill batches.
shard.LiveStore.RedoLogManager.UpdateMaxEventTime(0, redoLogFile)
} else {
utils.GetReporter(tableName, shardID).GetCounter(utils.IngestedUpsertBatches).Inc(1)
utils.GetReporter(tableName, shardID).GetGauge(utils.UpsertBatchSize).Update(float64(len(upsertBatch.GetBuffer())))
// For non-recovery ingestion with a local file based redo log, we need to write the upsert batch into the redo log file.
if shard.LiveStore.RedoLogManager.IsAppendEnabled() {
// Replace the original file/offset with the local redo log file/offset.
redoLogFile, offset = shard.LiveStore.RedoLogManager.AppendToRedoLog(upsertBatch)
}
}
needToWaitForBackfillBuffer, err := shard.ApplyUpsertBatch(upsertBatch, redoLogFile, offset, skipBackFillRows)
shard.LiveStore.WriterLock.Unlock()
// return immediately if it does not need to wait for backfill buffer availability
if recovery || !needToWaitForBackfillBuffer {
return err
}
// otherwise: block until backfill buffer becomes available again
shard.LiveStore.BackfillManager.WaitForBackfillBufferAvailability()
return err
}

// ApplyUpsertBatch applies the upsert batch to the memstore shard.
// It returns true if the caller needs to wait for backfill buffer availability.
func (shard *TableShard) ApplyUpsertBatch(upsertBatch *common.UpsertBatch, redoLogFile int64, offset uint32, skipBackfillRows bool) (bool, error) {
shard.Schema.RLock()
valueTypeByColumn := shard.Schema.ValueTypeByColumn
columnDeletions := shard.Schema.GetColumnDeletions()
allowMissingEventTime := shard.Schema.Schema.Config.AllowMissingEventTime
shard.Schema.RUnlock()
primaryKeyColumns := shard.Schema.GetPrimaryKeyColumns()
// IsFactTable should be immutable.
isFactTable := shard.Schema.Schema.IsFactTable
// This is the upsert batch column index that points to the first logical column (which is
// the event time for a fact table).
eventTimeColumnIndex := -1
// Validate columns in upsert batch are valid.
for i := 0; i < upsertBatch.NumColumns; i++ {
columnID, _ := upsertBatch.GetColumnID(i)
if columnID >= len(valueTypeByColumn) {
return false, utils.StackError(nil, "Unrecognized column id %d in upsert batch", columnID)
}
columnType, _ := upsertBatch.GetColumnType(i)
if valueTypeByColumn[columnID] != columnType {
return false, utils.StackError(
nil,
"Mismatched data type (upsert batch: %d, schema %d) for table %s shard %d column %d", columnType, valueTypeByColumn[columnID], shard.Schema.Schema.Name, shard.ShardID, columnID)
}
if columnID == 0 && isFactTable {
eventTimeColumnIndex = i
}
}
// For fact table ingestion, we will need to get the event time from the first column. We don't
// have to validate the column type in the upsert batch because the loop above already handled it.
if isFactTable && eventTimeColumnIndex < 0 && !allowMissingEventTime {
utils.GetReporter(shard.Schema.Schema.Name, shard.ShardID).GetCounter(utils.TimeColumnMissing).Inc(1)
return false, utils.StackError(nil, "Fact table's event time column (first column) is missing")
}
start := utils.Now()
updateRecords, insertRecords, backfillUpsertBatch, err := shard.insertPrimaryKeys(primaryKeyColumns, eventTimeColumnIndex,
redoLogFile, upsertBatch, skipBackfillRows)
utils.GetReporter(shard.Schema.Schema.Name, shard.ShardID).GetTimer(utils.IngestionPrimaryKeyLookupTime).Record(utils.Now().Sub(start))
if err != nil {
return false, err
}
// We write insert records first so that records with the same primary key in an upsert batch
// will be updated in order.
for batchID, records := range insertRecords {
if err := shard.writeBatchRecords(columnDeletions, upsertBatch, batchID, records, false); err != nil {
return false, err
}
}
for batchID, records := range updateRecords {
if err := shard.writeBatchRecords(columnDeletions, upsertBatch, batchID, records, true); err != nil {
return false, err
}
}
shard.LiveStore.AdvanceLastReadRecord()
numMutations := len(insertRecords) + len(updateRecords)
return shard.postUpsertBatchApplication(upsertBatch, backfillUpsertBatch, redoLogFile, offset, numMutations), nil
}

func (shard *TableShard) postUpsertBatchApplication(upsertBatch, backfillUpsertBatch *common.UpsertBatch, redoLogFile int64,
offset uint32, numMutations int) bool {
if shard.Schema.Schema.IsFactTable {
// add records to backfill queue if any.
// TODO: currently we're relying on LiveStore.WriterLock to guarantee the ordering of backfill batches
backfillMgr := shard.LiveStore.BackfillManager
needWaitForBackfillBuffer := backfillMgr.Append(backfillUpsertBatch, redoLogFile, offset)
// The caller needs to wait when the backfill buffer is full and either more than 10 rows or over 5% of the rows are for backfill.
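// For example, with the backfill buffer full: 12 backfill rows out of 1000 (1.2%) still block
// the caller because 12 > 10; 4 backfill rows out of 50 (8%) block because 8% > 5%; 3 backfill
// rows out of 1000 do not block.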
if needWaitForBackfillBuffer &&
(backfillUpsertBatch.NumRows > 10 || float32(backfillUpsertBatch.NumRows)/float32(upsertBatch.NumRows) > 0.05) {
return true
}
} else {
shard.LiveStore.SnapshotManager.ApplyUpsertBatch(redoLogFile, offset, numMutations, shard.LiveStore.LastReadRecord)
}
return false
}

// recordInfo is a per-record instruction describing how to read a row from the upsert batch
// and where to write it in the memstore.
type recordInfo struct {
// The row index of the record in the upsert batch.
row int
// The index of the record to be inserted/updated within its live batch.
index int
}

// insertPrimaryKeys inserts primary keys and returns the records for update and insert, grouped by batch.
// eventTimeColumnIndex is used to extract the event time value per row when it is >= 0.
func (shard *TableShard) insertPrimaryKeys(primaryKeyColumns []int, eventTimeColumnIndex int, redoLogFile int64,
upsertBatch *common.UpsertBatch, skipBackfillRows bool) (
map[int32][]recordInfo, map[int32][]recordInfo, *common.UpsertBatch, error) {
// Get primary key column indices and calculate the primary key width.
primaryKeyBytes := shard.Schema.PrimaryKeyBytes
primaryKeyCols, err := upsertBatch.GetPrimaryKeyCols(primaryKeyColumns)
if err != nil {
utils.GetReporter(shard.Schema.Schema.Name, shard.ShardID).GetCounter(utils.PrimaryKeyMissing).Inc(1)
return nil, nil, nil, err
}
shard.Schema.RLock()
recordRetentionDays := shard.Schema.Schema.Config.RecordRetentionInDays
allowMissingEventTime := shard.Schema.Schema.Config.AllowMissingEventTime
isFactTable := shard.Schema.Schema.IsFactTable
shard.Schema.RUnlock()
key := make([]byte, primaryKeyBytes)
updateRecords := make(map[int32][]recordInfo)
insertRecords := make(map[int32][]recordInfo)
nextWriteRecord := shard.LiveStore.NextWriteRecord
tableName := shard.Schema.Schema.Name
shardID := shard.ShardID
var eventTime uint32
var isEventTimeValid bool
var backfillRows = make([]int, 0)
var numRecordsIngested int64
var numRecordsAppended int64
var numRecordsUpdated int64
var numRecordsSkipped int64
var maxUpsertBatchEventTime uint32
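// For each row: build the primary key bytes, resolve the event time, apply the retention,
// future-event and archiving-cutoff (backfill) filters, then upsert the key into the primary
// key index and bucket the row into the insert or update record set of its live batch.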
for row := 0; row < upsertBatch.NumRows; row++ {
// Get primary key bytes for each record.
// truncate key
key = key[:0]
if key, err = common.AppendPrimaryKeyBytes(key, common.NewPrimaryKeyDataValueIterator(upsertBatch, row, primaryKeyCols)); err != nil {
return nil, nil, nil, utils.StackError(err, "Failed to create primary key at row %d", row)
}
// For fact table we need to get the event time from the first column.
if eventTimeColumnIndex >= 0 {
value, validity, err := upsertBatch.GetValue(row, eventTimeColumnIndex)
if err != nil {
return nil, nil, nil, utils.StackError(err, "Failed to get event time for row %d", row)
}
isEventTimeValid = validity
if isEventTimeValid {
eventTime = *(*uint32)(value)
}
}
var primaryKeyEventTime uint32
if !isEventTimeValid {
if isFactTable && !allowMissingEventTime {
return nil, nil, nil, utils.StackError(err, "Event time for row %d is null", row)
}
// Events with an invalid event time are ignored
// once their arrival time is older than the archiving cutoff.
if primaryKeyEventTime = upsertBatch.ArrivalTime; primaryKeyEventTime < shard.LiveStore.ArchivingCutoffHighWatermark {
continue
}
} else {
var nowInSeconds = uint32(utils.Now().Unix())
var oldestRecordDays int
if recordRetentionDays > 0 {
oldestRecordDays = int(nowInSeconds/86400) - recordRetentionDays
}
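// For example, with RecordRetentionInDays = 90 and the current time on Unix day 19000,
// oldestRecordDays is 18910, so any event whose day index (eventTime / 86400) falls below
// 18910 is counted as out of retention and skipped. With a non-positive retention,
// oldestRecordDays stays 0 and the check never fires.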
primaryKeyEventTime = eventTime
eventDay := int(eventTime / 86400)
// Skip this record if it's out of retention
if eventDay < oldestRecordDays {
utils.GetReporter(tableName, shardID).GetCounter(utils.RecordsOutOfRetention).Inc(1)
continue
}
// Skip this record if its event time is later than the current time
if eventTime > nowInSeconds {
utils.GetReporter(tableName, shardID).GetCounter(utils.RecordsFromFuture).Inc(1)
continue
}
if eventTime > maxUpsertBatchEventTime {
maxUpsertBatchEventTime = eventTime
}
// Update max event time so archiving won't purge redo log files that have records newer than the
// archiving cutoff time.
shard.LiveStore.RedoLogManager.UpdateMaxEventTime(eventTime, redoLogFile)
// If we get a record that is older than archiving cutoff time (exclusive) that means
// 1. during ingestion, the event should be put into a backfill queue
// 2. during recovery, the event should be ignored, because it was already put into
// a backfill queue at ingestion time.
if eventTime < shard.LiveStore.ArchivingCutoffHighWatermark {
if !skipBackfillRows {
// mark this row as backfill row
backfillRows = append(backfillRows, row)
timeDiff := float64(shard.LiveStore.ArchivingCutoffHighWatermark - eventTime)
utils.GetReporter(tableName, shardID).
GetGauge(utils.BackfillRecordsTimeDifference).Update(timeDiff)
} else {
numRecordsSkipped++
}
continue
}
}
numRecordsIngested++
existing, record, err := shard.LiveStore.PrimaryKey.FindOrInsert(key, nextWriteRecord, primaryKeyEventTime)
if err != nil {
return nil, nil, nil, utils.StackError(err, "Failed to insert key for row %d", row)
}
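// FindOrInsert reports whether the primary key already existed: new keys claim the current
// nextWriteRecord slot and advance it, while existing keys are routed to the update set of
// the live batch they already live in.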
if !existing {
nextWriteRecord = shard.LiveStore.AdvanceNextWriteRecord()
numRecordsAppended++
} else {
numRecordsUpdated++
}
result := updateRecords
if !existing {
result = insertRecords
}
rows := result[record.BatchID]
result[record.BatchID] = append(
rows,
recordInfo{
row: row,
index: int(record.Index),
})
}
var nowInSeconds = uint32(utils.Now().Unix())
// Update max event time for each column in this upsert batch.
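// The per-column ingestion lag gauge reports max(now - maxUpsertBatchEventTime, 0) whenever
// a column's last-modified time advances.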
if maxUpsertBatchEventTime > 0 {
for col := 0; col < upsertBatch.NumColumns; col++ {
columnID, err := upsertBatch.GetColumnID(col)
if err != nil {
return nil, nil, nil, utils.StackError(err, "Failed to get column id for col %d", col)
}
for columnID >= len(shard.LiveStore.lastModifiedTimePerColumn) {
shard.LiveStore.lastModifiedTimePerColumn = append(shard.LiveStore.lastModifiedTimePerColumn, 0)
}
if maxUpsertBatchEventTime > shard.LiveStore.lastModifiedTimePerColumn[columnID] {
shard.LiveStore.lastModifiedTimePerColumn[columnID] = maxUpsertBatchEventTime
// We only do this at the per-upsert-batch level, so it should be acceptable to create the scope dynamically.
// TODO: if there is any performance issue, cache the reporter at live store level.
utils.GetReporter(tableName, shardID).GetChildGauge(map[string]string{
"columnID": strconv.Itoa(columnID),
}, utils.IngestionLagPerColumn).Update(math.Max(float64(nowInSeconds-maxUpsertBatchEventTime), 0))
}
}
}
utils.GetReporter(tableName, shardID).GetCounter(utils.IngestedRecords).Inc(numRecordsIngested)
utils.GetReporter(tableName, shardID).GetCounter(utils.AppendedRecords).Inc(numRecordsAppended)
utils.GetReporter(tableName, shardID).GetCounter(utils.UpdatedRecords).Inc(numRecordsUpdated)
utils.GetReporter(tableName, shardID).GetCounter(utils.BackfillRecords).Inc(int64(len(backfillRows)))
utils.GetReporter(tableName, shardID).GetCounter(utils.IngestSkippedRecords).Inc(numRecordsSkipped)
// update ratio gauge of backfill rows/total rows
if upsertBatch.NumRows > 0 {
utils.GetReporter(tableName, shardID).GetGauge(utils.BackfillRecordsRatio).
Update(float64(100.0 * len(backfillRows) / upsertBatch.NumRows))
}
// create backfill upsertBatch if applicable
if len(backfillRows) == upsertBatch.NumRows {
// all rows are for backfill
return updateRecords, insertRecords, upsertBatch, nil
}
backfillBatch := upsertBatch.ExtractBackfillBatch(backfillRows)
if backfillBatch != nil && backfillBatch.NumRows > 0 && backfillBatch.NumColumns < upsertBatch.NumColumns {
// some columns get pruned due to inappropriate update functions
utils.GetReporter(tableName, shardID).GetCounter(utils.BackfillRecordsColumnRemoved).Inc(int64(len(backfillRows)))
}
return updateRecords, insertRecords, backfillBatch, nil
}

// writeBatchRecords reads rows from a batch group and writes them to the memstore. Batch id = 0 is for records to be inserted.
func (shard *TableShard) writeBatchRecords(columnDeletions []bool,
upsertBatch *common.UpsertBatch, batchID int32, records []recordInfo, forUpdate bool) error {
var batch *LiveBatch
if forUpdate {
// We need to lock the batch for update to achieve row level consistency.
start := utils.Now()
batch = shard.LiveStore.GetBatchForWrite(batchID)
utils.GetReporter(shard.Schema.Schema.Name, shard.ShardID).GetTimer(utils.IngestionWritelockAquireTime).Record(utils.Now().Sub(start))
defer batch.Unlock()
} else {
// Make sure all columns are created.
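// For inserts, the write lock is only needed while creating the vector parties; the appended
// rows sit beyond LastReadRecord and stay invisible to readers until AdvanceLastReadRecord
// runs, so a read lock is enough for the actual writes.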
start := utils.Now()
batch = shard.LiveStore.GetBatchForWrite(batchID)
utils.GetReporter(shard.Schema.Schema.Name, shard.ShardID).GetTimer(utils.IngestionWritelockAquireTime).Record(utils.Now().Sub(start))
for i := 0; i < upsertBatch.NumColumns; i++ {
columnID, _ := upsertBatch.GetColumnID(i)
if columnDeletions[columnID] {
continue
}
batch.GetOrCreateVectorParty(columnID, true)
}
batch.Unlock()
batch.RLock()
defer batch.RUnlock()
}
if batch.MaxArrivalTime < upsertBatch.ArrivalTime {
batch.MaxArrivalTime = upsertBatch.ArrivalTime
}
// Instead of traversing row by row, we go column by column to avoid repeating per-column checks on each row.
for col := 0; col < upsertBatch.NumColumns; col++ {
columnID, err := upsertBatch.GetColumnID(col)
if err != nil {
return utils.StackError(err, "Failed to get column id for col %d", col)
}
if columnDeletions[columnID] {
continue
}
columnUpdateMode := upsertBatch.GetColumnUpdateMode(col)
columnMode := upsertBatch.GetColumMode(col)
// We will skip processing this column if both
// 1. columnMode is AllValuesDefault, and
// 2. columnUpdateMode is UpdateOverwriteNotNull.
if columnMode == common.AllValuesDefault && columnUpdateMode == common.UpdateOverwriteNotNull {
continue
}
if col >= upsertBatch.GetColumnLen() {
return utils.StackError(nil, "Column index %d out of range %d", col, upsertBatch.GetColumnLen())
}
vectorParty := batch.GetOrCreateVectorParty(columnID, true)
dataType, _ := upsertBatch.GetColumnType(col)
cmpFunc := common.GetCompareFunc(dataType)
// check whether the update mode is valid based on data type.
forceWrite := false
if forUpdate {
switch columnUpdateMode {
case common.UpdateForceOverwrite:
// always update
forceWrite = true
case common.UpdateWithAddition:
fallthrough
case common.UpdateWithMin:
fallthrough
case common.UpdateWithMax:
if !common.IsNumeric(dataType) {
return utils.StackError(nil, "Unsupported data type %x for column update mode %x", dataType, columnUpdateMode)
}
}
}
for _, recordInfo := range records {
if recordInfo.row >= upsertBatch.NumRows {
return utils.StackError(nil, "Row index %d out of range %d", recordInfo.row, upsertBatch.NumRows)
}
// We explicitly treat columns differently by checking whether they are
// 1. Bool type
// 2. Go types
// 3. Other types
// By doing this, we save stack space by not storing all related fields for every case.
if dataType == common.Bool {
val, valid, _ := upsertBatch.GetBool(recordInfo.row, col)
if !valid && !forceWrite {
continue
}
vectorParty.SetBool(recordInfo.index, val, valid)
} else if common.IsGoType(dataType) {
val := upsertBatch.ReadGoValue(recordInfo.row, col)
valid := val != nil
if !valid && !forceWrite {
continue
}
vectorParty.SetGoValue(recordInfo.index, val, valid)
} else {
val, valid, _ := upsertBatch.GetValue(recordInfo.row, col)
if !valid && !forceWrite {
continue
}
// only read oldValue when mode is one of add, min, max.
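// For the addition/min/max modes the incoming value is folded into the existing stored value;
// when the old value is null we fall through and write the incoming value directly below.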
if columnUpdateMode >= common.UpdateWithAddition && columnUpdateMode <= common.UpdateWithMax {
oldVal, oldValid := vectorParty.GetValue(recordInfo.index)
// We only need to do the calculation when the old value is valid; otherwise we can directly
// set what's in the upsert batch.
if oldValid {
switch columnUpdateMode {
case common.UpdateWithAddition:
common.AdditionUpdate(oldVal, val, dataType)
case common.UpdateWithMin:
common.MinMaxUpdate(oldVal, val, dataType, cmpFunc, 1)
case common.UpdateWithMax:
common.MinMaxUpdate(oldVal, val, dataType, cmpFunc, -1)
}
continue
}
}
// If the value was not aggregated above, set it directly using the value from the upsert batch.
vectorParty.SetValue(recordInfo.index, val, valid)
}
}
}
return nil
}