internal/bundleuri/strategy_occurences.go (261 lines of code) (raw):
package bundleuri
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper"
"gitlab.com/gitlab-org/gitaly/v16/internal/log"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
)
// evaluateRequest groups everything the strategy needs to evaluate, and
// possibly act upon, a single call to Evaluate. The same value travels
// through both evaluateQueue and generateQueue.
type evaluateRequest struct {
	// ctx is the context the request was made with; it is handed to cb.
	ctx context.Context
	// repo is the repository the request is about.
	repo *localrepo.Repo
	// time is the moment the request was received.
	time time.Time
	// cb generates the bundle once the strategy decides to do so.
	cb func(ctx context.Context, repo *localrepo.Repo) error
	// key identifies the repository in the strategy state; it is the
	// same for every request targeting a given repository.
	key string
}
// newEvaluateRequest builds the evaluateRequest for repo, received at t. The
// request key is computed with bundleRelativePath from the original (non
// snapshot) repository, so requests made from inside a transaction share the
// key of the repository they target.
func newEvaluateRequest(ctx context.Context, repo *localrepo.Repo, t time.Time, cb StrategyCallbackFn) (evaluateRequest, error) {
	pbRepo, isProto := repo.Repository.(*gitalypb.Repository)
	if !isProto {
		return evaluateRequest{}, errors.New("expecting repo.Repository to be of type *gitalypb.Repository")
	}

	// Transactions rewrite the relative path so that it points into their
	// snapshot. The key has to be common to all snapshots of a repository,
	// hence we resolve back to the original repository.
	tx := storage.ExtractTransaction(ctx)
	if tx != nil {
		pbRepo = tx.OriginalRepository(pbRepo)
	}

	key := bundleRelativePath(pbRepo, defaultBundle)

	return evaluateRequest{ctx: ctx, repo: repo, time: t, cb: cb, key: key}, nil
}
// repositoryState is the per-repository bookkeeping kept by the strategy.
type repositoryState struct {
	// occurrences records when generation requests were received, newest
	// first; the gap between two neighbouring entries is the time between
	// two consecutive requests. Slots not yet filled hold the zero time.
	occurrences []time.Time
	// lastGenerate is the moment recorded for the most recent successful
	// bundle generation, or the zero time if there was none.
	lastGenerate time.Time
	// generating is set while a bundle generation is reserved or running
	// for this repository.
	generating bool
}
// newRepositoryState returns a fresh state whose occurrences slice has
// exactly `occurrences` slots, each holding the zero time.
func newRepositoryState(occurrences int) *repositoryState {
	state := new(repositoryState)
	state.occurrences = make([]time.Time, occurrences)
	return state
}
// OccurrenceStrategy is a strategy type that generates a bundle for a given repository
// only if `threshold` occurrences happened within a pre-determined interval. To avoid
// generating the same bundle too often, it also keeps track of the last time a bundle
// was generated for each repository, and does not generate a new one until the
// previous one is at least `maxBundleAge` old.
type OccurrenceStrategy struct {
	// logger is the logger instance used.
	logger log.Logger
	// interval is the interval in which `threshold` occurrences
	// must happen for a bundle generation to be triggered.
	interval time.Duration
	// threshold is the number of occurrences for a repository
	// that must happen within `interval`.
	threshold int
	// maxConcurrent is the number of generation workers, and therefore
	// the maximum number of bundle generations running at the same time.
	maxConcurrent int
	// maxBundleAge is the age a bundle must reach before a new one may
	// be generated for the same repository.
	maxBundleAge time.Duration
	// evaluateQueue is the channel that holds all requests
	// to the strategy to be evaluated.
	evaluateQueue chan evaluateRequest
	// generateQueue is the channel that holds all requests
	// to generate a bundle.
	generateQueue chan evaluateRequest
	// state holds the occurrence state of each repository, keyed by
	// evaluateRequest.key.
	state map[string]*repositoryState
	// stateMu guards access to state, because generation happens
	// in parallel (limited by maxConcurrent).
	stateMu sync.Mutex
	// gcTicker controls how often the state garbage collection runs.
	gcTicker helper.Ticker
	// now returns the current time. It timestamps incoming requests and
	// is used by the state garbage collection.
	now func() time.Time
	// lastGeneratedBundle returns the time to record as the moment a bundle
	// was generated for the request's repository. It is only called after
	// a successful generation.
	lastGeneratedBundle func(request evaluateRequest) time.Time
	// done is a monitoring hook. It is called with false when a request does
	// not lead to a generation (the evaluation decided against it, or the
	// generate queue was full), and with the outcome of the generation
	// (true on success, false on failure) once a generation attempt ends.
	done func(generating bool)
	// wg is the wait group used to control the various workers
	// started by this strategy.
	wg sync.WaitGroup
	// doneCtx is the context, derived from the one passed to Start, that
	// all workers of the strategy run with.
	doneCtx context.Context
	// doneCancel is the cancel function associated with doneCtx; calling
	// it stops all workers.
	doneCancel context.CancelFunc
	// stateLenGauge reports the number of repositories tracked in state.
	stateLenGauge prometheus.Gauge
}
// NewOccurrenceStrategy creates a new OccurrenceStrategy and initializes its internal
// state. Start must be called before the strategy processes any request.
//   - logger: the logger used by the strategy.
//   - threshold: the number of occurrences within an interval that triggers a bundle
//     generation. It must be at least 2; smaller values make this function return an
//     error.
//   - interval: the interval in which `threshold` occurrences must happen to trigger a
//     bundle generation.
//   - maxConcurrent: the maximum number of bundles generated at the same time. Values
//     below 1 are treated as 1.
//   - maxBundleAge: the duration within which an already existing bundle should not be
//     overwritten by a new one.
func NewOccurrenceStrategy(logger log.Logger, threshold int, interval time.Duration, maxConcurrent int, maxBundleAge time.Duration) (*OccurrenceStrategy, error) {
	if threshold < 2 {
		return nil, fmt.Errorf("threshold must be >= 2; it is (%d)", threshold)
	}
	if maxConcurrent < 1 {
		maxConcurrent = 1
	}

	return &OccurrenceStrategy{
		logger:        logger,
		threshold:     threshold,
		interval:      interval,
		maxConcurrent: maxConcurrent,
		maxBundleAge:  maxBundleAge,
		evaluateQueue: make(chan evaluateRequest, 1),
		generateQueue: make(chan evaluateRequest, 1),
		state:         make(map[string]*repositoryState),
		gcTicker:      helper.NewTimerTicker(time.Minute),
		now:           time.Now,
		lastGeneratedBundle: func(evaluateRequest) time.Time {
			return time.Now()
		},
		done: func(bool) {},
		stateLenGauge: prometheus.NewGauge(
			prometheus.GaugeOpts{
				Name: "gitaly_bundle_strategy_occurrence_state_len",
				Help: "the number of items in the state",
			},
		),
	}, nil
}
// Describe sends the descriptors of every metric emitted by Collect to ch.
// The descriptors are derived by running Collect.
func (s *OccurrenceStrategy) Describe(ch chan<- *prometheus.Desc) {
	prometheus.DescribeByCollect(s, ch)
}
// Collect sends the current value of the state-size gauge to ch.
func (s *OccurrenceStrategy) Collect(ch chan<- prometheus.Metric) {
	gauge := s.stateLenGauge
	gauge.Collect(ch)
}
// Start must be called before calling `Evaluate`. It launches the evaluation
// worker, the generation workers and the state garbage collector, all running
// with a context derived from ctx.
//
// The workers stop either when ctx is cancelled or when the returned stop
// function is called. stop cancels the derived context and waits for every
// worker to return.
func (s *OccurrenceStrategy) Start(ctx context.Context) (stop func()) {
	cctx, cancel := context.WithCancel(ctx)
	s.doneCtx = cctx
	s.doneCancel = cancel

	workers := []func(ctx context.Context){
		s.startEvaluationWorker,
		s.startGenerationWorkers,
		s.startStateGc,
	}
	for _, worker := range workers {
		s.wg.Add(1)
		// Pass the worker as an argument instead of capturing the range
		// variable: with pre-Go 1.22 loop semantics every closure would share
		// one variable and could all end up running the last worker.
		go func(run func(context.Context)) {
			defer s.wg.Done()
			run(cctx)
		}(worker)
	}

	return func() {
		cancel()
		s.wg.Wait()
	}
}
// Evaluate builds an evaluateRequest for repo, timestamped with s.now(), and
// hands it to the evaluation worker. It blocks until the request is accepted
// by evaluateQueue or ctx is done, in which case ctx.Err() is returned. If the
// strategy later decides to generate a bundle, a generation worker calls cb.
func (s *OccurrenceStrategy) Evaluate(ctx context.Context, repo *localrepo.Repo, cb StrategyCallbackFn) error {
	req, err := newEvaluateRequest(ctx, repo, s.now(), cb)
	if err != nil {
		return err
	}

	select {
	case s.evaluateQueue <- req:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
// process handles a single evaluateRequest. If evaluate decides a bundle must be
// generated, the request is handed to the generation workers through
// generateQueue. The hand-off never blocks: when the queue cannot take the
// request right away, the request is dropped and the `generating` flag set by
// evaluate is cleared again. Whenever no generation is started, the done hook
// is called with false.
func (s *OccurrenceStrategy) process(request evaluateRequest) {
	if !s.evaluate(request) {
		// No bundle is going to be generated for this request.
		s.done(false)
		return
	}

	select {
	case s.generateQueue <- request:
		s.logger.
			WithField("gl_project_path", request.repo.GetGlProjectPath()).
			Info("bundle-uri strategy: request added to generate queue")
	default:
		s.logger.
			WithField("gl_project_path", request.repo.GetGlProjectPath()).
			Info("bundle-uri strategy: generate queue full; ignoring request")

		func() {
			s.stateMu.Lock()
			defer s.stateMu.Unlock()

			// Release the reservation made by evaluate so that a later
			// request for this repository can trigger a generation again.
			s.loadState(request).generating = false
			s.done(false)
		}()
	}
}
// evaluate is the core logic of the strategy. This is where the decision is made to
// generate a bundle or not. It returns true if a bundle must be generated, and false
// otherwise. The way the strategy works is as follows:
//
//  1. For each repository, the state holds a slice of `threshold` timestamps, newest
//     first, each recording when a request for that repository was received.
//  2. For each evaluated request, the request time is inserted at index 0 and the
//     oldest timestamp is discarded, so len(occurrences) == threshold always holds.
//  3. If the difference between the newest and the oldest timestamp is less than or
//     equal to the interval, `threshold` occurrences happened within that interval.
//  4. Even then, a bundle is only generated if the last generated bundle is at least
//     maxBundleAge old at the time of the request, and no generation is already in
//     progress for the repository. When true is returned, the state is marked as
//     generating.
//
// Example:
// Given a threshold of 5 and an interval of 20 seconds, and the following state where
// each number represents the number of seconds elapsed since time T0 (newest on the
// left):
//
//	[19, 17, 15, 3, 0]
//
// The difference between the newest and the oldest item is 19-0 = 19 seconds. Since 19
// is not larger than the interval of 20, the 5 occurrences happened within the interval
// and a bundle may be generated.
func (s *OccurrenceStrategy) evaluate(request evaluateRequest) bool {
	s.stateMu.Lock()
	defer s.stateMu.Unlock()

	state := s.loadState(request)

	// Shift every recorded occurrence one slot towards the end, dropping the
	// oldest one, then record the new occurrence at the front. copy has
	// memmove semantics, so the overlapping ranges are handled correctly.
	copy(state.occurrences[1:], state.occurrences)
	state.occurrences[0] = request.time

	newestOccurrence := state.occurrences[0]
	oldestOccurrence := state.occurrences[len(state.occurrences)-1]

	switch {
	case newestOccurrence.Sub(oldestOccurrence) > s.interval:
		// Not enough occurrences within the interval.
		return false
	case state.lastGenerate.Add(s.maxBundleAge).After(request.time):
		// The last generated bundle is still younger than maxBundleAge.
		return false
	case state.generating:
		// A generation is already in flight for this repository.
		return false
	}

	state.generating = true
	return true
}
// doneGenerating closes a generation attempt for the request's repository: it
// clears the generating flag and, when the attempt succeeded, records the
// generation time returned by lastGeneratedBundle. The done hook is then
// called with success while stateMu is still held.
func (s *OccurrenceStrategy) doneGenerating(request evaluateRequest, success bool) {
	s.stateMu.Lock()
	defer s.stateMu.Unlock()

	st := s.loadState(request)
	st.generating = false

	if !success {
		s.done(false)
		return
	}

	st.lastGenerate = s.lastGeneratedBundle(request)
	s.done(true)
}
// loadState returns the state stored for the request's key, creating and
// storing an empty one when none exists yet. Creating an entry also updates
// the state-size gauge. The caller must hold stateMu, since state is guarded
// by it.
func (s *OccurrenceStrategy) loadState(request evaluateRequest) *repositoryState {
	if existing, found := s.state[request.key]; found {
		return existing
	}

	created := newRepositoryState(s.threshold)
	s.state[request.key] = created
	s.stateLenGauge.Set(float64(len(s.state)))

	return created
}
// startStateGc is the garbage collector of the strategy state. Every time gcTicker
// fires, it loops over the state and removes the entry of each repository that:
//   - has no bundle younger than maxBundleAge,
//   - has not received a request within the last interval, and
//   - is not currently generating a bundle.
//
// It returns, stopping gcTicker, once ctx is cancelled.
func (s *OccurrenceStrategy) startStateGc(ctx context.Context) {
	defer s.gcTicker.Stop()

	for {
		s.gcTicker.Reset()

		select {
		case <-ctx.Done():
			return
		case <-s.gcTicker.C():
			s.stateMu.Lock()

			// Judge every entry against the same instant.
			now := s.now()
			// Deleting map entries while ranging over the map is allowed.
			for repo, state := range s.state {
				// if the bundle is still not old enough, we keep the state
				// to avoid it being re-generated.
				if now.Sub(state.lastGenerate) < s.maxBundleAge {
					continue
				}

				// If the newest occurrence is at least `interval` old, any
				// window that still includes a recorded occurrence spans
				// more than `interval`, so the recorded occurrences can no
				// longer contribute to triggering a generation.
				if now.Sub(state.occurrences[0]) < s.interval {
					continue
				}

				// if a bundle is being generated for this state
				// we do not garbage collect it
				if state.generating {
					continue
				}

				delete(s.state, repo)
			}

			s.stateLenGauge.Set(float64(len(s.state)))
			s.stateMu.Unlock()
		}
	}
}
// startGenerationWorkers starts maxConcurrent goroutines, each listening on the
// generateQueue channel for bundle generation requests, and blocks until all of
// them have returned after ctx is cancelled. This bounds the number of bundles
// generated at the same time.
//
// The callback runs with the context stored in the request by Evaluate.
// NOTE(review): if that is a per-RPC context it may already be cancelled when a
// worker picks the request up; confirm callers pass a detached context.
func (s *OccurrenceStrategy) startGenerationWorkers(ctx context.Context) {
	var wg sync.WaitGroup

	for i := 0; i < s.maxConcurrent; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			for {
				select {
				case <-ctx.Done():
					return
				case request := <-s.generateQueue:
					err := request.cb(request.ctx, request.repo)
					if err != nil {
						// Include the project path so that failures can be
						// traced back to a repository, like the logs in process.
						s.logger.
							WithField("gl_project_path", request.repo.GetGlProjectPath()).
							WithError(err).
							Error("failed to generate bundle")
					}
					s.doneGenerating(request, err == nil)
				}
			}
		}()
	}

	wg.Wait()
}
// startEvaluationWorker runs the single evaluation loop in the calling
// goroutine: it processes the requests received on evaluateQueue one at a
// time, and returns once ctx is cancelled.
func (s *OccurrenceStrategy) startEvaluationWorker(ctx context.Context) {
	stopped := ctx.Done()
	for {
		select {
		case req := <-s.evaluateQueue:
			s.process(req)
		case <-stopped:
			return
		}
	}
}