x-pack/lib/geoip_database_management/subscription.rb (68 lines of code) (raw):

# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one # or more contributor license agreements. Licensed under the Elastic License; # you may not use this file except in compliance with the Elastic License. require "logstash/util/loggable" require "thread" require_relative "subscription_observer" require 'observer' require 'concurrent/atomic/reentrant_read_write_lock' module LogStash module GeoipDatabaseManagement ## # A Subscription is acquired with Manager#subscribe_database_path class Subscription include LogStash::Util::Loggable include Observable # @api internal ## # @param initial [DBInfo] # @param state [#release!] # @api private def initialize(initial, state=nil) @state = state @observable = true @value = initial @lock = Concurrent::ReentrantReadWriteLock.new end ## # @overload value(&consumer) # Yields the current DbInfo and prevents changes from occurring until control is returned. # @note: this is intended for short-lived locks ONLY, as blocking writes will prevent updates # from being observed by other subscribers. # @yield db_info yields current DBInfo and returns the result of the block # @yieldparam db_info [DbInfo] # @return [Object] the result of the provided block # @overload value() # Returns the current DBInfo immediately # @return [DbInfo] def value(&consumer) @lock.with_read_lock do return yield(@value) if block_given? @value end end ## # Register an observer that will observe the current value and each subsequent update and expire, # until Subscription#release! # # @note: interacting with this Subscription or the Manager in any way in the provided hooks is # not advised, as it may cause deadlocks. # @overload observe(observer) # @param observer [SubscriptionObserver] # @return [Subscription] # @overload observe(observer_spec) # @param observer_spec [Hash]: (@see SubscriptionObserver::coerce) # @return [Subscription] def observe(observer_spec) observer = SubscriptionObserver.coerce(observer_spec) @lock.with_write_lock do fail "Subscription has been released!" unless @observable observer.construct(@value) self.add_observer do |new_value| @lock.with_read_lock do if new_value.expired? observer.on_expire else observer.on_update(new_value) end end end end self end ## # Releases this subscription and all of its observers # from receiving additional notifications. def release! @lock.with_write_lock do @observable = false delete_observers @state&.release!(self) @state = nil end end ## # @api internal def notify(updated_value) write_lock_held = @lock.acquire_write_lock @value = updated_value # downgrade to read lock for notifications @lock.with_read_lock do write_lock_held = !@lock.release_write_lock self.changed self.notify_observers(updated_value) end ensure @lock.release_read_lock if write_lock_held end ## # @api private def add_observer(*args, &block) @lock.with_write_lock do if block_given? super(block, :call) else super(*args) end end end end end end