// Excerpt from memstore/ingestion.go [172:361].


// insertPrimaryKeys builds primary keys for every row of upsertBatch and inserts
// them into the shard's live-store primary key index, classifying each row as an
// update (key already present) or an insert (new key). It also identifies rows
// older than the archiving cutoff that must go to the backfill queue.
//
// Parameters:
//   - primaryKeyColumns: schema column IDs forming the primary key.
//   - eventTimeColumnIndex: upsert-batch column index of the event time, or
//     negative when the table has no event time column.
//   - redoLogFile: redo log file ID, used to track max event time per redo file
//     so archiving will not purge files still holding live records.
//   - upsertBatch: the batch being ingested.
//   - skipBackfillRows: when true (recovery path), rows older than the archiving
//     cutoff are skipped instead of queued for backfill, since they were already
//     queued at original ingestion time.
//
// Returns (updateRecords, insertRecords, backfillBatch, err) where the two maps
// group recordInfo by live-batch ID, and backfillBatch holds the rows that need
// backfilling (nil/empty when none).
func (shard *TableShard) insertPrimaryKeys(primaryKeyColumns []int, eventTimeColumnIndex int, redoLogFile int64,
	upsertBatch *common.UpsertBatch, skipBackfillRows bool) (
	map[int32][]recordInfo, map[int32][]recordInfo, *common.UpsertBatch, error) {
	// Get primary key column indices and calculate the primary key width.
	primaryKeyBytes := shard.Schema.PrimaryKeyBytes
	primaryKeyCols, err := upsertBatch.GetPrimaryKeyCols(primaryKeyColumns)
	if err != nil {
		utils.GetReporter(shard.Schema.Schema.Name, shard.ShardID).GetCounter(utils.PrimaryKeyMissing).Inc(1)
		return nil, nil, nil, err
	}

	// Snapshot schema config under the read lock so the loop below runs lock-free.
	shard.Schema.RLock()
	recordRetentionDays := shard.Schema.Schema.Config.RecordRetentionInDays
	allowMissingEventTime := shard.Schema.Schema.Config.AllowMissingEventTime
	isFactTable := shard.Schema.Schema.IsFactTable
	shard.Schema.RUnlock()

	// key is reused across rows (truncated each iteration) to avoid per-row allocation.
	key := make([]byte, primaryKeyBytes)
	updateRecords := make(map[int32][]recordInfo)
	insertRecords := make(map[int32][]recordInfo)

	nextWriteRecord := shard.LiveStore.NextWriteRecord

	tableName := shard.Schema.Schema.Name
	shardID := shard.ShardID
	var eventTime uint32
	var isEventTimeValid bool
	var backfillRows = make([]int, 0)
	var numRecordsIngested int64
	var numRecordsAppended int64
	var numRecordsUpdated int64
	var numRecordsSkipped int64
	var maxUpsertBatchEventTime uint32
	for row := 0; row < upsertBatch.NumRows; row++ {
		// Get primary key bytes for each record.
		// truncate key
		key = key[:0]
		if key, err = common.AppendPrimaryKeyBytes(key, common.NewPrimaryKeyDataValueIterator(upsertBatch, row, primaryKeyCols)); err != nil {
			return nil, nil, nil, utils.StackError(err, "Failed to create primary key at row %d", row)
		}

		// For fact table we need to get the event time from the first column.
		if eventTimeColumnIndex >= 0 {
			value, validity, err := upsertBatch.GetValue(row, eventTimeColumnIndex)
			if err != nil {
				return nil, nil, nil, utils.StackError(err, "Failed to get event time for row %d", row)
			}

			isEventTimeValid = validity
			if isEventTimeValid {
				eventTime = *(*uint32)(value)
			}
		}

		var primaryKeyEventTime uint32
		if !isEventTimeValid {
			if isFactTable && !allowMissingEventTime {
				// err is nil here (the GetValue error was returned above); pass nil
				// explicitly so the stack error is not misread as wrapping a cause.
				return nil, nil, nil, utils.StackError(nil, "Event time for row %d is null", row)
			}
			// event with invalid event time will be ignored
			// once arrival time is older than archiving cutoff.
			if primaryKeyEventTime = upsertBatch.ArrivalTime; primaryKeyEventTime < shard.LiveStore.ArchivingCutoffHighWatermark {
				continue
			}
		} else {
			var nowInSeconds = uint32(utils.Now().Unix())
			var oldestRecordDays int
			if recordRetentionDays > 0 {
				oldestRecordDays = int(nowInSeconds/86400) - recordRetentionDays
			}

			primaryKeyEventTime = eventTime

			eventDay := int(eventTime / 86400)

			// Skip this record if it's out of retention
			if eventDay < oldestRecordDays {
				utils.GetReporter(tableName, shardID).GetCounter(utils.RecordsOutOfRetention).Inc(1)
				continue
			}

			// Skip this record if its event time is later than current time
			if eventTime > nowInSeconds {
				utils.GetReporter(tableName, shardID).GetCounter(utils.RecordsFromFuture).Inc(1)
				continue
			}

			if eventTime > maxUpsertBatchEventTime {
				maxUpsertBatchEventTime = eventTime
			}

			// Update max event time so archiving won't purge redo log files that have records newer than
			// archiving cut off time.
			shard.LiveStore.RedoLogManager.UpdateMaxEventTime(eventTime, redoLogFile)

			// If we get a record that is older than archiving cutoff time (exclusive) that means
			// 1. during ingestion, the event should be put into a backfill queue
			// 2. during recovery, the event should be ignored, because it was already put into
			//    a backfill queue at ingestion time.
			if eventTime < shard.LiveStore.ArchivingCutoffHighWatermark {
				if !skipBackfillRows {
					// mark this row as backfill row
					backfillRows = append(backfillRows, row)
					timeDiff := float64(shard.LiveStore.ArchivingCutoffHighWatermark - eventTime)
					utils.GetReporter(tableName, shardID).
						GetGauge(utils.BackfillRecordsTimeDifference).Update(timeDiff)
				} else {
					numRecordsSkipped++
				}
				continue
			}
		}

		numRecordsIngested++
		existing, record, err := shard.LiveStore.PrimaryKey.FindOrInsert(key, nextWriteRecord, primaryKeyEventTime)
		if err != nil {
			return nil, nil, nil, utils.StackError(err, "Failed to insert key for row %d", row)
		}

		if !existing {
			// A new key consumed the reserved write slot; reserve the next one.
			nextWriteRecord = shard.LiveStore.AdvanceNextWriteRecord()
			numRecordsAppended++
		} else {
			numRecordsUpdated++
		}

		result := updateRecords
		if !existing {
			result = insertRecords
		}

		rows := result[record.BatchID]
		result[record.BatchID] = append(
			rows,
			recordInfo{
				row:   row,
				index: int(record.Index),
			})
	}

	var nowInSeconds = uint32(utils.Now().Unix())
	// Update max event time for each column in this upsert batch.
	if maxUpsertBatchEventTime > 0 {
		for col := 0; col < upsertBatch.NumColumns; col++ {
			columnID, err := upsertBatch.GetColumnID(col)
			if err != nil {
				return nil, nil, nil, utils.StackError(err, "Failed to get column id for col %d", col)
			}

			// Grow the per-column timestamp slice on demand to cover columnID.
			// NOTE(review): this appears to mutate LiveStore state without an explicit
			// lock here — presumably serialized by the caller; confirm.
			for columnID >= len(shard.LiveStore.lastModifiedTimePerColumn) {
				shard.LiveStore.lastModifiedTimePerColumn = append(shard.LiveStore.lastModifiedTimePerColumn, 0)
			}

			if maxUpsertBatchEventTime > shard.LiveStore.lastModifiedTimePerColumn[columnID] {
				shard.LiveStore.lastModifiedTimePerColumn[columnID] = maxUpsertBatchEventTime
				// We only do it on per upsert batch level so it should be acceptable to create the scope dynamically.
				// TODO: if there is any performance issue, cache the reporter at live store level.
				utils.GetReporter(tableName, shardID).GetChildGauge(map[string]string{
					"columnID": strconv.Itoa(columnID),
				}, utils.IngestionLagPerColumn).Update(math.Max(float64(nowInSeconds-maxUpsertBatchEventTime), 0))
			}
		}
	}

	utils.GetReporter(tableName, shardID).GetCounter(utils.IngestedRecords).Inc(numRecordsIngested)
	utils.GetReporter(tableName, shardID).GetCounter(utils.AppendedRecords).Inc(numRecordsAppended)
	utils.GetReporter(tableName, shardID).GetCounter(utils.UpdatedRecords).Inc(numRecordsUpdated)
	utils.GetReporter(tableName, shardID).GetCounter(utils.BackfillRecords).Inc(int64(len(backfillRows)))
	utils.GetReporter(tableName, shardID).GetCounter(utils.IngestSkippedRecords).Inc(numRecordsSkipped)

	// update ratio gauge of backfill rows/total rows
	if upsertBatch.NumRows > 0 {
		// BUGFIX: the original expression `100.0 * len(backfillRows) / upsertBatch.NumRows`
		// was evaluated entirely in integer arithmetic (truncating the percentage,
		// and reporting 0 for any ratio below 1%); divide as float64 instead.
		utils.GetReporter(tableName, shardID).GetGauge(utils.BackfillRecordsRatio).
			Update(100.0 * float64(len(backfillRows)) / float64(upsertBatch.NumRows))
	}

	// create backfill upsertBatch if applicable
	if len(backfillRows) == upsertBatch.NumRows {
		// all rows are for backfill; reuse the whole batch instead of extracting.
		return updateRecords, insertRecords, upsertBatch, nil
	}

	backfillBatch := upsertBatch.ExtractBackfillBatch(backfillRows)
	if backfillBatch != nil && backfillBatch.NumRows > 0 && backfillBatch.NumColumns < upsertBatch.NumColumns {
		// some columns get pruned due to inappropriate update functions
		utils.GetReporter(tableName, shardID).GetCounter(utils.BackfillRecordsColumnRemoved).Inc(int64(len(backfillRows)))

	}
	return updateRecords, insertRecords, backfillBatch, nil
}