t/Client_7_0/30_bulk_add_action.t (181 lines of code) (raw):
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
use Test::More;
use Test::Deep;
use Test::Exception;
use strict;
use warnings;
use lib 't/lib';
# Pick the client version before the shared bootstrap script runs
# (presumably es_sync.pl reads ES_VERSION -- confirm in t/lib).
$ENV{ES_VERSION} = '7_0';

# The bootstrap must evaluate to a true value, which is used as the client
# object below. Otherwise abort with the compile error ($@) or, failing
# that, the system error ($!).
my $es = do "es_sync.pl";
$es or die( $@ || $! );

# Bulk helper under test. The serializer is put into canonical mode; the
# expected buffer lines further down all list their keys in sorted order.
my $b = $es->bulk_helper;
$b->_serializer->_set_canonical;
## EMPTY
# add_action called without any action/params pairs must still return a
# true value.
ok( $b->add_action(), 'Empty add action' );
## INDEX ACTIONS ##
{
    # Metadata given with the plain key spellings.
    my %plain_keys = (
        index        => 'foo',
        id           => 1,
        pipeline     => 'foo',
        routing      => 1,
        parent       => 1,
        timestamp    => 1380019061000,
        ttl          => '10m',
        version      => 1,
        version_type => 'external',
        source       => { foo => 'bar' },
    );

    # Same kind of metadata given with the underscore-prefixed spellings
    # (this variant carries no pipeline).
    my %prefixed_keys = (
        _index        => 'foo',
        _id           => 2,
        _routing      => 2,
        _parent       => 2,
        _timestamp    => 1380019061000,
        _ttl          => '10m',
        _version      => 1,
        _version_type => 'external',
        source        => { foo => 'bar' },
    );

    ok $b->add_action( index => \%plain_keys, index => \%prefixed_keys ),
        'Add index actions';

    # One metadata line followed by one source line per action.
    my @expected_lines = (
        q({"index":{"_id":1,"_index":"foo","parent":1,"pipeline":"foo","routing":1,"timestamp":1380019061000,"ttl":"10m","version":1,"version_type":"external"}}),
        q({"foo":"bar"}),
        q({"index":{"_id":2,"_index":"foo","parent":2,"routing":2,"timestamp":1380019061000,"ttl":"10m","version":1,"version_type":"external"}}),
        q({"foo":"bar"}),
    );

    cmp_deeply $b->_buffer, \@expected_lines, "Index actions in buffer";
    is $b->_buffer_size,  313, "Index actions buffer size";
    is $b->_buffer_count, 2,   "Index actions buffer count";
}

# Start the next section from an empty buffer.
$b->clear_buffer;
## CREATE ACTIONS ##
{
    # Two create actions: the first uses the plain metadata keys, the second
    # the underscore-prefixed ones (and no pipeline).
    my @create_actions = (
        create => {
            index        => 'foo',
            id           => 1,
            routing      => 1,
            parent       => 1,
            pipeline     => 'foo',
            timestamp    => 1380019061000,
            ttl          => '10m',
            version      => 1,
            version_type => 'external',
            source       => { foo => 'bar' },
        },
        create => {
            _index        => 'foo',
            _id           => 2,
            _routing      => 2,
            _parent       => 2,
            _timestamp    => 1380019061000,
            _ttl          => '10m',
            _version      => 1,
            _version_type => 'external',
            source        => { foo => 'bar' },
        },
    );

    ok $b->add_action(@create_actions), 'Add create actions';

    # Each action is expected as a metadata line followed by its source line.
    my $first_metadata
        = q({"create":{"_id":1,"_index":"foo","parent":1,"pipeline":"foo","routing":1,"timestamp":1380019061000,"ttl":"10m","version":1,"version_type":"external"}});
    my $second_metadata
        = q({"create":{"_id":2,"_index":"foo","parent":2,"routing":2,"timestamp":1380019061000,"ttl":"10m","version":1,"version_type":"external"}});
    my $source_line = q({"foo":"bar"});

    cmp_deeply $b->_buffer,
        [ $first_metadata, $source_line, $second_metadata, $source_line ],
        "Create actions in buffer";
    is $b->_buffer_size,  315, "Create actions buffer size";
    is $b->_buffer_count, 2,   "Create actions buffer count";
}

