in t/Client_8_0_Async/32_bulk_flush.t [84:147]
sub test_flush {
#===================================
my $title = shift;
my $params = shift;
my $b = $es->bulk_helper(
%$params,
index => 'test'
);
my @seq = @_;
my $cv = AE::cv;
my $i = 10;
my $loop;
my $index_doc = sub {
$b->index( { id => $i, source => {} } );
};
my $check_buffer = sub {
is $b->_buffer_count, shift @seq, "$title - " . ( $i - 9 );
$i++;
};
my $d = deferred;
my $w;
$loop = sub {
if ( $i == 20 ) {
return $b->flush->then( sub { $d->resolve } );
}
# sleep on 12 or 18 if max_time specified
if ( $params->{max_time} && ( $i == 12 || $i == 18 ) ) {
$b->_last_flush( time - $params->{max_time} - 1 );
}
$index_doc->()->then($check_buffer)->then($loop);
};
$es->indices->delete( index => 'test', ignore => 404 )
->then( sub { $es->indices->create( index => 'test' ) } )
->then( sub { $es->cluster->health( wait_for_status => 'yellow' ) } )
->then($loop);
$d->promise->then(
sub {
is $b->_buffer_count, 0, "$title - final flush";
$es->indices->refresh;
}
)->then(
sub {
$es->count;
}
)->then(
sub {
is shift()->{count}, 10, "$title - all docs indexed";
}
)->then(
sub {
$cv->send;
}
)->catch( sub { $cv->croak(@_) } );
$cv->recv;
}