common/speed.go (74 lines of code) (raw):

package utils import ( "time" "math" LOG "github.com/vinllen/log4go" ) type Qos struct { Limit int64 // qps, <= 0 means disable limit Ticket int64 // one tick size, default is 1 bucket chan struct{} // bucket channel addr *int64 // periodically check whether the address value is equal to Limit, and update if not. close bool // channel is closed? prevLimit int64 // previous address limit } func StartQoS(limit, ticket int64, addr *int64) *Qos { if ticket <= 0 { // illegal return nil } q := new(Qos) q.Limit = limit q.Ticket = ticket q.addr = addr q.bucket = make(chan struct{}, limit) go q.timer() return q } func (q *Qos) FetchBucket() { for q.Limit > 0 { // the old bucket channel maybe release, so we need to retry once timeout select { case <-q.bucket: return case <-time.After(time.Second * 1): break } } } func (q *Qos) resizeLimit() { // we must empty previous channel first to avoid memory leak FOR: for { select { case <-q.bucket: default: // break if bucket if empty break FOR } } LOG.Info("clear old channel, set new bucket size[%v]", q.Limit) q.bucket = make(chan struct{}, q.Limit) } func (q *Qos) timer() { var i int64 for range time.NewTicker(1 * time.Second).C { if q.close { return } if *q.addr != q.prevLimit { LOG.Info("try to resize bucket channel from %v to %v, bucket size[%v], ticket[%v]", q.prevLimit, *q.addr, q.Limit, q.Ticket) q.prevLimit = *q.addr // 0 is ok q.Limit = int64(math.Ceil(float64(*q.addr) / float64(q.Ticket))) q.resizeLimit() } INJECT: for i = 0; i < q.Limit; i++ { select { case q.bucket <- struct{}{}: default: // break if bucket if full break INJECT } } } } func (q *Qos) Close() { q.close = true }