t/Client_7_0_Async/33_bulk_errors.t (153 lines of code) (raw):
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
use Test::More;
use Test::Deep;
use Test::Exception;
use AE;
use strict;
use warnings;
use lib 't/lib';
use Log::Any::Adapter;
# Select the 7.0 client before the shared bootstrap script builds $es.
$ENV{ES_VERSION} = '7_0';

my $es = do "es_async.pl";
die( $@ || $! ) unless $es;

# Boolean true exactly as the client's own serializer decodes it.
my $TRUE = $es->transport->serializer->decode('{"true":true}')->{true};
my $cv   = AE::cv;

# Begin with no indices at all.
wait_for( $es->indices->delete( index => '_all' ) );

# Three index actions that all target document id 1.
my @Std = (
    { id => 1, source => { count => 1 } },
    { id => 1, source => { count => 'foo' } },
    { id => 1, source => {} },
);

# Shared state: bulk() and test_flush() below read and write these.
my ( $b, $error, $success_count, $error_count, $custom_count,
    $conflict_count );

# One row per scenario:
#   [ title, handlers passed to the bulk helper,
#     expected success, default, custom, conflict counts ]
# The "default" column is $error_count, which is bumped by the on_fatal
# handler installed in bulk() and by any warning caught in test_flush().
my @scenarios = (

    # Default error handling
    [ 'Default', {}, 0, 1, 0, 0 ],

    # Custom error handling
    [   'Custom error',
        { on_error => sub { $custom_count++ } },
        0, 0, 1, 0
    ],

    # Conflict errors
    [   'Conflict error',
        { on_conflict => sub { $conflict_count++ } },
        0, 1, 0, 0
    ],

    # Both error handlers
    [   'Conflict and custom',
        {   on_conflict => sub { $conflict_count++ },
            on_error    => sub { $custom_count++ },
        },
        0, 0, 1, 0
    ],

    # Conflict handler with the error handler disabled
    [   'Conflict, error undef',
        {   on_conflict => sub { $conflict_count++ },
            on_error    => undef,
        },
        0, 0, 0, 0
    ],

    # Both handlers disabled
    [   'Both undef',
        {   on_conflict => undef,
            on_error    => undef,
        },
        0, 0, 0, 0
    ],

    # Success handler
    [   'Success',
        { on_success => sub { $success_count++ } },
        2, 1, 0, 0
    ],
);

for my $scenario (@scenarios) {
    my ( $title, $handlers, @expected_counts ) = @$scenario;
    $b = bulk( { index => 'test', %$handlers }, @Std );
    test_flush( $title, @expected_counts );
}

# Callbacks receive the correct parameters.
# NOTE(review): on_success pins array index 0 for every invocation, while
# the "Success" scenario above expects two successes from the same @Std -
# confirm these expectations against a live cluster.
my %doc_ident = (
    _index => 'test',
    _type  => '_doc',
    _id    => 1,
);

my %created = (
    %doc_ident,
    _version      => 1,
    created       => $TRUE,
    _shards       => { successful => 1, total => 2, failed => 0 },
    _primary_term => 1,
    _seq_no       => 0,
);

# The error may be either a plain string or a structured hash.
my %mapping_failure = (
    %doc_ident,
    error => any(
        re('MapperParsingException'),
        superhashof( { type => 'mapper_parsing_exception' } )
    ),
    status => 400,
);

my %version_conflict = (
    %doc_ident,
    error => any(
        re('version conflict'),
        superhashof( { type => 'version_conflict_engine_exception' } )
    ),
    status => 409,
);

$b = bulk(
    {   index       => 'test',
        on_success  => test_params( 'on_success', \%created, 0 ),
        on_error    => test_params( 'on_error', \%mapping_failure, 1 ),
        on_conflict =>
            test_params( 'on_conflict', \%version_conflict, 2, 1 ),
    },
    @Std
);
wait_for( $b->flush );

done_testing();

# Leave no indices behind.
wait_for( $es->indices->delete( index => '_all' ) );
#===================================
sub bulk {
#===================================
    # Creates a bulk helper, recreates the 'test' index and hands @docs to
    # the helper's index() method.
    #
    #   $params - hashref of bulk_helper options; listed after the default
    #             on_fatal handler, so a caller-supplied on_fatal wins
    #   @docs   - documents passed on to $helper->index
    #
    # Returns the bulk helper. Clears the file-level $error as a side effect.
    my ( $params, @docs ) = @_;

    # Default fatal handler: remember the error and count it.
    my $record_fatal = sub { $error = shift(); $error_count++ };

    my $helper = $es->bulk_helper( on_fatal => $record_fatal, %$params );
    $error = '';

    # Drop the index (a 404 is ignored), create it again, wait for yellow
    # health, then pass the documents to the helper - each step starts
    # only once the previous one has resolved.
    wait_for(
        $es->indices->delete( index => 'test', ignore => 404 )
            ->then( sub { $es->indices->create( index => 'test' ) } )
            ->then(
                sub { $es->cluster->health( wait_for_status => 'yellow' ) }
            )
            ->then( sub { $helper->index(@docs) } )
    );

    return $helper;
}
#===================================
sub test_flush {
#===================================
    # Flushes the current bulk helper (file-level $b) and checks how often
    # each kind of handler fired.
    #
    #   $title    - prefix for the four assertion names
    #   $success  - expected $success_count
    #   $default  - expected $error_count (on_fatal calls plus warnings
    #               caught during the flush)
    #   $custom   - expected $custom_count
    #   $conflict - expected $conflict_count
    my ( $title, $success, $default, $custom, $conflict ) = @_;

    # Every scenario starts from zeroed counters.
    ( $success_count, $custom_count, $error_count, $conflict_count )
        = (0) x 4;

    # Count warnings instead of printing them, for the flush only.
    my $count_warning = sub { $error_count++ };
    {
        local $SIG{__WARN__} = $count_warning;
        wait_for( $b->flush );
    }

    is( $success_count,  $success,  "$title - success" );
    is( $error_count,    $default,  "$title - default" );
    is( $custom_count,   $custom,   "$title - custom" );
    is( $conflict_count, $conflict, "$title - conflict" );
}
#===================================
sub test_params {
#===================================
    # Builds a callback that asserts on the arguments it is invoked with.
    # Used for the bulk helper's on_success / on_error / on_conflict hooks.
    #
    #   $type    - label prefixed to every assertion name
    #   $result  - hashref of expected fields; the second callback argument
    #              is matched against subhashof($result)
    #   $j       - expected value of the third callback argument
    #   $version - expected value of the fourth callback argument
    #              (undef when the caller omits it)
    #
    # Returns a coderef running four assertions per invocation.
    my ( $type, $result, $j, $version ) = @_;

    return sub {
        my ( $action, $got, $index, $got_version ) = @_;

        is( $action, 'index', "$type - action" );

        # Test::Deep expects the data under test first and the expectation
        # second; special comparators such as subhashof() are only legal
        # on the expectation side and make cmp_deeply croak otherwise.
        cmp_deeply( $got, subhashof($result), "$type - result" );

        is( $index,       $j,       "$type - array index" );
        is( $got_version, $version, "$type - version" );
    };
}