spec/connectors/mongodb/connector_spec.rb (491 lines of code) (raw):
# frozen_string_literal: true
require 'connectors/mongodb/connector'
require 'core/filtering/simple_rules/simple_rule'
require 'hashie/mash'
require 'spec_helper'
describe Connectors::MongoDB::Connector do
# Connector under test, built from the stubbed configuration and job description.
subject do
  described_class.new(configuration: configuration, job_description: job_description)
end

# Replacement for the connector's filter transformers (stubbed in the top-level
# `before` hook); both transformer lists are empty.
let(:filter_transformers) do
  { 'rules' => [], 'advanced_snippet' => [] }
end
# Connector configuration. Every setting is a labelled field whose value comes from a
# `let`, so individual contexts can override a single setting.
let(:configuration) do
  {
    :host => { :label => 'Server Hostname', :value => mongodb_host },
    :database => { :label => 'Database', :value => mongodb_database },
    :collection => { :label => 'Collection', :value => mongodb_collection },
    :user => { :label => 'Username', :value => mongodb_username },
    :password => { :label => 'Password', :value => mongodb_password },
    :direct_connection => { :label => 'Direct connection? (true/false)', :value => direct_connection },
    :filtering => { :active => filtering }
  }
end
# A single simple rule (EQUALS / INCLUDE) on the `name` field with the value 'apple'.
let(:rules) do
  apple_rule = {
    'id' => 'test',
    'field' => 'name',
    'rule' => Core::Filtering::SimpleRule::Rule::EQUALS,
    'policy' => Core::Filtering::SimpleRule::Policy::INCLUDE,
    'value' => 'apple'
  }

  [apple_rule]
end
# Sample aggregation pipeline with a single `$group` stage: groups on the formatted
# `$date` and computes a summed order value and an average quantity per group.
let(:pipeline) {
  [
    {
      # the stage key must be exactly `$group` (a stray typographic quote used to follow it)
      :$group => {
        :_id => { :$dateToString => { :format => '%Y-%m-%d', :date => '$date' } },
        :totalOrderValue => { :$sum => { :$multiply => %w[$price $quantity] } },
        :averageOrderQuantity => { :$avg => '$quantity' }
      }
    }
  ]
}
# Text-search filter used by the `find` advanced snippet.
let(:filter) do
  { :$text => { :$search => 'garden', :$caseSensitive => false } }
end

# Options shared by the `find` and `aggregate` advanced snippets.
let(:options) do
  { :skip => 10, :limit => 1000 }
end

let(:find) do
  { :filter => filter, :options => options }
end

let(:aggregate) do
  { :pipeline => pipeline, :options => options }
end

# By default the advanced snippet only carries a `find` definition.
let(:advanced_snippet) do
  { :find => find }
end

# Filtering returned by the stubbed job description: simple rules plus advanced snippet.
let(:filtering) do
  { :rules => rules, :advanced_snippet => advanced_snippet }
end
# --- connection settings fed into `configuration` (overridable per context) ---
let(:mongodb_host) { '127.0.0.1:27027' }
let(:mongodb_database) { 'sample-database' }
let(:mongodb_collection) { 'sample-collection' }
let(:mongodb_username) { nil }
let(:mongodb_password) { nil }
let(:direct_connection) { 'false' }

# --- test doubles wired together in the top-level `before` hook ---
let(:job_description) { double }
let(:mongo_client) { double }
let(:actual_database) { double }
let(:actual_collection) { double }
let(:mongo_collection_cursor) { double }

# --- what the stubbed MongoDB server "contains" ---
let(:actual_database_names) { %w[sample-database] }
let(:actual_collection_name) { 'sample-collection' }
let(:actual_collection_names) { [actual_collection_name] }
# NOTE(review): the top-level `before` hook returns `mongo_collection_data` from the
# cursor's `limit`; `actual_collection_data` is not referenced by those shared stubs.
let(:actual_collection_data) { [] }
let(:mongo_collection_data) { [] }
# Wires the doubles together: Mongo::Client.new yields `mongo_client`, which exposes the
# stubbed database/collection; `find` and `aggregate` both hand out the same cursor double.
before do
  # transformers are tested in their own specs (multiple transformers and their interaction should be tested here later)
  allow(described_class).to receive(:filter_transformers).and_return(filter_transformers)

  allow(job_description).to receive_messages(
    :dup => job_description,
    :configuration => configuration,
    :filtering => filtering
  )

  allow(Mongo::Client).to receive(:new).and_yield(mongo_client)
  allow(mongo_client).to receive_messages(
    :collections => [Hashie::Mash.new({ :name => actual_collection_name })],
    :database => actual_database,
    :database_names => actual_database_names,
    :with => mongo_client,
    :close => nil
  )
  allow(mongo_client).to receive(:[]).with(mongodb_collection).and_return(actual_collection)

  allow(actual_database).to receive(:collection_names).and_return(actual_collection_names)

  allow(actual_collection).to receive_messages(
    :find => mongo_collection_cursor,
    :aggregate => mongo_collection_cursor
  )
  # `skip` is chainable, `limit` ends the chain with the page contents
  allow(mongo_collection_cursor).to receive_messages(
    :skip => mongo_collection_cursor,
    :limit => mongo_collection_data
  )
