sub test_flush()

in t/Client_7_0_Async/32_bulk_flush.t [84:147]


# Index ten documents (ids 10..19) one at a time through an async bulk
# helper and, after each index call, compare the helper's _buffer_count
# with the next expected value.  Finally flush, refresh, and check that
# the count request reports 10 documents.
#
#   $title  - label prefixed to every test description
#   $params - hashref of options passed through to $es->bulk_helper;
#             its max_time entry (if any) is also read here
#   @seq    - expected _buffer_count after each successive index call
#
# Blocks on an AnyEvent condvar until the promise chain completes.  Any
# rejected request is reported through $cv->croak, so a failing request
# aborts the test instead of leaving recv() waiting forever.
sub test_flush {
#===================================
    my $title  = shift;
    my $params = shift;
    my @seq    = @_;

    my $b = $es->bulk_helper(
        %$params,
        index => 'test'
    );

    my $cv = AE::cv;
    my $d  = deferred;

    # Document ids run from 10 to 19; ( $i - 9 ) is the 1-based step number.
    my $i = 10;
    my $loop;

    my $index_doc = sub {
        $b->index( { id => $i, source => {} } );
    };

    my $check_buffer = sub {
        is $b->_buffer_count, shift @seq, "$title - " . ( $i - 9 );
        $i++;
    };

    # One step of the asynchronous loop.  Always returns a promise so that
    # a rejection anywhere in the recursion propagates to the outer chain.
    $loop = sub {
        if ( $i == 20 ) {
            return $b->flush->then( sub { $d->resolve } );
        }

        # Instead of really sleeping on 12 or 18 when max_time is
        # specified, backdate the helper's last-flush timestamp to more
        # than max_time seconds ago.
        if ( $params->{max_time} && ( $i == 12 || $i == 18 ) ) {
            $b->_last_flush( time - $params->{max_time} - 1 );
        }
        return $index_doc->()->then($check_buffer)->then($loop);
    };

    # Recreate the test index, wait for it to be usable, then start the
    # loop.  Without the trailing catch a failed request would leave $d
    # unsettled and $cv->recv below would never return.
    $es->indices->delete( index => 'test', ignore => 404 )
        ->then( sub { $es->indices->create( index => 'test' ) } )
        ->then( sub { $es->cluster->health( wait_for_status => 'yellow' ) } )
        ->then($loop)
        ->catch( sub { $d->reject(@_) } );

    $d->promise->then(
        sub {
            is $b->_buffer_count, 0, "$title - final flush";
            $es->indices->refresh;
        }
        )->then(
        sub {
            $es->count;
        }
        )->then(
        sub {
            is shift()->{count}, 10, "$title - all docs indexed";
        }
        )->then(
        sub {
            $cv->send;
        }
        )->catch( sub { $cv->croak(@_) } );
    $cv->recv;

    # $loop captures itself; clear it so the closure can be freed.
    undef $loop;
    return;
}