x-pack/lib/geoip_database_management/subscription_observer.rb (53 lines of code) (raw):
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License;
# you may not use this file except in compliance with the Elastic License.
require "logstash/util/loggable"
require "thread"
require 'observer'
require 'concurrent/atomic/reentrant_read_write_lock'
module LogStash module GeoipDatabaseManagement
##
# Provide a SubscriptionObserver or a SubscriptionObserver::coerce-able object
# to Subscription#observe to use the current value and observe changes to the
# subscription's state
#
# @api public
module SubscriptionObserver
##
# Coerce an object into an `SubscriptionObserver`, if necessary
# @overload coerce(observer)
# @param observer [SubscriptionObserver]: an object that "quacks like" a `SubscriptionObserver`
# as defined by `SubscriptionObserver::===`
# @return [SubscriptionObserver]
# @overload coerce(construct:, :on_update, :on_expire)
# @param construct [Proc(DbInfo)->void]: a single-arity Proc that will receive the current
# DbInfo at the beginning of observation
# @param on_update [Proc(DbInfo)->void]: a single-arity Proc that will receive notifications
# of each subsequent `DBInfo`
# @param on_expire [Proc()->void]: a zero-arity Proc that will receive notifications of the
# current value expiring.
# @return [SubscriptionObserver::Proxy]
# @api public
def self.coerce(observer_spec)
case observer_spec
when SubscriptionObserver then observer_spec
when Hash then Proxy.new(**observer_spec)
else
fail ArgumentError, "Could not make a SubscriptionObserver from #{observer_spec.inspect}"
end
end
##
# Quacks-like check, to simplify consuming from Java where the ruby module can't be
# directly mixed into a Java class
def self.===(candidate)
return true if super
return false unless candidate.respond_to?(:construct)
return false unless candidate.respond_to?(:on_update)
return false unless candidate.respond_to?(:on_expire)
true
end
##
# Observe the value at observer's construction, before any state-change notifications are fired
def construct(initial_value)
fail NotImplementedError
end
##
# Observe an update notice, after construction is complete
def on_update(updated_value)
fail NotImplementedError
end
##
# Observe an expiry notice, after construction is complete
def on_expire
fail NotImplementedError
end
##
# @api internal
# @see SubscriptionObserver#coerce
class Proxy
include SubscriptionObserver
def initialize(construct:, on_update:, on_expire:)
fail ArgumentError unless construct.respond_to?(:call) && construct.arity == 1
fail ArgumentError unless on_update.respond_to?(:call) && on_update.arity == 1
fail ArgumentError unless on_expire.respond_to?(:call) && on_expire.arity == 0
@construct = construct
@on_update = on_update
@on_expire = on_expire
end
def construct(initial_value)
@construct.call(initial_value)
end
def on_update(updated_value)
@on_update.call(updated_value)
end
def on_expire
@on_expire.call
end
end
private_constant :Proxy
end
end end