end

it_behaves_like 'a connector'
# Shared examples checking which credentials are handed to Mongo::Client.new.
# The including group has to define `do_test`, the call that creates the client.
shared_examples_for 'handles auth' do
  context 'when username and password are provided' do
    let(:mongodb_username) { 'admin' }
    let(:mongodb_password) { 'some-password' }

    it 'sets client to use basic auth' do
      credentials = hash_including(:user => mongodb_username, :password => mongodb_password)
      expect(Mongo::Client).to receive(:new).with(anything, credentials)

      do_test
    end
  end

  context 'when no username and password are provided' do
    it 'does not set client to use basic auth' do
      # username and password are both nil here (top-level defaults)
      credentials = hash_including(:user => mongodb_username, :password => mongodb_password)
      expect(Mongo::Client).not_to receive(:new).with(anything, credentials)

      do_test
    end
  end
end
# Shared examples for the `direct_connection` setting, which arrives as a string.
# The including group has to define `do_test`.
shared_examples_for 'validates direct_connection' do
  %w[true false].each do |boolean_string|
    context "when direct_connection is #{boolean_string}" do
      let(:direct_connection) { boolean_string }

      it 'does not throw error' do
        expect { do_test }.not_to raise_error
      end
    end
  end

  context 'when direct_connection is not a boolean' do
    let(:direct_connection) { 'foobar' }

    it 'throws error' do
      # NOTE(review): a bare `raise_error` accepts any exception; consider pinning the
      # error class/message the connector raises for an invalid value.
      expect { do_test }.to raise_error
    end
  end
end
describe '#is_healthy?' do
  it_behaves_like 'handles auth' do
    let(:do_test) { subject.is_healthy? }
  end

  it 'instantiates a mongodb client' do
    # the client must be created for the configured host and database
    client_options = hash_including(:database => mongodb_database)
    expect(Mongo::Client).to receive(:new).with(mongodb_host, client_options)

    subject.is_healthy?
  end
end
describe '#yield_documents' do
# Both shared groups drive the connector through `yield_documents` with a no-op block.
it_behaves_like 'handles auth' do
  let(:do_test) { subject.yield_documents { |_doc| } }
end

it_behaves_like 'validates direct_connection' do
  let(:do_test) { subject.yield_documents { |_doc| } }
end
context 'when database is not found' do
  # not part of `actual_database_names`
  let(:mongodb_database) { 'non-existing-database' }

  it 'does raise' do
    # `raise_error` invokes the expectation block without arguments, so no block
    # reaches `yield_documents` here
    expect { subject.yield_documents }.to raise_error(anything)
  end
end
context 'when collection is not found' do
  # not part of `actual_collection_names`
  let(:mongodb_collection) { 'non-existing-collection' }

  it 'does raise' do
    # `raise_error` invokes the expectation block without arguments, so no block
    # reaches `yield_documents` here
    expect { subject.yield_documents }.to raise_error(anything)
  end
end
context 'when collection is found' do
context 'when data is distributed in multiple pages' do
# Fixtures for three pages: a full page, a partially filled page and an empty page.
let(:page_size) { 3 }
let(:doc2) do
  { '_id' => '2', 'more' => { 'nested' => 'data' } }
end
let(:first_page_data) do
  [
    { '_id' => '1', 'some' => { 'nested' => 'data' } },
    doc2,
    { '_id' => '167', 'nothing' => nil }
  ]
end
let(:second_page_data) { [{ '_id' => 'last' }] }
let(:third_page_data) { [] }
let(:all_data) { [*first_page_data, *second_page_data, *third_page_data] }

