t/Client_7_0_Async/30_bulk_add_action.t (177 lines of code) (raw):
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
use Test::More;
use Test::Deep;
use Test::Exception;
use strict;
use warnings;
use lib 't/lib';
# Expectation shared with the on_fatal handler below. Each error case sets
# $error (regex source text) and $name (test description) right before it
# triggers the failure it wants checked.
my ( $error, $name );

# Set before es_async.pl is loaded.
# NOTE(review): presumably es_async.pl reads this to select the client
# version - confirm in t/lib.
$ENV{ES_VERSION} = '7_0';

# A false return from do() means the file could not be compiled ($@) or
# could not be read ($!); either way there is no client to test with.
my $es = do "es_async.pl" or die( $@ || $! );

my $b = $es->bulk_helper(
    on_fatal => sub {
        my $got = shift;

        # No expectation registered yet, so this error was not asked for.
        # Without this guard qr/$error/ would be built from undef, match
        # any message, and record a bogus pass (plus a warning).
        unless ( defined $error ) {
            my $shown = defined $got ? $got : '<undef>';
            return fail("Unexpected fatal error: $shown");
        }

        # $error is deliberately interpolated as a regex, not quoted:
        # the error cases use patterns such as "Missing .*<index>".
        like $got, qr/$error/, $name;
    }
);

# Request canonical serialization; the buffer assertions later in this file
# compare against exact JSON strings whose keys are in sorted order.
$b->_serializer->_set_canonical;
## EMPTY
# add_action called with no actions is still expected to return a true value.
ok( $b->add_action(), 'Empty add action' );
## INDEX ACTIONS ##
# Two index actions: the first spells its metadata with bare keys, the
# second with underscore-prefixed keys (and no pipeline).
my @index_actions = (
    index => {
        index        => 'foo',
        id           => 1,
        pipeline     => 'foo',
        routing      => 1,
        parent       => 1,
        timestamp    => 1380019061000,
        ttl          => '10m',
        version      => 1,
        version_type => 'external',
        source       => { foo => 'bar' },
    },
    index => {
        _index        => 'foo',
        _id           => 2,
        _routing      => 2,
        _parent       => 2,
        _timestamp    => 1380019061000,
        _ttl          => '10m',
        _version      => 1,
        _version_type => 'external',
        source        => { foo => 'bar' },
    },
);
ok( $b->add_action(@index_actions), 'Add index actions' );

# Per action the buffer should hold one metadata line and one source line.
my @index_expected = (
    q({"index":{"_id":1,"_index":"foo","parent":1,"pipeline":"foo","routing":1,"timestamp":1380019061000,"ttl":"10m","version":1,"version_type":"external"}}),
    q({"foo":"bar"}),
    q({"index":{"_id":2,"_index":"foo","parent":2,"routing":2,"timestamp":1380019061000,"ttl":"10m","version":1,"version_type":"external"}}),
    q({"foo":"bar"}),
);
cmp_deeply( $b->_buffer, \@index_expected, 'Index actions in buffer' );
is( $b->_buffer_size,  313, 'Index actions buffer size' );
is( $b->_buffer_count, 2,   'Index actions buffer count' );

# Start the next section from an empty buffer.
$b->clear_buffer;
## CREATE ACTIONS ##
# Same shape as the index case, but with the create action: bare metadata
# keys first, underscore-prefixed keys (and no pipeline) second.
my @create_actions = (
    create => {
        index        => 'foo',
        id           => 1,
        routing      => 1,
        parent       => 1,
        pipeline     => 'foo',
        timestamp    => 1380019061000,
        ttl          => '10m',
        version      => 1,
        version_type => 'external',
        source       => { foo => 'bar' },
    },
    create => {
        _index        => 'foo',
        _id           => 2,
        _routing      => 2,
        _parent       => 2,
        _timestamp    => 1380019061000,
        _ttl          => '10m',
        _version      => 1,
        _version_type => 'external',
        source        => { foo => 'bar' },
    },
);
ok( $b->add_action(@create_actions), 'Add create actions' );

# Per action the buffer should hold one metadata line and one source line.
my @create_expected = (
    q({"create":{"_id":1,"_index":"foo","parent":1,"pipeline":"foo","routing":1,"timestamp":1380019061000,"ttl":"10m","version":1,"version_type":"external"}}),
    q({"foo":"bar"}),
    q({"create":{"_id":2,"_index":"foo","parent":2,"routing":2,"timestamp":1380019061000,"ttl":"10m","version":1,"version_type":"external"}}),
    q({"foo":"bar"}),
);
cmp_deeply( $b->_buffer, \@create_expected, 'Create actions in buffer' );
is( $b->_buffer_size,  315, 'Create actions buffer size' );
is( $b->_buffer_count, 2,   'Create actions buffer count' );

