events.go (189 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Content before git sha 34fdeebefcbf183ed7f916f931aa0586fdaa1b40
* Copyright (c) 2016, The Gocql authors,
* provided under the BSD-3-Clause License.
* See the NOTICE file distributed with this work for additional information.
*/
package gocql
import (
"net"
"sync"
"time"
)
// eventDebouncer coalesces bursts of incoming event frames: frames are
// buffered and handed to callback in one batch once the debounce timer
// fires.
type eventDebouncer struct {
	// name identifies this debouncer in log messages.
	name string
	// timer drives the quiet period; it is reset on every debounce call.
	timer *time.Timer

	// mu guards events; flush must be called with mu held.
	mu sync.Mutex
	// events is the buffer of pending frames, capped at eventBufferSize.
	events []frame

	// callback receives each flushed batch on its own goroutine.
	callback func([]frame)
	// quit signals the flusher goroutine to exit.
	quit chan struct{}

	logger StdLogger
}
// newEventDebouncer constructs a debouncer named name that delivers batched
// frames to eventHandler, and starts its background flusher goroutine.
func newEventDebouncer(name string, eventHandler func([]frame), logger StdLogger) *eventDebouncer {
	d := &eventDebouncer{
		name:     name,
		quit:     make(chan struct{}),
		timer:    time.NewTimer(eventDebounceTime),
		callback: eventHandler,
		logger:   logger,
	}
	// Keep the timer idle until the first debounce() resets it.
	d.timer.Stop()

	go d.flusher()

	return d
}
// stop shuts down the flusher goroutine. The unbuffered send blocks until
// flusher has received it, so the goroutine is known to have observed the
// shutdown before quit is closed. stop must be called at most once; a
// second call would send on (and then close) an already-closed channel.
func (e *eventDebouncer) stop() {
	e.quit <- struct{}{} // sync with flusher
	close(e.quit)
}
// flusher runs in its own goroutine, flushing the buffered events each time
// the debounce timer fires, until stop() is called.
func (e *eventDebouncer) flusher() {
	for {
		// Case order is irrelevant: select chooses uniformly among ready cases.
		select {
		case <-e.quit:
			return
		case <-e.timer.C:
			e.mu.Lock()
			e.flush()
			e.mu.Unlock()
		}
	}
}
const (
	// eventBufferSize caps how many frames may be buffered between flushes;
	// additional frames are dropped (see eventDebouncer.debounce).
	eventBufferSize = 1000

	// eventDebounceTime is the quiet period that must elapse after the last
	// debounce call before the buffered events are flushed.
	eventDebounceTime = 1 * time.Second
)
// flush hands the buffered events to the callback on a fresh goroutine and
// replaces the buffer. flush must be called with mu locked.
func (e *eventDebouncer) flush() {
	if len(e.events) == 0 {
		return
	}

	// if the flush interval is faster than the callback then we will end up calling
	// the callback multiple times, probably a bad idea. In this case we could drop
	// frames?
	pending := e.events
	e.events = make([]frame, 0, eventBufferSize)
	go e.callback(pending)
}
// debounce buffers one event frame and restarts the quiet-period timer.
// When the buffer is already full, the frame is dropped and logged.
func (e *eventDebouncer) debounce(eventFrame frame) {
	e.mu.Lock()
	defer e.mu.Unlock()

	e.timer.Reset(eventDebounceTime)

	// TODO: probably need a warning to track if this threshold is too low
	if len(e.events) >= eventBufferSize {
		e.logger.Printf("%s: buffer full, dropping event frame: %s", e.name, eventFrame)
		return
	}
	e.events = append(e.events, eventFrame)
}
// handleEvent parses a single event frame off the wire and routes it to the
// schema or node debouncer; unrecognized frame types are logged and dropped.
func (s *Session) handleEvent(framer *framer) {
	parsed, err := framer.parseFrame()
	if err != nil {
		s.logger.Printf("gocql: unable to parse event frame: %v\n", err)
		return
	}

	if gocqlDebug {
		s.logger.Printf("gocql: handling frame: %v\n", parsed)
	}

	switch ev := parsed.(type) {
	case *schemaChangeKeyspace, *schemaChangeFunction,
		*schemaChangeTable, *schemaChangeAggregate, *schemaChangeType:
		s.schemaEvents.debounce(parsed)
	case *topologyChangeEventFrame, *statusChangeEventFrame:
		s.nodeEvents.debounce(parsed)
	default:
		s.logger.Printf("gocql: invalid event frame (%T): %v\n", ev, ev)
	}
}
// handleSchemaEvent invalidates the cached schema for every keyspace named
// in the batched schema-change frames; keyspace-level changes additionally
// go through handleKeyspaceChange.
func (s *Session) handleSchemaEvent(frames []frame) {
	// TODO: debounce events
	for _, fr := range frames {
		switch ev := fr.(type) {
		case *schemaChangeKeyspace:
			s.schemaDescriber.clearSchema(ev.keyspace)
			s.handleKeyspaceChange(ev.keyspace, ev.change)
		case *schemaChangeTable:
			s.schemaDescriber.clearSchema(ev.keyspace)
		case *schemaChangeAggregate:
			s.schemaDescriber.clearSchema(ev.keyspace)
		case *schemaChangeFunction:
			s.schemaDescriber.clearSchema(ev.keyspace)
		case *schemaChangeType:
			s.schemaDescriber.clearSchema(ev.keyspace)
		}
	}
}
// handleKeyspaceChange waits for cluster schema agreement (best effort:
// any return value of awaitSchemaAgreement is discarded here) and then
// notifies the host-selection policy about the keyspace change.
func (s *Session) handleKeyspaceChange(keyspace, change string) {
	s.control.awaitSchemaAgreement()
	s.policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: keyspace, Change: change})
}
// handleNodeEvent handles inbound status and topology change events.
//
// Status events are debounced by host IP; only the latest event is processed.
//
// Topology events are debounced by performing a single full topology refresh
// whenever any topology event comes in.
//
// Processing topology change events before status change events ensures
// that a NEW_NODE event is not dropped in favor of a newer UP event (which
// would itself be dropped/ignored, as the node is not yet known).
func (s *Session) handleNodeEvent(frames []frame) {
	type nodeEvent struct {
		change string
		host   net.IP
		port   int
	}

	// Coalesce the batch: remember whether any topology event arrived and
	// keep only the most recent status change seen per host IP.
	sawTopologyEvent := false
	latestStatus := make(map[string]*nodeEvent)
	for _, fr := range frames {
		switch ev := fr.(type) {
		case *topologyChangeEventFrame:
			sawTopologyEvent = true
		case *statusChangeEventFrame:
			key := ev.host.String()
			if prev, ok := latestStatus[key]; ok {
				prev.change = ev.change
			} else {
				latestStatus[key] = &nodeEvent{change: ev.change, host: ev.host, port: ev.port}
			}
		}
	}

	if sawTopologyEvent && !s.cfg.Events.DisableTopologyEvents {
		s.debounceRingRefresh()
	}

	for _, ev := range latestStatus {
		if gocqlDebug {
			s.logger.Printf("gocql: dispatching status change event: %+v\n", ev)
		}

		// ignore events we received if they were disabled
		// see https://github.com/apache/cassandra-gocql-driver/issues/1591
		switch ev.change {
		case "UP":
			if !s.cfg.Events.DisableNodeStatusEvents {
				s.handleNodeUp(ev.host, ev.port)
			}
		case "DOWN":
			if !s.cfg.Events.DisableNodeStatusEvents {
				s.handleNodeDown(ev.host, ev.port)
			}
		}
	}
}
// handleNodeUp reacts to an UP status event. A host not yet in the ring
// triggers a ring refresh instead; filtered hosts are ignored. A
// version-dependent delay may be applied before filling the pool.
func (s *Session) handleNodeUp(eventIp net.IP, eventPort int) {
	if gocqlDebug {
		s.logger.Printf("gocql: Session.handleNodeUp: %s:%d\n", eventIp.String(), eventPort)
	}

	host, found := s.ring.getHostByIP(eventIp.String())
	if !found {
		// Unknown host: schedule a refresh so it can be discovered.
		s.debounceRingRefresh()
		return
	}

	if s.cfg.filterHost(host) {
		return
	}

	if delay := host.Version().nodeUpDelay(); delay > 0 {
		time.Sleep(delay)
	}

	s.startPoolFill(host)
}
// startPoolFill begins establishing connections to host and registers it
// with the host-selection policy.
func (s *Session) startPoolFill(host *HostInfo) {
	// we let the pool call handleNodeConnected to change the host state
	s.pool.addHost(host)
	s.policy.AddHost(host)
}
// handleNodeConnected marks host as up once a connection has been
// established and, unless the host is filtered out, notifies the policy.
func (s *Session) handleNodeConnected(host *HostInfo) {
	if gocqlDebug {
		s.logger.Printf("gocql: Session.handleNodeConnected: %s:%d\n", host.ConnectAddress(), host.Port())
	}

	host.setState(NodeUp)

	if s.cfg.filterHost(host) {
		return
	}
	s.policy.HostUp(host)
}
// handleNodeDown reacts to a DOWN status event: mark the host down and,
// unless it is filtered out, notify the policy and remove its connection
// pool. Unknown hosts are ignored.
func (s *Session) handleNodeDown(ip net.IP, port int) {
	if gocqlDebug {
		s.logger.Printf("gocql: Session.handleNodeDown: %s:%d\n", ip.String(), port)
	}

	host, found := s.ring.getHostByIP(ip.String())
	if !found {
		return
	}

	host.setState(NodeDown)
	if s.cfg.filterHost(host) {
		return
	}

	s.policy.HostDown(host)
	s.pool.removeHost(host.HostID())
}