let(:second_page_cursor) { double }
let(:third_page_cursor) { double }

# no find options by default in this context
let(:options) { nil }
# Shrinks the page size and maps each skip offset to the cursor serving that page.
before do
  stub_const('Connectors::MongoDB::Connector::PAGE_SIZE', page_size)

  # first page: offset 0 stays on the main cursor
  allow(mongo_collection_cursor).to receive(:skip).with(0).and_return(mongo_collection_cursor)
  allow(mongo_collection_cursor).to receive(:limit).and_return(first_page_data)

  # following pages: each offset gets its own cursor double
  following_pages = {
    page_size => [second_page_cursor, second_page_data],
    page_size * 2 => [third_page_cursor, third_page_data]
  }
  following_pages.each do |offset, (page_cursor, page_data)|
    allow(mongo_collection_cursor).to receive(:skip).with(offset).and_return(page_cursor)
    allow(page_cursor).to receive(:limit).and_return(page_data)
  end
end
it 'fetches each page' do
  # a bit weird test, but I did not figure out to do a better job ensuring that each page was fetched
  [mongo_collection_cursor, second_page_cursor, third_page_cursor].each do |page_cursor|
    expect(page_cursor).to receive(:limit).once
  end

  subject.yield_documents { |_doc| }
end
it 'yields each document of the collection remapping ids correctly scrolling through pages' do
  yielded_documents = []
  subject.yield_documents { |doc| yielded_documents << doc }

  expect(yielded_documents.size).to eq(all_data.size)

  # every source `_id` must show up as `id` on one of the yielded documents
  all_data.map { |source_doc| source_doc['_id'] }.each do |id|
    expect(yielded_documents).to include(a_hash_including('id' => id))
  end
end
context 'when a single document causes an error' do
  let(:tolerable_error) { 'mock serialization failure' }

  before do
    # serialization works for every document except `doc2`
    allow(subject).to receive(:serialize).and_call_original
    allow(subject).to receive(:serialize).with(doc2).and_raise(tolerable_error)
  end

  it 'does not crash' do
    expect(Utility::Logger).to receive(:warn).with(include(tolerable_error))

    expect { subject.yield_documents { |_| }.to_a }.not_to raise_error
  end
end
context 'an overall limit is set' do
  # the limit comes from the find options of the advanced snippet
  let(:options) { { :limit => 1 } }
  let(:advanced_snippet) { { :find => { :options => options } } }

  it 'fetches 1 document overall' do
    yielded_documents = []

    subject.yield_documents do |doc|
      yielded_documents << doc
    end

    expect(yielded_documents.size).to eq(1)
  end
end
context 'skip is set for filtering' do
  let(:options) { { :skip => 1 } }
  let(:advanced_snippet) { { :find => { :options => options } } }

  before do
    allow(mongo_collection_cursor).to receive(:skip).with(1).and_return(mongo_collection_cursor)
    # skip first element
    allow(mongo_collection_cursor).to receive(:limit).and_return(first_page_data[1..2])

    # every later page starts one element further (skipped element at the beginning):
    # page_size + 1 for the second page, (page_size * 2) + 1 for the third page
    shifted_pages = {
      page_size + 1 => [second_page_cursor, second_page_data],
      (page_size * 2) + 1 => [third_page_cursor, third_page_data]
    }
    shifted_pages.each do |offset, (page_cursor, page_data)|
      allow(mongo_collection_cursor).to receive(:skip).with(offset).and_return(page_cursor)
      allow(page_cursor).to receive(:limit).and_return(page_data)
    end
  end

  it 'skips the first element, yielding 3 overall' do
    yielded_documents = []

    subject.yield_documents do |doc|
      yielded_documents << doc
    end

    expect(yielded_documents.size).to eq(3)
  end
end
end
context 'when field of type BSON::ObjectId is met' do
  let(:id) { '63238d68dc461bfe327e9634' }
  let(:actual_collection_data) do
    [
      { '_id' => BSON::ObjectId.from_string(id) }
    ]
  end

  before(:each) do
    # The shared cursor stub returns an empty page, so the fixture above would never reach
    # the connector and the example would pass without asserting anything. Serve the
    # fixture as the first page and an empty page afterwards.
    # NOTE(review): assumes paging stops on an empty page, as the paging examples suggest.
    allow(mongo_collection_cursor).to receive(:limit).and_return(actual_collection_data, [])
  end

  it 'serializes the field correctly' do
    yielded_documents = []
    subject.yield_documents { |doc| yielded_documents << doc }

    # asserting outside the block guarantees a failure when nothing is yielded
    expect(yielded_documents.size).to eq(1)
    expect(yielded_documents.first['id']).to eq(id)
  end