# Start the next section from an empty buffer.
$b->clear_buffer;
## DELETE ACTIONS ##
# Two delete actions: the first uses bare metadata keys, the second uses the
# underscore-prefixed spelling for every key, including _version_type, to
# mirror the index and create sections of this file. Delete actions carry no
# source, so each action contributes a single line to the buffer.
ok $b->add_action(
    delete => {
        index        => 'foo',
        id           => 1,
        routing      => 1,
        parent       => 1,
        version      => 1,
        version_type => 'external',
    },
    delete => {
        _index        => 'foo',
        _id           => 2,
        _routing      => 2,
        _parent       => 2,
        _version      => 1,
        _version_type => 'external',
    }
    ),
    'Add delete actions';

cmp_deeply $b->_buffer,
    [
    q({"delete":{"_id":1,"_index":"foo","parent":1,"routing":1,"version":1,"version_type":"external"}}),
    q({"delete":{"_id":2,"_index":"foo","parent":2,"routing":2,"version":1,"version_type":"external"}}),
    ],
    "Delete actions in buffer";

# 2 lines of 96 characters, each counted with one extra character.
is $b->_buffer_size,  194, "Delete actions buffer size";
is $b->_buffer_count, 2,   "Delete actions buffer count";

# Start the next section from an empty buffer.
$b->clear_buffer;
## UPDATE ACTIONS ##
# Two update actions: bare metadata keys first, underscore-prefixed keys
# second. The second action uses id/routing 2 (as the other sections of this
# file do) so the two expected metadata lines differ and a duplicated or
# reordered action would be detected. Everything that is not index/id/routing
# is expected to end up in the body line that follows the metadata line.
ok $b->add_action(
    update => {
        index             => 'foo',
        id                => 1,
        routing           => 1,
        _source           => 'true',
        _source_includes  => 'foo',
        _source_excludes  => 'bar',
        doc               => { foo => 'bar' },
        doc_as_upsert     => 1,
        fields            => ["*"],
        script            => 'ctx._source+=1',
        scripted_upsert   => 'true',
        retry_on_conflict => 3,
    },
    update => {
        _index            => 'foo',
        _id               => 2,
        _routing          => 2,
        _source           => 'true',
        _source_includes  => 'foo',
        _source_excludes  => 'bar',
        doc               => { foo => 'bar' },
        doc_as_upsert     => 1,
        fields            => ["*"],
        script            => 'ctx._source+=1',
        scripted_upsert   => 'true',
        retry_on_conflict => 3,
    }
    ),
    'Add update actions';

cmp_deeply $b->_buffer,
    [
    q({"update":{"_id":1,"_index":"foo","routing":1}}),
    q({"_source":"true","_source_excludes":"bar","_source_includes":"foo","doc":{"foo":"bar"},"doc_as_upsert":1,"fields":["*"],"retry_on_conflict":3,"script":"ctx._source+=1","scripted_upsert":"true"}),
    q({"update":{"_id":2,"_index":"foo","routing":2}}),
    q({"_source":"true","_source_excludes":"bar","_source_includes":"foo","doc":{"foo":"bar"},"doc_as_upsert":1,"fields":["*"],"retry_on_conflict":3,"script":"ctx._source+=1","scripted_upsert":"true"})
    ],
    "Update actions in buffer";

# Single-digit ids keep the serialized length, and so the size, at 486.
is $b->_buffer_size,  486, "Update actions buffer size";
is $b->_buffer_count, 2,   "Update actions buffer count";

# Start the next section from an empty buffer.
$b->clear_buffer;
## ERRORS ##
# Each case is [ error pattern, test description, add_action arguments ].
# $error and $name are the file-level variables that the on_fatal handler
# reads, so they are assigned immediately before the call meant to fail.
my @error_cases = (
    [ "Unrecognised action", 'Bad action',     [ 'foo' => {} ] ],
    [ "Missing <params>",    'Missing params', [ 'index', 'bar' ] ],
    [ "Missing .*<index>",   'Missing index',  [ index => { type => 't' } ] ],
    [
        "Missing .*<source>",
        'Missing source',
        [ index => { index => 'i', type => 't' } ]
    ],
    [
        "Unknown params",
        'Unknown params',
        [ index => { index => 'i', type => 't', source => {}, foo => 1 } ]
    ],
);

for my $case (@error_cases) {
    my ( $pattern, $label, $args ) = @$case;
    $error = $pattern;
    $name  = $label;
    $b->add_action(@$args);
}

done_testing;