in bus/event_bridge.go [85:114]
func batchEvents(events []types.Event, maxBatchSize uint, batchFn func([]types.Event) ([]types.FailedEvent, error)) ([]types.FailedEvent, error) {
skip := 0
recordsAmount := len(events)
batchAmount := int(math.Ceil(float64(recordsAmount) / float64(maxBatchSize)))
batchFailedEvents := []types.FailedEvent{}
for i := 0; i < batchAmount; i++ {
lowerBound := skip
upperBound := skip + int(maxBatchSize)
if upperBound > recordsAmount {
upperBound = recordsAmount
}
batchEvents := events[lowerBound:upperBound]
skip += int(maxBatchSize)
failedEvents, err := batchFn(batchEvents)
if err != nil {
return batchFailedEvents, err
}
if len(failedEvents) > 0 {
batchFailedEvents = append(batchFailedEvents, failedEvents...)
}
}
return batchFailedEvents, nil
}