x-pack/spec/geoip_database_management/subscription_spec.rb (169 lines of code) (raw):

# # Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one # # or more contributor license agreements. Licensed under the Elastic License; # # you may not use this file except in compliance with the Elastic License. require 'geoip_database_management/subscription' describe LogStash::GeoipDatabaseManagement::Subscription, :aggregate_failures do let(:mock_state) { double("state", release!: nil) } let(:initial_value) { LogStash::GeoipDatabaseManagement::DbInfo::PENDING } subject(:subscription) { described_class.new(initial_value, mock_state) } context "#value" do context "blocking" do it 'yields the current value' do expect { |b| subscription.value(&b) }.to yield_with_args(initial_value) end it 'returns the result of the block' do return_value = Object.new expect(subscription.value { |_| return_value}).to equal return_value end context "under contention" do it 'allows many concurrent readers' do concurrency = 10 start_latch = Concurrent::CountDownLatch.new(concurrency) release_latch = Concurrent::CountDownLatch.new(concurrency) finish_latch = Concurrent::CountDownLatch.new(concurrency) max_concurrent = Concurrent::AtomicFixnum.new threads = concurrency.times.map do |idx| Thread.new do Thread.current.abort_on_exception = true start_latch.count_down start_latch.wait(2) || fail("threads failed to start") subscription.value do |db_info| max_concurrent.increment release_latch.count_down release_latch.wait(2) || fail("threads failed to concurrently lock value (#{max_concurrent})") end finish_latch.count_down finish_latch.wait(2) || fail("failed to release") end end # cleanup threads deadline = Time.now + 10 threads.each do |t| timeout_remaining = [deadline - Time.now, 0].max t.kill unless t.join(timeout_remaining) end expect(max_concurrent.value).to eq(concurrency) end # validates that #value with a block will prevent updates until control is returned. # sets up a sequence in which several readers get the initial value concurrently, # a writer contends for the lock and modifies the value, and subsequent readers get # the updated value. it 'read-write contention', aggregate_failures: true do pre_write_count = 3 post_write_count = 7 reader_count = pre_write_count + post_write_count readers_ready_latch = Concurrent::CountDownLatch.new(reader_count) writer_ready_event = Concurrent::Event.new pre_write_read_acquired_latch = Concurrent::CountDownLatch.new(pre_write_count) pre_write_read_released_latch = Concurrent::CountDownLatch.new(pre_write_count) pre_write_event = Concurrent::Event.new values = Queue.new threads = [] # pre-write: acquire multiple locks, then signal writer and give it # a chance to contend for the lock before releasing pre_write_count.times do |idx| threads << Thread.new do Thread.current.abort_on_exception = true readers_ready_latch.count_down writer_ready_event.wait(2) || fail("writer failed to become ready") subscription.value do |db_info| pre_write_read_acquired_latch.count_down values << db_info # wait until writer has signaled that it is about to try to write pre_write_event.wait(2) || fail("writer failed to begin action") sleep(1) # wait long enough to ensure contention # ensure that the other readers are free to begin pre_write_read_released_latch.count_down end end end # post-write: wait until _just_ before the pre-write readers release their lock, # ensuring we are queued after the writer's blocked write. post_write_count.times do |idx| threads << Thread.new do Thread.current.abort_on_exception = true readers_ready_latch.count_down pre_write_read_released_latch.wait(10) || fail("pre-write readers failed to finish") subscription.value do |db_info| values << db_info end end end # write: wait until the pre-write readers have acquired the lock # before performing the write. updated_db_info = LogStash::GeoipDatabaseManagement::DbInfo.new(path: "/path/to/db") threads << Thread.new do Thread.current.abort_on_exception = true writer_ready_event.set readers_ready_latch.wait(10) || fail("readers never became ready") pre_write_read_acquired_latch.wait(10) || fail("pre reads never acquired") pre_write_event.set subscription.notify(updated_db_info) end # cleanup threads deadline = Time.now + 10 threads.each do |t| timeout_remaining = [deadline - Time.now, 0].max t.kill unless t.join(timeout_remaining) end expect(values.size).to eq(pre_write_count + post_write_count) pre_write_count.times do expect(values.pop(true)).to equal initial_value end post_write_count.times do expect(values.pop(true)).to equal updated_db_info end expect(values).to be_empty end end end context "non-blocking" do it 'returns the current value' do expect(subscription.value).to equal initial_value end end end context '#release!' do it 'releases' do subscription.release! expect(mock_state).to have_received(:release!).with(subscription) end end context "#observe" do shared_examples "observation" do let!(:log) { Queue.new } it "observes construct, update, and expiry" do current_value = LogStash::GeoipDatabaseManagement::DbInfo.new(path: "/one/two") subscription.notify(current_value) expect(log).to be_empty subscription.observe(observer_spec) expect(log.size).to eq(1) expect(log.pop(true)).to eq([:construct, current_value]) updated_value = LogStash::GeoipDatabaseManagement::DbInfo.new(path: "/three/four") subscription.notify(updated_value) expect(log.size).to eq(1) expect(log.pop(true)).to eq([:on_update, updated_value]) expired_value = LogStash::GeoipDatabaseManagement::DbInfo::EXPIRED subscription.notify(expired_value) expect(log.size).to eq(1) expect(log.pop(true)).to eq([:on_expire]) another_updated_value = LogStash::GeoipDatabaseManagement::DbInfo.new(path: "/five/six") subscription.notify(another_updated_value) expect(log.size).to eq(1) expect(log.pop(true)).to eq([:on_update, another_updated_value]) end context 'when subscription was previously released' do before(:each) { subscription.release! } it 'prevents new observation' do expect { subscription.observe(observer_spec) }.to raise_exception(/released/) expect(log).to be_empty end end end context "when given a components hash" do let(:observer_spec) { { construct: ->(v) { log << [:construct, v]}, on_update: ->(v) { log << [:on_update, v]}, on_expire: ->( ) { log << [:on_expire] }, } } include_examples "observation" end context "when given an object that quacks like a SubscriptionObserver instance" do let(:observer_class) do Class.new do def initialize(log); @log = log; end def construct(v); @log << [:construct, v]; end def on_update(v); @log << [:on_update, v]; end def on_expire; @log << [:on_expire]; end end end let(:observer_spec) { observer_class.new(log) } include_examples "observation" end end end