in memstore/ingestion.go [172:361]
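// insertPrimaryKeys indexes every row of the upsert batch by its primary key and
// classifies it as an insert or an update against the live store. Late fact rows
// are diverted to backfill; rows out of retention or from the future are skipped.
// It returns the update and insert record maps (keyed by live batch ID) and the
// upsert batch to hand to the backfill queue, if any.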
func (shard *TableShard) insertPrimaryKeys(primaryKeyColumns []int, eventTimeColumnIndex int, redoLogFile int64,
upsertBatch *common.UpsertBatch, skipBackfillRows bool) (
map[int32][]recordInfo, map[int32][]recordInfo, *common.UpsertBatch, error) {
// Get primary key column indices and calculate the primary key width.
primaryKeyBytes := shard.Schema.PrimaryKeyBytes
primaryKeyCols, err := upsertBatch.GetPrimaryKeyCols(primaryKeyColumns)
if err != nil {
utils.GetReporter(shard.Schema.Schema.Name, shard.ShardID).GetCounter(utils.PrimaryKeyMissing).Inc(1)
return nil, nil, nil, err
}
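	// Snapshot the table configuration under the schema read lock.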
shard.Schema.RLock()
recordRetentionDays := shard.Schema.Schema.Config.RecordRetentionInDays
allowMissingEventTime := shard.Schema.Schema.Config.AllowMissingEventTime
isFactTable := shard.Schema.Schema.IsFactTable
shard.Schema.RUnlock()
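	// key is a reusable buffer for encoding each row's primary key;
	// updateRecords and insertRecords group affected rows by live batch ID.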
key := make([]byte, primaryKeyBytes)
updateRecords := make(map[int32][]recordInfo)
insertRecords := make(map[int32][]recordInfo)
nextWriteRecord := shard.LiveStore.NextWriteRecord
tableName := shard.Schema.Schema.Name
shardID := shard.ShardID
	var (
		eventTime               uint32
		isEventTimeValid        bool
		backfillRows            = make([]int, 0)
		numRecordsIngested      int64
		numRecordsAppended      int64
		numRecordsUpdated       int64
		numRecordsSkipped       int64
		maxUpsertBatchEventTime uint32
	)
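	// Classify each row as an append, an update, a backfill candidate, or a skip.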
for row := 0; row < upsertBatch.NumRows; row++ {
		// Encode this row's primary key into the reused buffer.
		key = key[:0]
if key, err = common.AppendPrimaryKeyBytes(key, common.NewPrimaryKeyDataValueIterator(upsertBatch, row, primaryKeyCols)); err != nil {
return nil, nil, nil, utils.StackError(err, "Failed to create primary key at row %d", row)
}
		// For fact tables, read the event time from the designated event time column.
if eventTimeColumnIndex >= 0 {
value, validity, err := upsertBatch.GetValue(row, eventTimeColumnIndex)
if err != nil {
return nil, nil, nil, utils.StackError(err, "Failed to get event time for row %d", row)
}
isEventTimeValid = validity
if isEventTimeValid {
eventTime = *(*uint32)(value)
}
}
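		// primaryKeyEventTime is the event time recorded with the key in the
		// primary key index (the batch arrival time when the event time is missing).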
var primaryKeyEventTime uint32
		if !isEventTimeValid {
			if isFactTable && !allowMissingEventTime {
				// err is always nil here; StackError creates a new error.
				return nil, nil, nil, utils.StackError(nil, "Event time for row %d is null", row)
			}
			// A row with a missing event time falls back to the batch arrival time
			// and is dropped once that time is older than the archiving cutoff.
			if primaryKeyEventTime = upsertBatch.ArrivalTime; primaryKeyEventTime < shard.LiveStore.ArchivingCutoffHighWatermark {
				continue
			}
} else {
var nowInSeconds = uint32(utils.Now().Unix())
var oldestRecordDays int
if recordRetentionDays > 0 {
oldestRecordDays = int(nowInSeconds/86400) - recordRetentionDays
}
primaryKeyEventTime = eventTime
eventDay := int(eventTime / 86400)
// Skip this record if it's out of retention
if eventDay < oldestRecordDays {
utils.GetReporter(tableName, shardID).GetCounter(utils.RecordsOutOfRetention).Inc(1)
continue
}
			// Skip this record if its event time is later than the current time.
if eventTime > nowInSeconds {
utils.GetReporter(tableName, shardID).GetCounter(utils.RecordsFromFuture).Inc(1)
continue
}
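			// Track the newest valid event time in this batch for lag reporting below.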
if eventTime > maxUpsertBatchEventTime {
maxUpsertBatchEventTime = eventTime
}
			// Record the max event time on the redo log file so archiving won't
			// purge files that still hold records newer than the archiving cutoff.
shard.LiveStore.RedoLogManager.UpdateMaxEventTime(eventTime, redoLogFile)
			// A record older than the archiving cutoff time (exclusive) means:
			// 1. during ingestion, the event should be put into a backfill queue;
			// 2. during recovery, the event should be ignored, because it was already
			//    put into a backfill queue at ingestion time.
if eventTime < shard.LiveStore.ArchivingCutoffHighWatermark {
if !skipBackfillRows {
					// Mark this row as a backfill row.
backfillRows = append(backfillRows, row)
timeDiff := float64(shard.LiveStore.ArchivingCutoffHighWatermark - eventTime)
utils.GetReporter(tableName, shardID).
GetGauge(utils.BackfillRecordsTimeDifference).Update(timeDiff)
} else {
numRecordsSkipped++
}
continue
}
}
numRecordsIngested++
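		// FindOrInsert reports whether the primary key already existed: new keys
		// claim the next write position (an append), existing keys keep theirs (an update).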
		existing, record, err := shard.LiveStore.PrimaryKey.FindOrInsert(key, nextWriteRecord, primaryKeyEventTime)
		if err != nil {
			return nil, nil, nil, utils.StackError(err, "Failed to insert key for row %d", row)
		}
		result := updateRecords
		if !existing {
			nextWriteRecord = shard.LiveStore.AdvanceNextWriteRecord()
			numRecordsAppended++
			result = insertRecords
		} else {
			numRecordsUpdated++
		}
		result[record.BatchID] = append(result[record.BatchID], recordInfo{
			row:   row,
			index: int(record.Index),
		})
}
var nowInSeconds = uint32(utils.Now().Unix())
	// Update the last modified (max event) time for each column in this upsert batch.
if maxUpsertBatchEventTime > 0 {
for col := 0; col < upsertBatch.NumColumns; col++ {
columnID, err := upsertBatch.GetColumnID(col)
if err != nil {
return nil, nil, nil, utils.StackError(err, "Failed to get column id for col %d", col)
}
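			// Grow the per-column last-modified slice lazily to cover this column ID.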
for columnID >= len(shard.LiveStore.lastModifiedTimePerColumn) {
shard.LiveStore.lastModifiedTimePerColumn = append(shard.LiveStore.lastModifiedTimePerColumn, 0)
}
			if maxUpsertBatchEventTime > shard.LiveStore.lastModifiedTimePerColumn[columnID] {
				shard.LiveStore.lastModifiedTimePerColumn[columnID] = maxUpsertBatchEventTime
				// This fires at most once per column per upsert batch, so it should be
				// acceptable to create the scope dynamically.
				// TODO: if there is any performance issue, cache the reporter at live store level.
				// Compute the lag as a signed difference so negative values clamp to zero
				// instead of wrapping around in uint32 arithmetic.
				utils.GetReporter(tableName, shardID).GetChildGauge(map[string]string{
					"columnID": strconv.Itoa(columnID),
				}, utils.IngestionLagPerColumn).Update(math.Max(float64(int64(nowInSeconds)-int64(maxUpsertBatchEventTime)), 0))
			}
}
}
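	// Emit per-batch ingestion counters.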
utils.GetReporter(tableName, shardID).GetCounter(utils.IngestedRecords).Inc(numRecordsIngested)
utils.GetReporter(tableName, shardID).GetCounter(utils.AppendedRecords).Inc(numRecordsAppended)
utils.GetReporter(tableName, shardID).GetCounter(utils.UpdatedRecords).Inc(numRecordsUpdated)
utils.GetReporter(tableName, shardID).GetCounter(utils.BackfillRecords).Inc(int64(len(backfillRows)))
utils.GetReporter(tableName, shardID).GetCounter(utils.IngestSkippedRecords).Inc(numRecordsSkipped)
	// Update the gauge tracking the percentage of backfill rows among all rows.
	if upsertBatch.NumRows > 0 {
		// Convert before dividing to avoid integer truncation of the percentage.
		utils.GetReporter(tableName, shardID).GetGauge(utils.BackfillRecordsRatio).
			Update(100.0 * float64(len(backfillRows)) / float64(upsertBatch.NumRows))
	}
	// Create the backfill upsert batch if applicable.
	if len(backfillRows) == upsertBatch.NumRows {
		// All rows need backfill; reuse the original batch as the backfill batch.
		return updateRecords, insertRecords, upsertBatch, nil
	}
}
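	// Copy only the backfill rows into a dedicated, smaller batch.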
backfillBatch := upsertBatch.ExtractBackfillBatch(backfillRows)
if backfillBatch != nil && backfillBatch.NumRows > 0 && backfillBatch.NumColumns < upsertBatch.NumColumns {
		// Some columns were pruned because their update functions do not apply to backfill.
utils.GetReporter(tableName, shardID).GetCounter(utils.BackfillRecordsColumnRemoved).Inc(int64(len(backfillRows)))
}
return updateRecords, insertRecords, backfillBatch, nil
}