internal/offloading/simulation_bucket.go (111 lines of code) (raw):

package offloading import ( "context" "errors" "io" "sync" "time" "gocloud.dev/blob" ) var errSimulationCanceled = errors.New("canceled") // simulation defines the data used for intercepting or simulating method behavior. // If Delay is set, the method will introduce a delay of the specified duration before returning. // If Err is set, the method will return the specified error instead of the normal result. type simulation struct { Delay time.Duration Err error } // simulationBucket is a blob.Bucket with simulation setup. type simulationBucket struct { // simulationMap maps an object key to a list of simulations, defining how each key should behave. simulationMap map[string][]simulation // simulationSequence is a flattened version of simulationMap. // It is useful for functions that need to iterate over all simulations in the map, such as list operations. simulationSequence []simulation currentSimulationIndex int retryStat map[string]int mu sync.Mutex *blob.Bucket } func newSimulationBucket(bucket *blob.Bucket, s map[string][]simulation) (Bucket, error) { seq := make([]simulation, 0) for _, s := range s { seq = append(seq, s...) } m := &simulationBucket{ simulationMap: s, Bucket: bucket, retryStat: make(map[string]int), mu: sync.Mutex{}, currentSimulationIndex: 0, simulationSequence: seq, } return m, nil } func (r *simulationBucket) Download(ctx context.Context, key string, writer io.Writer, opts *blob.ReaderOptions) error { return r.simulate(ctx, key, func() error { return r.Bucket.Download(ctx, key, writer, opts) }) } func (r *simulationBucket) Upload(ctx context.Context, key string, reader io.Reader, opts *blob.WriterOptions) error { return r.simulate(ctx, key, func() error { return r.Bucket.Upload(ctx, key, reader, opts) }) } func (r *simulationBucket) Delete(ctx context.Context, key string) error { return r.simulate(ctx, key, func() error { return r.Bucket.Delete(ctx, key) }) } // interceptedList essentially wraps the Bucket's List function. // The difference is that it returns a listIteratorWrapper, which can inject simulation data into the blob.ListIterator. func (r *simulationBucket) interceptedList(opts *blob.ListOptions) *listIteratorWrapper { defer func() { r.currentSimulationIndex++ }() it := r.Bucket.List(opts) var currentSimulation *simulation if r.currentSimulationIndex >= len(r.simulationSequence) { currentSimulation = nil } else { currentSimulation = &r.simulationSequence[r.currentSimulationIndex] } return &listIteratorWrapper{ ListIterator: it, simulation: currentSimulation, } } // listIteratorWrapper wraps a blob.ListIterator and allows simulation data to be injected. // When Next is called, it prioritizes returning the simulation data if available. type listIteratorWrapper struct { simulation *simulation *blob.ListIterator } func (r *listIteratorWrapper) Next(ctx context.Context) (*blob.ListObject, error) { if r.simulation == nil { return r.ListIterator.Next(ctx) } timer := time.NewTimer(r.simulation.Delay) select { case <-ctx.Done(): return nil, errSimulationCanceled case <-timer.C: if r.simulation.Err != nil { return nil, r.simulation.Err } return r.ListIterator.Next(ctx) } } func (r *simulationBucket) simulate(ctx context.Context, key string, operation func() error) error { r.mu.Lock() simulations, ok := r.simulationMap[key] retryCount := r.retryStat[key] var sim simulation if ok && retryCount < len(simulations) { sim = simulations[retryCount] r.retryStat[key]++ } r.mu.Unlock() if sim.Delay > 0 { timer := time.NewTimer(sim.Delay) select { case <-ctx.Done(): timer.Stop() return errSimulationCanceled case <-timer.C: // Continue after delay } } if sim.Err != nil { return sim.Err } return operation() }