end
context 'when field of type BSON::Decimal128 is met' do
  let(:price) { '12.00' }
  let(:actual_collection_data) do
    [
      { '_id' => 1, 'price' => BSON::Decimal128.from_string(price) }
    ]
  end

  before(:each) do
    # The shared cursor stub returns an empty page, so the fixture above would never reach
    # the connector and the example would pass without asserting anything. Serve the
    # fixture as the first page and an empty page afterwards.
    # NOTE(review): assumes paging stops on an empty page, as the paging examples suggest.
    allow(mongo_collection_cursor).to receive(:limit).and_return(actual_collection_data, [])
  end

  it 'serializes the field correctly' do
    yielded_documents = []
    subject.yield_documents { |doc| yielded_documents << doc }

    # asserting outside the block guarantees a failure when nothing is yielded
    expect(yielded_documents.size).to eq(1)
    expect(yielded_documents.first['price']).to eq(BigDecimal(price))
  end
end
context 'when array of strings is met' do
  let(:array_of_strings) { ['1', '1b', '17', '222'] }
  let(:actual_collection_data) do
    [
      { '_id' => 1, 'rooms' => array_of_strings }
    ]
  end

  before(:each) do
    # The shared cursor stub returns an empty page, so the fixture above would never reach
    # the connector and the example would pass without asserting anything. Serve the
    # fixture as the first page and an empty page afterwards.
    # NOTE(review): assumes paging stops on an empty page, as the paging examples suggest.
    allow(mongo_collection_cursor).to receive(:limit).and_return(actual_collection_data, [])
  end

  it 'serializes the field correctly' do
    yielded_documents = []
    subject.yield_documents { |doc| yielded_documents << doc }

    # asserting outside the block guarantees a failure when nothing is yielded
    expect(yielded_documents.size).to eq(1)
    expect(yielded_documents.first['rooms']).to eq(array_of_strings)
  end
end
context 'when field that needs special serialization is nested in hash' do
  let(:price) { '12.00' }
  let(:actual_collection_data) do
    [
      { '_id' => 1, 'room' => { 'price' => BSON::Decimal128.from_string(price) } }
    ]
  end

  before(:each) do
    # The shared cursor stub returns an empty page, so the fixture above would never reach
    # the connector and the example would pass without asserting anything. Serve the
    # fixture as the first page and an empty page afterwards.
    # NOTE(review): assumes paging stops on an empty page, as the paging examples suggest.
    allow(mongo_collection_cursor).to receive(:limit).and_return(actual_collection_data, [])
  end

  it 'serializes the field correctly' do
    yielded_documents = []
    subject.yield_documents { |doc| yielded_documents << doc }

    # asserting outside the block guarantees a failure when nothing is yielded
    expect(yielded_documents.size).to eq(1)
    expect(yielded_documents.first['room']['price']).to eq(BigDecimal(price))
  end
end
context 'when field that needs special serialization is nested in array' do
  let(:price) { '12.00' }
  let(:actual_collection_data) do
    [
      { '_id' => 1, 'rooms' => [{ 'price' => BSON::Decimal128.from_string(price) }] }
    ]
  end

  before(:each) do
    # The shared cursor stub returns an empty page, so the fixture above would never reach
    # the connector and the example would pass without asserting anything. Serve the
    # fixture as the first page and an empty page afterwards.
    # NOTE(review): assumes paging stops on an empty page, as the paging examples suggest.
    allow(mongo_collection_cursor).to receive(:limit).and_return(actual_collection_data, [])
  end

  it 'serializes the field correctly' do
    yielded_documents = []
    subject.yield_documents { |doc| yielded_documents << doc }

    # asserting outside the block guarantees a failure when nothing is yielded
    expect(yielded_documents.size).to eq(1)
    expect(yielded_documents.first['rooms'][0]['price']).to eq(BigDecimal(price))
  end
end
end
context 'find field exists (with filter and options) in an active advanced filtering config' do
  # uses the top-level `filter` and `options` unchanged
  it "calls the mongo client's find method with filter and options" do
    expect(actual_collection).to receive(:find).with(filter, options)

    subject.yield_documents
  end
