in memstore/ingestion.go [364:494]
// writeBatchRecords copies the rows listed in records from upsertBatch into the
// live batch identified by batchID, processing one column at a time.
//
// columnDeletions is indexed by column ID; columns flagged true are skipped.
// For each entry in records, row is the row index inside upsertBatch and index
// is the destination position inside the column's vector party.
//
// Locking:
//   - forUpdate == true: the batch write lock is held for the whole call so
//     that a row is updated consistently.
//   - forUpdate == false: the write lock is held only while the vector parties
//     are created and MaxArrivalTime is advanced; values are then written under
//     the batch read lock.
//
// A non-nil error is returned when a column ID or column type cannot be
// resolved, when a column or row index is out of range, or when an
// add/min/max update mode is requested for a non-numeric column. Values
// written before the error was detected are not rolled back.
func (shard *TableShard) writeBatchRecords(columnDeletions []bool,
	upsertBatch *common.UpsertBatch, batchID int32, records []recordInfo, forUpdate bool) error {
	// Both modes start by acquiring the batch write lock; report how long that took.
	start := utils.Now()
	batch := shard.LiveStore.GetBatchForWrite(batchID)
	utils.GetReporter(shard.Schema.Schema.Name, shard.ShardID).GetTimer(utils.IngestionWritelockAquireTime).Record(utils.Now().Sub(start))

	// Advance MaxArrivalTime while the write lock is still held. Doing this
	// after downgrading to the read lock would be a write under RLock.
	if batch.MaxArrivalTime < upsertBatch.ArrivalTime {
		batch.MaxArrivalTime = upsertBatch.ArrivalTime
	}

	if forUpdate {
		// We need to keep the batch locked for update to achieve row level consistency.
		defer batch.Unlock()
	} else {
		// Make sure all columns are created before giving up the write lock.
		for i := 0; i < upsertBatch.NumColumns; i++ {
			columnID, err := upsertBatch.GetColumnID(i)
			if err != nil {
				// No defer is registered yet on this path, so release explicitly.
				batch.Unlock()
				return utils.StackError(err, "Failed to get column id for col %d", i)
			}
			if columnDeletions[columnID] {
				continue
			}
			batch.GetOrCreateVectorParty(columnID, true)
		}
		// Downgrade: the remaining work only writes into existing vector parties.
		batch.Unlock()
		batch.RLock()
		defer batch.RUnlock()
	}

	// Instead of traversing row by row, we go column by column to avoid
	// repeating the per-column checks on each row.
	for col := 0; col < upsertBatch.NumColumns; col++ {
		columnID, err := upsertBatch.GetColumnID(col)
		if err != nil {
			return utils.StackError(err, "Failed to get column id for col %d", col)
		}
		if columnDeletions[columnID] {
			continue
		}

		columnUpdateMode := upsertBatch.GetColumnUpdateMode(col)
		columnMode := upsertBatch.GetColumMode(col)
		// Skip this column when both hold:
		// 1. columnMode is AllValuesDefault
		// 2. columnUpdateMode is UpdateOverwriteNotNull
		if columnMode == common.AllValuesDefault && columnUpdateMode == common.UpdateOverwriteNotNull {
			continue
		}

		if col >= upsertBatch.GetColumnLen() {
			return utils.StackError(nil, "Column index %d out of range %d", col, upsertBatch.GetColumnLen())
		}

		vectorParty := batch.GetOrCreateVectorParty(columnID, true)
		dataType, err := upsertBatch.GetColumnType(col)
		if err != nil {
			// The data type selects the read/write path below, so a failed
			// lookup must not be silently replaced by a zero value.
			return utils.StackError(err, "Failed to get column type for col %d", col)
		}
		cmpFunc := common.GetCompareFunc(dataType)

		// Check whether the update mode is valid for this data type.
		forceWrite := false
		if forUpdate {
			switch columnUpdateMode {
			case common.UpdateForceOverwrite:
				// Always write, even when the incoming value is null.
				forceWrite = true
			case common.UpdateWithAddition, common.UpdateWithMin, common.UpdateWithMax:
				if !common.IsNumeric(dataType) {
					return utils.StackError(nil, "Unsupported data type %x for column update mode %x", dataType, columnUpdateMode)
				}
			}
		}

		// The old value only has to be read for add, min and max. This does
		// not depend on the row, so evaluate it once per column.
		mergeWithOld := columnUpdateMode >= common.UpdateWithAddition && columnUpdateMode <= common.UpdateWithMax

		for _, rec := range records {
			if rec.row >= upsertBatch.NumRows {
				return utils.StackError(nil, "Row index %d out of range %d", rec.row, upsertBatch.NumRows)
			}

			// Columns are handled in three groups to keep the per-row work small:
			// 1. Bool type
			// 2. Go types
			// 3. Other types
			switch {
			case dataType == common.Bool:
				// Error ignored: row and col were range-checked above.
				// NOTE(review): confirm GetBool has no other failure mode.
				val, valid, _ := upsertBatch.GetBool(rec.row, col)
				if !valid && !forceWrite {
					continue
				}
				vectorParty.SetBool(rec.index, val, valid)
			case common.IsGoType(dataType):
				val := upsertBatch.ReadGoValue(rec.row, col)
				valid := val != nil
				if !valid && !forceWrite {
					continue
				}
				vectorParty.SetGoValue(rec.index, val, valid)
			default:
				// Error ignored: row and col were range-checked above.
				// NOTE(review): confirm GetValue has no other failure mode.
				val, valid, _ := upsertBatch.GetValue(rec.row, col)
				if !valid && !forceWrite {
					continue
				}
				if mergeWithOld {
					oldVal, oldValid := vectorParty.GetValue(rec.index)
					// Only combine when the old value is valid; otherwise fall
					// through and store what is in the upsert batch directly.
					if oldValid {
						switch columnUpdateMode {
						case common.UpdateWithAddition:
							common.AdditionUpdate(oldVal, val, dataType)
						case common.UpdateWithMin:
							common.MinMaxUpdate(oldVal, val, dataType, cmpFunc, 1)
						case common.UpdateWithMax:
							common.MinMaxUpdate(oldVal, val, dataType, cmpFunc, -1)
						}
						continue
					}
				}
				// The value was not merged, so set it directly from the upsert batch.
				vectorParty.SetValue(rec.index, val, valid)
			}
		}
	}
	return nil
}