sub flush()

in lib/Search/Elasticsearch/Client/8_0/Async/Bulk.pm [93:129]


sub flush {
#===================================
    my $self = shift;

    my $size  = $self->_buffer_size;
    my $count = $self->_buffer_count;

    $self->_last_flush(time);

    unless ($size) {
        return deferred->resolve( { items => [] } )->promise;
    }

    my @items = ( @{ $self->_buffer } );
    $self->clear_buffer;

    if ( $self->verbose ) {
        local $| = 1;
        print ".";
    }

    my $promise
        = $self->es->bulk( %{ $self->_bulk_args }, body => \@items )->catch(
        sub {
            my $error = shift;
            if ( $error->is( 'Cxn', 'NoNodes' ) ) {
                push @{ $self->_buffer }, @items;
                $self->_buffer_size( $self->_buffer_size + $size );
                $self->_buffer_count( $self->_buffer_count + $count );
            }
            die $error;
        }
        );
    $promise->then( sub { $self->_report( \@items, @_ ) },
        sub { $self->on_fatal(@_) } );
    return $promise;
}