t/Client_8_0/33_bulk_errors.t (118 lines of code) (raw):
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
use Test::More;
use Test::Deep;
use Test::Exception;
use strict;
use warnings;
use lib 't/lib';
use Log::Any::Adapter;
$ENV{ES_VERSION} = '8_0';
my $es = do "es_sync.pl" or die( $@ || $! );
my $TRUE = $es->transport->serializer->decode('{"true":true}')->{true};
$es->indices->delete( index => '_all' );
my @Std = (
{ id => 1, source => { count => 1 } },
{ id => 1, source => { count => 'foo' } },
);
my ( $b, $success_count, $error_count, $custom_count, $conflict_count );
## Default error handling
$b = bulk( { index => 'test' }, @Std );
test_flush( "Default", 0, 1, 0, 0 );
## Custom error handling
$b = bulk(
{ index => 'test',
on_error => sub { $custom_count++ }
},
@Std
);
test_flush( "Custom error", 0, 0, 1, 0 );
# Disable both
$b = bulk(
{ index => 'test',
on_conflict => undef,
on_error => undef
},
@Std
);
test_flush( "Both undef", 0, 0, 0, 0 );
# Success
$b = bulk(
{ index => 'test',
on_success => sub { $success_count++ },
},
@Std
);
test_flush( "Success", 1, 1, 0, 0 );
# cbs have correct params
$b = bulk(
{ index => 'test',
on_success => test_params(
'on_success',
{ _index => 'test',
_id => 1,
_version => 1,
status => 201,
created => $TRUE,
result => 'created',
_shards => { successful => 1, total => 2, failed => 0 },
_primary_term => 1,
_seq_no => 0
},
0
),
on_error => test_params(
'on_error',
{ _index => 'test',
_id => 1,
error => any(
re('MapperParsingException'),
superhashof( { type => 'mapper_parsing_exception' } )
),
status => 400,
},
1
),
on_conflict => test_params(
'on_conflict',
{ _index => 'test',
_id => 1,
error => any(
re('version conflict'),
superhashof(
{ type => 'version_conflict_engine_exception' }
)
),
status => 409,
},
2,
1
),
},
@Std
);
$b->flush;
done_testing;
$es->indices->delete( index => 'test' );
#===================================
sub bulk {
#===================================
my $params = shift;
my $b = $es->bulk_helper($params);
$es->indices->delete( index => 'test', ignore => 404 );
$es->indices->create( index => 'test' );
$es->cluster->health( wait_for_status => 'yellow' );
$b->index(@_);
return $b;
}
#===================================
sub test_flush {
#===================================
my ( $title, $success, $default, $custom, $conflict ) = @_;
$success_count = $custom_count = $error_count = $conflict_count = 0;
{
local $SIG{__WARN__} = sub { $error_count++ };
$b->flush;
}
is $success_count, $success, "$title - success";
is $error_count, $default, "$title - default";
is $custom_count, $custom, "$title - custom";
is $conflict_count, $conflict, "$title - conflict";
}
#===================================
sub test_params {
#===================================
my ( $type, $result, $j, $version ) = @_;
return sub {
is $_[0], 'index', "$type - action";
is $_[2], $j, "$type - array index";
is $_[3], $version, "$type - version";
};
}