func batchEvents()

in bus/event_bridge.go [85:114]


// invalidBatchSizeError is returned by batchEvents when it is asked to split a
// non-empty list of events into batches of size zero.
type invalidBatchSizeError struct{}

// Error implements the error interface.
func (invalidBatchSizeError) Error() string {
	return "batch events: maxBatchSize must be greater than zero"
}

// batchEvents splits events into consecutive batches of at most maxBatchSize
// elements and passes each batch, in order, to batchFn.
//
// The failed events reported by every batchFn call are accumulated and
// returned. If batchFn returns an error, processing stops immediately: the
// failed events gathered from the earlier batches are returned together with
// that error, and the remaining batches are not attempted.
//
// An empty events slice yields an empty (non-nil) result without calling
// batchFn. A maxBatchSize of zero with non-empty events is rejected with an
// error; dividing by it would produce an infinite batch count whose conversion
// to int is implementation-dependent, making the number of batchFn calls
// unpredictable.
func batchEvents(events []types.Event, maxBatchSize uint, batchFn func([]types.Event) ([]types.FailedEvent, error)) ([]types.FailedEvent, error) {
	batchFailedEvents := []types.FailedEvent{}

	recordsAmount := len(events)
	if recordsAmount == 0 {
		return batchFailedEvents, nil
	}
	if maxBatchSize == 0 {
		return batchFailedEvents, invalidBatchSizeError{}
	}

	// Clamp in the unsigned domain so that a maxBatchSize above the int range
	// cannot wrap to a negative value when converted.
	batchSize := recordsAmount
	if maxBatchSize < uint(recordsAmount) {
		batchSize = int(maxBatchSize)
	}

	batchAmount := int(math.Ceil(float64(recordsAmount) / float64(batchSize)))

	for i := 0; i < batchAmount; i++ {
		lowerBound := i * batchSize
		upperBound := lowerBound + batchSize

		// The final batch may be shorter than batchSize.
		if upperBound > recordsAmount {
			upperBound = recordsAmount
		}

		failedEvents, err := batchFn(events[lowerBound:upperBound])
		if err != nil {
			return batchFailedEvents, err
		}

		batchFailedEvents = append(batchFailedEvents, failedEvents...)
	}

	return batchFailedEvents, nil
}