proxy/protocol/dubbo/utils/msgqueue.go (98 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package util import ( "container/list" "sync" ) const ( MaxBufferMsg = 65535 Actived = 0 Deactived = 1 ) //MsgQueue thread safe queue type MsgQueue struct { msgList *list.List mtx *sync.Mutex msgCount int maxMsgNum int state int notEmptyCond *sync.Cond notFullCond *sync.Cond } //NewMsgQueue is a function which initializes msgqueue value func NewMsgQueue() *MsgQueue { tmp := new(MsgQueue) tmp.msgList = list.New() tmp.mtx = new(sync.Mutex) tmp.msgCount = 0 tmp.maxMsgNum = MaxBufferMsg tmp.state = Actived tmp.notEmptyCond = sync.NewCond(tmp.mtx) tmp.notFullCond = sync.NewCond(tmp.mtx) return tmp } //Enqueue is method which enqueues message in queue func (this *MsgQueue) Enqueue(msg interface{}) error { this.mtx.Lock() defer this.mtx.Unlock() if this.state == Deactived { return &BaseError{"Queue is deactive"} } if this.waitNotFullCond() == -1 { return &BaseError{"Enqueue time out"} } this.msgList.PushFront(msg) this.msgCount++ this.notEmptyCond.Signal() return nil } //Dequeue is a method which dequeues message from queue func (this *MsgQueue) Dequeue() (interface{}, error) { this.mtx.Lock() defer this.mtx.Unlock() if this.waitNotEmptyCond() == -1 { return nil, &BaseError{"Queue is deactive"} } iter := this.msgList.Back() v := iter.Value this.msgList.Remove(iter) this.msgCount-- this.notFullCond.Signal() return v, nil } //isEmpty is a method which checks whether queue is empty func (this *MsgQueue) isEmpty() bool { if this.msgCount == 0 { return true } else { return false } } //isFull is a method which checks whether queue is full func (this *MsgQueue) isFull() bool { if this.msgCount >= this.maxMsgNum { return true } else { return false } } //waitNotFullCond is a method which waits if queue is full func (this *MsgQueue) waitNotFullCond() int { var result = 0 if this.isFull() { this.notFullCond.Wait() if this.state != Actived { result = -1 return result } } return result } //Deavtive is a method func (this *MsgQueue) Deavtive() { this.state = Deactived this.notEmptyCond.Broadcast() this.notFullCond.Broadcast() } //waitNotEmptyCond is a method func (this *MsgQueue) waitNotEmptyCond() int { var result = 0 if this.isEmpty() { this.notEmptyCond.Wait() if this.state != Actived { result = -1 return result } } return result }