t/Client_8_0_Async/30_bulk_add_action.t (177 lines of code) (raw):

# Licensed to Elasticsearch B.V. under one or more contributor # license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright # ownership. Elasticsearch B.V. licenses this file to you under # the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. use Test::More; use Test::Deep; use Test::Exception; use strict; use warnings; use lib 't/lib'; my ( $error, $name ); $ENV{ES_VERSION} = '8_0'; my $es = do "es_async.pl" or die( $@ || $! ); my $b = $es->bulk_helper( on_fatal => sub { like shift(), qr/$error/, $name; } ); $b->_serializer->_set_canonical; ## EMPTY ok $b->add_action(), 'Empty add action'; ## INDEX ACTIONS ## ok $b->add_action( index => { index => 'foo', id => 1, pipeline => 'foo', routing => 1, parent => 1, timestamp => 1380019061000, ttl => '10m', version => 1, version_type => 'external', source => { foo => 'bar' }, }, index => { _index => 'foo', _id => 2, _routing => 2, _parent => 2, _timestamp => 1380019061000, _ttl => '10m', _version => 1, _version_type => 'external', source => { foo => 'bar' }, } ), 'Add index actions'; cmp_deeply $b->_buffer, [ q({"index":{"_id":1,"_index":"foo","parent":1,"pipeline":"foo","routing":1,"timestamp":1380019061000,"ttl":"10m","version":1,"version_type":"external"}}), q({"foo":"bar"}), q({"index":{"_id":2,"_index":"foo","parent":2,"routing":2,"timestamp":1380019061000,"ttl":"10m","version":1,"version_type":"external"}}), q({"foo":"bar"}) ], "Index actions in buffer"; is $b->_buffer_size, 313, "Index actions buffer size"; is $b->_buffer_count, 2, "Index actions buffer count"; $b->clear_buffer; ## CREATE ACTIONS ## ok $b->add_action( create => { index => 'foo', id => 1, routing => 1, parent => 1, pipeline => 'foo', timestamp => 1380019061000, ttl => '10m', version => 1, version_type => 'external', source => { foo => 'bar' }, }, create => { _index => 'foo', _id => 2, _routing => 2, _parent => 2, _timestamp => 1380019061000, _ttl => '10m', _version => 1, _version_type => 'external', source => { foo => 'bar' }, } ), 'Add create actions'; cmp_deeply $b->_buffer, [ q({"create":{"_id":1,"_index":"foo","parent":1,"pipeline":"foo","routing":1,"timestamp":1380019061000,"ttl":"10m","version":1,"version_type":"external"}}), q({"foo":"bar"}), q({"create":{"_id":2,"_index":"foo","parent":2,"routing":2,"timestamp":1380019061000,"ttl":"10m","version":1,"version_type":"external"}}), q({"foo":"bar"}) ], "Create actions in buffer"; is $b->_buffer_size, 315, "Create actions buffer size"; is $b->_buffer_count, 2, "Create actions buffer count"; $b->clear_buffer; ## DELETE ACTIONS ## ok $b->add_action( delete => { index => 'foo', id => 1, routing => 1, parent => 1, version => 1, version_type => 'external', }, delete => { _index => 'foo', _id => 2, _routing => 2, _parent => 2, _version => 1, version_type => 'external', } ), 'Add delete actions'; cmp_deeply $b->_buffer, [ q({"delete":{"_id":1,"_index":"foo","parent":1,"routing":1,"version":1,"version_type":"external"}}), q({"delete":{"_id":2,"_index":"foo","parent":2,"routing":2,"version":1,"version_type":"external"}}), ], "Delete actions in buffer"; is $b->_buffer_size, 194, "Delete actions buffer size"; is $b->_buffer_count, 2, "Delete actions buffer count"; $b->clear_buffer; ## UPDATE ACTIONS ## ok $b->add_action( update => { index => 'foo', id => 1, routing => 1, _source => 'true', _source_includes => 'foo', _source_excludes => 'bar', doc => { foo => 'bar' }, doc_as_upsert => 1, fields => ["*"], script => 'ctx._source+=1', scripted_upsert => 'true', retry_on_conflict => 3, }, update => { _index => 'foo', _id => 1, _routing => 1, _source => 'true', _source_includes => 'foo', _source_excludes => 'bar', doc => { foo => 'bar' }, doc_as_upsert => 1, fields => ["*"], script => 'ctx._source+=1', scripted_upsert => 'true', retry_on_conflict => 3, } ), 'Add update actions'; cmp_deeply $b->_buffer, [ q({"update":{"_id":1,"_index":"foo","routing":1}}), q({"_source":"true","_source_excludes":"bar","_source_includes":"foo","doc":{"foo":"bar"},"doc_as_upsert":1,"fields":["*"],"retry_on_conflict":3,"script":"ctx._source+=1","scripted_upsert":"true"}), q({"update":{"_id":1,"_index":"foo","routing":1}}), q({"_source":"true","_source_excludes":"bar","_source_includes":"foo","doc":{"foo":"bar"},"doc_as_upsert":1,"fields":["*"],"retry_on_conflict":3,"script":"ctx._source+=1","scripted_upsert":"true"}) ], "Update actions in buffer"; is $b->_buffer_size, 486, "Update actions buffer size"; is $b->_buffer_count, 2, "Update actions buffer count"; $b->clear_buffer; ## ERRORS ## $error = "Unrecognised action"; $name = 'Bad action'; $b->add_action( 'foo' => {} ); $error = "Missing <params>"; $name = 'Missing params'; $b->add_action( 'index', 'bar' ); $error = "Missing .*<index>"; $name = 'Missing index'; $b->add_action( index => { type => 't' } ); $error = "Missing .*<source>"; $name = 'Missing source'; $b->add_action( index => { index => 'i', type => 't' } ); $error = "Unknown params"; $name = 'Unknown params'; $b->add_action( index => { index => 'i', type => 't', source => {}, foo => 1 } ); done_testing;