# Start the next section from an empty buffer.
$b->clear_buffer;
## DELETE ACTIONS ##
# Delete actions carry metadata only, so each action yields a single buffer
# line. The first action uses the plain metadata keys; the second uses the
# underscore-prefixed spellings for every key, including _version_type, so
# that the prefixed alias is exercised for delete just as it is for the
# index, create and update actions in this file.
ok $b->add_action(
    delete => {
        index        => 'foo',
        id           => 1,
        routing      => 1,
        parent       => 1,
        version      => 1,
        version_type => 'external',
    },
    delete => {
        _index        => 'foo',
        _id           => 2,
        _routing      => 2,
        _parent       => 2,
        _version      => 1,
        _version_type => 'external',
    }
    ),
    'Add delete actions';

# Both spellings must serialize to the same un-prefixed metadata names.
cmp_deeply $b->_buffer,
    [
    q({"delete":{"_id":1,"_index":"foo","parent":1,"routing":1,"version":1,"version_type":"external"}}),
    q({"delete":{"_id":2,"_index":"foo","parent":2,"routing":2,"version":1,"version_type":"external"}}),
    ],
    "Delete actions in buffer";
is $b->_buffer_size,  194, "Delete actions buffer size";
is $b->_buffer_count, 2,   "Delete actions buffer count";

# Start the next section from an empty buffer.
$b->clear_buffer;
## UPDATE ACTIONS ##
{
    # Non-metadata portion shared by both update actions. A fresh list is
    # built on every call so the two actions never share nested references.
    my $body_fields = sub {
        return (
            detect_noop       => 'true',
            _source           => 'true',
            _source_includes  => ['foo'],
            _source_excludes  => ['bar'],
            doc               => { foo => 'bar' },
            doc_as_upsert     => 1,
            fields            => ["*"],
            script            => 'ctx._source+=1',
            scripted_upsert   => 'true',
            retry_on_conflict => 3,
        );
    };

    # Same document addressed twice: once with plain metadata keys, once
    # with the underscore-prefixed spellings.
    my %plain_keys = (
        index        => 'foo',
        id           => 1,
        routing      => 1,
        parent       => 1,
        timestamp    => 1380019061000,
        ttl          => '10m',
        version      => 1,
        version_type => 'external',
        $body_fields->(),
    );
    my %prefixed_keys = (
        _index        => 'foo',
        _id           => 1,
        _routing      => 1,
        _parent       => 1,
        _timestamp    => 1380019061000,
        _ttl          => '10m',
        _version      => 1,
        _version_type => 'external',
        $body_fields->(),
    );

    ok $b->add_action( update => \%plain_keys, update => \%prefixed_keys ),
        'Add update actions';

    # Both actions are expected to serialize identically: a metadata line
    # followed by a body line.
    my $metadata_line
        = q({"update":{"_id":1,"_index":"foo","parent":1,"routing":1,"timestamp":1380019061000,"ttl":"10m","version":1,"version_type":"external"}});
    my $body_line
        = q({"_source":"true","_source_excludes":["bar"],"_source_includes":["foo"],"detect_noop":"true","doc":{"foo":"bar"},"doc_as_upsert":1,"fields":["*"],"retry_on_conflict":3,"script":"ctx._source+=1","scripted_upsert":"true"});

    cmp_deeply $b->_buffer,
        [ ( $metadata_line, $body_line ) x 2 ],
        "Update actions in buffer";
    is $b->_buffer_size,  710, "Update actions buffer size";
    is $b->_buffer_count, 2,   "Update actions buffer count";
}

# Leave an empty buffer behind for whatever follows.
$b->clear_buffer;
## ERRORS ##
{
    # Each row: [ arguments for add_action, expected error pattern, test name ].
    # Rows run in order; every call is expected to throw.
    my @error_cases = (
        [ [ 'foo' => {} ],     qr/Unrecognised action/, 'Bad action' ],
        [ [ 'index', 'bar' ],  qr/Missing <params>/,    'Missing params' ],
        [ [ index => {} ],     qr/Missing .*<index>/,   'Missing index' ],
        [
            [ index => { index => 'i' } ],
            qr/Missing <source>/,
            'Missing source'
        ],
        [
            [ index => { index => 'i', source => {}, foo => 1 } ],
            qr/Unknown params/,
            'Unknown params'
        ],
    );

    for my $case (@error_cases) {
        my ( $args, $expected_error, $test_name ) = @$case;
        throws_ok { $b->add_action(@$args) } $expected_error, $test_name;
    }
}

done_testing;