internal/streams/streams.go (104 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ /* * Content before git sha 34fdeebefcbf183ed7f916f931aa0586fdaa1b40 * Copyright (c) 2016, The Gocql authors, * provided under the BSD-3-Clause License. * See the NOTICE file distributed with this work for additional information. */ package streams import ( "math" "strconv" "sync/atomic" ) const bucketBits = 64 // IDGenerator tracks and allocates streams which are in use. type IDGenerator struct { NumStreams int inuseStreams int32 numBuckets uint32 // streams is a bitset where each bit represents a stream, a 1 implies in use streams []uint64 offset uint32 } func New(protocol int) *IDGenerator { maxStreams := 128 if protocol > 2 { maxStreams = 32768 } buckets := maxStreams / 64 // reserve stream 0 streams := make([]uint64, buckets) streams[0] = 1 << 63 return &IDGenerator{ NumStreams: maxStreams, streams: streams, numBuckets: uint32(buckets), offset: uint32(buckets) - 1, } } func streamFromBucket(bucket, streamInBucket int) int { return (bucket * bucketBits) + streamInBucket } func (s *IDGenerator) GetStream() (int, bool) { // based closely on the java-driver stream ID generator // avoid false sharing subsequent requests. offset := atomic.LoadUint32(&s.offset) for !atomic.CompareAndSwapUint32(&s.offset, offset, (offset+1)%s.numBuckets) { offset = atomic.LoadUint32(&s.offset) } offset = (offset + 1) % s.numBuckets for i := uint32(0); i < s.numBuckets; i++ { pos := int((i + offset) % s.numBuckets) bucket := atomic.LoadUint64(&s.streams[pos]) if bucket == math.MaxUint64 { // all streams in use continue } for j := 0; j < bucketBits; j++ { mask := uint64(1 << streamOffset(j)) for bucket&mask == 0 { if atomic.CompareAndSwapUint64(&s.streams[pos], bucket, bucket|mask) { atomic.AddInt32(&s.inuseStreams, 1) return streamFromBucket(int(pos), j), true } bucket = atomic.LoadUint64(&s.streams[pos]) } } } return 0, false } func bitfmt(b uint64) string { return strconv.FormatUint(b, 16) } // returns the bucket offset of a given stream func bucketOffset(i int) int { return i / bucketBits } func streamOffset(stream int) uint64 { return bucketBits - uint64(stream%bucketBits) - 1 } func isSet(bits uint64, stream int) bool { return bits>>streamOffset(stream)&1 == 1 } func (s *IDGenerator) isSet(stream int) bool { bits := atomic.LoadUint64(&s.streams[bucketOffset(stream)]) return isSet(bits, stream) } func (s *IDGenerator) String() string { size := s.numBuckets * (bucketBits + 1) buf := make([]byte, 0, size) for i := 0; i < int(s.numBuckets); i++ { bits := atomic.LoadUint64(&s.streams[i]) buf = append(buf, bitfmt(bits)...) buf = append(buf, ' ') } return string(buf[: size-1 : size-1]) } func (s *IDGenerator) Clear(stream int) (inuse bool) { offset := bucketOffset(stream) bucket := atomic.LoadUint64(&s.streams[offset]) mask := uint64(1) << streamOffset(stream) if bucket&mask != mask { // already cleared return false } for !atomic.CompareAndSwapUint64(&s.streams[offset], bucket, bucket & ^mask) { bucket = atomic.LoadUint64(&s.streams[offset]) if bucket&mask != mask { // already cleared return false } } // TODO: make this account for 0 stream being reserved if atomic.AddInt32(&s.inuseStreams, -1) < 0 { // TODO(zariel): remove this panic("negative streams inuse") } return true } func (s *IDGenerator) Available() int { return s.NumStreams - int(atomic.LoadInt32(&s.inuseStreams)) - 1 }