func()

in memstore/ingestion.go [364:494]


// writeBatchRecords copies the rows listed in records from upsertBatch into the
// live batch identified by batchID, working one column at a time.
//
// columnDeletions is indexed by column ID; columns flagged true are skipped.
// Each entry of records pairs a source row in upsertBatch (row) with a
// destination position in the live batch (index).
//
// When forUpdate is true the batch returned by GetBatchForWrite is only
// unlocked when the function returns. Force-overwrite and the numeric-type
// validation of addition/min/max update modes apply on this path only.
// When forUpdate is false the vector parties of all non-deleted columns are
// created first, then the lock is released and a read lock is held for the
// remainder of the call.
//
// An error is returned when a column ID cannot be resolved, when a column or
// row index is out of range, or when an addition/min/max update mode is
// requested for a non-numeric column on the update path. Nothing is rolled
// back: values written before the error stay written.
func (shard *TableShard) writeBatchRecords(columnDeletions []bool,
	upsertBatch *common.UpsertBatch, batchID int32, records []recordInfo, forUpdate bool) error {
	// Both paths start by fetching the batch for write and reporting how long
	// that took.
	lockStart := utils.Now()
	batch := shard.LiveStore.GetBatchForWrite(batchID)
	utils.GetReporter(shard.Schema.Schema.Name, shard.ShardID).GetTimer(utils.IngestionWritelockAquireTime).Record(utils.Now().Sub(lockStart))

	if forUpdate {
		// Updates keep the lock for the whole call to achieve row level consistency.
		defer batch.Unlock()
	} else {
		// Inserts only need the lock while making sure every column exists.
		for i := 0; i < upsertBatch.NumColumns; i++ {
			// The lookup error is dropped here; the write loop below repeats the
			// same lookup and reports the failure.
			id, _ := upsertBatch.GetColumnID(i)
			if !columnDeletions[id] {
				batch.GetOrCreateVectorParty(id, true)
			}
		}
		batch.Unlock()

		batch.RLock()
		defer batch.RUnlock()
	}

	// NOTE(review): on the insert path this field is written while holding only
	// the read lock — confirm that ingestion into a batch is serialized by callers.
	if upsertBatch.ArrivalTime > batch.MaxArrivalTime {
		batch.MaxArrivalTime = upsertBatch.ArrivalTime
	}

	// Walk column by column rather than row by row so that the per-column
	// decisions (type, update mode) are made once instead of for every row.
	for col := 0; col < upsertBatch.NumColumns; col++ {
		columnID, err := upsertBatch.GetColumnID(col)
		if err != nil {
			return utils.StackError(err, "Failed to get column id for col %d", col)
		}
		if columnDeletions[columnID] {
			continue
		}

		updateMode := upsertBatch.GetColumnUpdateMode(col)
		colMode := upsertBatch.GetColumMode(col)

		// A column holding only default values has nothing to contribute when
		// the update mode is "overwrite not null".
		if colMode == common.AllValuesDefault && updateMode == common.UpdateOverwriteNotNull {
			continue
		}

		if col >= upsertBatch.GetColumnLen() {
			return utils.StackError(nil, "Column index %d out of range %d", col, upsertBatch.GetColumnLen())
		}

		vp := batch.GetOrCreateVectorParty(columnID, true)
		// The column index was range-checked just above, so the error is not inspected.
		dataType, _ := upsertBatch.GetColumnType(col)
		cmpFunc := common.GetCompareFunc(dataType)

		// Only the update path honors force-overwrite and validates that
		// addition/min/max are used with numeric columns.
		forceWrite := forUpdate && updateMode == common.UpdateForceOverwrite
		if forUpdate {
			switch updateMode {
			case common.UpdateWithAddition, common.UpdateWithMin, common.UpdateWithMax:
				if !common.IsNumeric(dataType) {
					return utils.StackError(nil, "Unsupported data type %x for column update mode %x", dataType, updateMode)
				}
			}
		}

		// The existing value is only consulted for modes in the
		// addition..max range.
		mergesWithOld := updateMode >= common.UpdateWithAddition && updateMode <= common.UpdateWithMax

		for _, rec := range records {
			if rec.row >= upsertBatch.NumRows {
				return utils.StackError(nil, "Row index %d out of range %d", rec.row, upsertBatch.NumRows)
			}

			// Bool columns, Go-typed columns and all remaining types each use
			// their own accessor pair, so they are dispatched separately.
			switch {
			case dataType == common.Bool:
				val, valid, _ := upsertBatch.GetBool(rec.row, col)
				if valid || forceWrite {
					vp.SetBool(rec.index, val, valid)
				}

			case common.IsGoType(dataType):
				val := upsertBatch.ReadGoValue(rec.row, col)
				valid := val != nil
				if valid || forceWrite {
					vp.SetGoValue(rec.index, val, valid)
				}

			default:
				val, valid, _ := upsertBatch.GetValue(rec.row, col)
				if !(valid || forceWrite) {
					continue
				}

				if mergesWithOld {
					oldVal, oldValid := vp.GetValue(rec.index)
					// With a valid stored value the update helper is invoked and
					// the direct write below is skipped; otherwise the incoming
					// value is stored as is.
					if oldValid {
						switch updateMode {
						case common.UpdateWithAddition:
							common.AdditionUpdate(oldVal, val, dataType)
						case common.UpdateWithMin:
							common.MinMaxUpdate(oldVal, val, dataType, cmpFunc, 1)
						case common.UpdateWithMax:
							common.MinMaxUpdate(oldVal, val, dataType, cmpFunc, -1)
						}
						continue
					}
				}

				// No merge happened: store the value from the upsert batch directly.
				vp.SetValue(rec.index, val, valid)
			}
		}
	}
	return nil
}