end
context 'rules and advanced filtering config are empty' do
  # neither simple rules nor an advanced snippet are configured
  let(:rules) { [] }
  let(:advanced_snippet) { {} }

  it 'calls find without arguments' do
    expect(actual_collection).to receive(:find).with(no_args)

    subject.yield_documents
  end
end
context 'both rules and advanced filtering are present' do
  # top-level defaults: one simple rule plus a `find` advanced snippet
  it 'does not apply the simple rule' do
    expect(actual_collection).to receive(:find).with(filter, options)

    subject.yield_documents
  end
end
context 'find field exists (with filter and empty options) in an active advanced filtering config' do
  let(:options) { {} }

  it 'calls the mongo client find method with filter and empty options' do
    expect(actual_collection).to receive(:find).with(filter, {})

    subject.yield_documents
  end
end
context 'find field exists (with filter and nil options) in an active advanced filtering config' do
  let(:options) { nil }

  it 'calls the mongo client find method with filter and nil options without breaking' do
    # nil options are expected to reach `find` as an empty hash
    expect(actual_collection).to receive(:find).with(filter, {})

    subject.yield_documents
  end
end
context 'find field exists (with empty filter and existing options) in an active advanced filtering config' do
  let(:filter) {
    {}
  }
  # description previously read "fiend method"
  it 'calls the mongo client find method with empty filter and existing options' do
    expect(actual_collection).to receive(:find).with({}, options)
    subject.yield_documents
  end
end
context 'find field exists (with nil filter and existing options) in an active advanced filtering config' do
  let(:filter) { nil }

  it 'calls the mongo client find method with nil and existing options' do
    # nil is also the default value in the mongodb client, so it's ok to pass it
    expect(actual_collection).to receive(:find).with(nil, options)

    subject.yield_documents
  end
end
context 'find field exists with empty filter and empty options' do
  let(:find) do
    { :filter => {}, :options => {} }
  end

  it 'logs a warning' do
    expected_warning = "'Find' was specified with an empty filter and empty options."
    expect(Utility::Logger).to receive(:warn).with(expected_warning)

    subject.yield_documents
  end
end
context 'when aggregate is present' do
  # Checks that `aggregate` is called with the expected arguments and that the returned
  # cursor is iterated with `each` rather than paged with skip/limit.
  shared_examples_for 'calls aggregate' do |expected_pipeline, expected_options|
    # a non-empty description keeps the report readable for every inclusion of this group
    it 'calls aggregate with the expected pipeline and options and does not page the cursor' do
      expect(actual_collection).to receive(:aggregate).with(expected_pipeline, expected_options)
      expect(mongo_collection_cursor).to receive(:each)
      # aggregate does not expose this functionality directly (only through pipeline stages)
      expect(mongo_collection_cursor).to_not receive(:skip)
      expect(mongo_collection_cursor).to_not receive(:limit)
      subject.yield_documents
    end
  end

  # necessary to reuse it as parameter for it_behaves_like and let()
  pipeline = [{ '$skip' => 5 }]
  options = { 'batch_size' => 10 }

  let(:pipeline) { pipeline }
  let(:options) { options }
  let(:advanced_snippet) {
    {
      :aggregate => aggregate
    }
  }

  it_behaves_like 'calls aggregate', pipeline, options

  context 'when options are not present' do
    context 'when options are empty' do
      let(:options) { {} }

      it_behaves_like 'calls aggregate', pipeline, {}
    end

    context 'when options are nil' do
      let(:options) { nil }

      it_behaves_like 'calls aggregate', pipeline, {}
    end
  end

  context 'when pipeline is not present' do
    context 'when pipeline is empty' do
      let(:pipeline) { [] }

      it_behaves_like 'calls aggregate', [], options
    end

    context 'when pipeline is nil' do
      let(:pipeline) { nil }

      # still calls aggregate with an empty pipeline. Nil pipeline would lead to the aggregation to crash
      it_behaves_like 'calls aggregate', [], options
    end
  end

  context 'when pipeline and options are nil' do
    let(:pipeline) { nil }
    let(:options) { nil }

    it_behaves_like 'calls aggregate', [], {}

    it 'logs a warning' do
      expect(Utility::Logger).to receive(:warn).with('\'Aggregate\' was specified with an empty pipeline and empty options.')
      expect(mongo_collection_cursor).to receive(:each)
      subject.yield_documents
    end
  end
end
end
end