in lib/Search/Elasticsearch/Client/7_0/Async/Bulk.pm [93:129]
sub flush {
#===================================
    # Send everything currently held in the buffer as one bulk request.
    #
    # Returns a promise. When the buffer size is false the promise is
    # created already resolved with { items => [] }; otherwise it is the
    # promise produced by the bulk call with the error handler below
    # attached. That handler always re-throws the error it was given.
    my ($self) = @_;

    # Snapshot the counters before the buffer is cleared: they are added
    # back to the live counters if the batch has to be re-queued.
    my $batch_size  = $self->_buffer_size;
    my $batch_count = $self->_buffer_count;

    # The flush time is stamped even when there is nothing to send.
    $self->_last_flush(time);

    return deferred->resolve( { items => [] } )->promise
        unless $batch_size;

    # Work on a copy so the buffer can be emptied before the request runs.
    my @batch = @{ $self->_buffer };
    $self->clear_buffer;

    # Progress marker; autoflush is enabled only for this one print.
    if ( $self->verbose ) {
        local $| = 1;
        print ".";
    }

    # On a 'Cxn' / 'NoNodes' error the batch is appended back onto the
    # buffer and the saved counters are added to the current ones; every
    # other error leaves the buffer alone. The error is re-thrown either way.
    # NOTE(review): presumably is() matches when the error is of either
    # listed type - confirm against the error class.
    # NOTE(review): re-queued items land after anything buffered in the
    # meantime - confirm that ordering is acceptable.
    my $requeue_and_rethrow = sub {
        my ($error) = @_;
        if ( $error->is( 'Cxn', 'NoNodes' ) ) {
            push @{ $self->_buffer }, @batch;
            $self->_buffer_size( $self->_buffer_size + $batch_size );
            $self->_buffer_count( $self->_buffer_count + $batch_count );
        }
        die $error;
    };

    my $promise
        = $self->es->bulk( %{ $self->_bulk_args }, body => \@batch )
        ->catch($requeue_and_rethrow);

    # Reporting hangs off a side branch: the promise returned by then()
    # is discarded, and the caller receives $promise itself.
    my $report_results = sub { $self->_report( \@batch, @_ ) };
    my $report_fatal   = sub { $self->on_fatal(@_) };
    $promise->then( $report_results, $report_fatal );

    return $promise;
}