t/Client_7_0_Async/32_bulk_flush.t (105 lines of code) (raw):

# Licensed to Elasticsearch B.V. under one or more contributor # license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright # ownership. Elasticsearch B.V. licenses this file to you under # the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. use Test::More; use Test::Deep; use strict; use warnings; use lib 't/lib'; use AE; use Promises qw(deferred); $ENV{ES_VERSION} = '7_0'; my $es = do "es_async.pl" or die( $@ || $! ); wait_for( $es->indices->delete( index => '_all' ) ); test_flush( "max count", # { max_count => 3 }, # 1, 2, 0, 1, 2, 0, 1, 2, 0, 1 ); test_flush( "max size", # { max_size => 95 }, # 1, 2, 3, 0, 1, 2, 3, 0, 1, 2 ); test_flush( "max size > max_count", { max_size => 95, max_count => 3 }, 1, 2, 0, 1, 2, 0, 1, 2, 0, 1 ); test_flush( "max size < max_count", { max_size => 95, max_count => 5 }, 1, 2, 3, 0, 1, 2, 3, 0, 1, 2 ); test_flush( "max size = 0, max_count", { max_size => 0, max_count => 5 }, 1, 2, 3, 4, 0, 1, 2, 3, 4, 0 ); test_flush( "max count = 0, max_size", { max_size => 95, max_count => 0 }, 1, 2, 3, 0, 1, 2, 3, 0, 1, 2 ); test_flush( "max count = 0, max_size = 0", { max_size => 0, max_count => 0 }, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ); test_flush( "max_count = 5, max_time = 5", { max_count => 5, max_time => 5 }, 1, 2, 0, 1, 2, 3, 4, 0, 0, 1 ); done_testing; wait_for( $es->indices->delete( index => 'test' ) ); #=================================== sub test_flush { #=================================== my $title = shift; my $params = shift; my $b = $es->bulk_helper( %$params, index => 'test' ); my @seq = @_; my $cv = AE::cv; my $i = 10; my $loop; my $index_doc = sub { $b->index( { id => $i, source => {} } ); }; my $check_buffer = sub { is $b->_buffer_count, shift @seq, "$title - " . ( $i - 9 ); $i++; }; my $d = deferred; my $w; $loop = sub { if ( $i == 20 ) { return $b->flush->then( sub { $d->resolve } ); } # sleep on 12 or 18 if max_time specified if ( $params->{max_time} && ( $i == 12 || $i == 18 ) ) { $b->_last_flush( time - $params->{max_time} - 1 ); } $index_doc->()->then($check_buffer)->then($loop); }; $es->indices->delete( index => 'test', ignore => 404 ) ->then( sub { $es->indices->create( index => 'test' ) } ) ->then( sub { $es->cluster->health( wait_for_status => 'yellow' ) } ) ->then($loop); $d->promise->then( sub { is $b->_buffer_count, 0, "$title - final flush"; $es->indices->refresh; } )->then( sub { $es->count; } )->then( sub { is shift()->{count}, 10, "$title - all docs indexed"; } )->then( sub { $cv->send; } )->catch( sub { $cv->croak(@_) } ); $cv->recv; }