lib/Search/Elasticsearch/Client/7_0/Async/Bulk.pm [22:131]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    'Search::Elasticsearch::Role::Is_Async';

use Search::Elasticsearch::Util qw(parse_params throw);
use Scalar::Util qw(weaken blessed);
use Promises qw(deferred);
use Try::Tiny;
use namespace::clean;

has 'on_fatal' => ( is => 'lazy' );

#===================================
sub _build_on_fatal {
#===================================
    my $self = shift;
    return sub {
        warn("Fatal bulk error: @_");
    };
}

#===================================
sub add_action {
#===================================
    my $self      = shift;
    my $buffer    = $self->_buffer;
    my $max_size  = $self->max_size;
    my $max_count = $self->max_count;
    my $max_time  = $self->max_time;

    my $deferred = deferred;
    my @actions  = @_;

    my $weak_add;
    my $add = sub {
        while (@actions) {
            my @json = try {
                $self->_encode_action( splice( @actions, 0, 2 ) );
            }
            catch {
                $self->on_fatal->($_);
                $deferred->reject($_);
                ();
            };
            return unless @json;

            push @$buffer, @json;

            my $size = $self->_buffer_size;
            $size += length($_) + 1 for @json;
            $self->_buffer_size($size);

            my $count = $self->_buffer_count( $self->_buffer_count + 1 );

            next
                unless ( $max_size and $size >= $max_size )
                || ( $max_count and $count >= $max_count )
                || ( $max_time  and time >= $self->_last_flush + $max_time );

            return $self->flush->done( $weak_add,
                sub { $deferred->reject(@_) } );
        }
        return $deferred->resolve;

    };

    weaken( $weak_add = $add );
    $add->();
    return $deferred->promise;

}

#===================================
sub flush {
#===================================
    my $self = shift;

    my $size  = $self->_buffer_size;
    my $count = $self->_buffer_count;

    $self->_last_flush(time);

    unless ($size) {
        return deferred->resolve( { items => [] } )->promise;
    }

    my @items = ( @{ $self->_buffer } );
    $self->clear_buffer;

    if ( $self->verbose ) {
        local $| = 1;
        print ".";
    }

    my $promise
        = $self->es->bulk( %{ $self->_bulk_args }, body => \@items )->catch(
        sub {
            my $error = shift;
            if ( $error->is( 'Cxn', 'NoNodes' ) ) {
                push @{ $self->_buffer }, @items;
                $self->_buffer_size( $self->_buffer_size + $size );
                $self->_buffer_count( $self->_buffer_count + $count );
            }
            die $error;
        }
        );
    $promise->then( sub { $self->_report( \@items, @_ ) },
        sub { $self->on_fatal(@_) } );
    return $promise;
}

1;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



lib/Search/Elasticsearch/Client/8_0/Async/Bulk.pm [22:131]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    'Search::Elasticsearch::Role::Is_Async';

use Search::Elasticsearch::Util qw(parse_params throw);
use Scalar::Util qw(weaken blessed);
use Promises qw(deferred);
use Try::Tiny;
use namespace::clean;

has 'on_fatal' => ( is => 'lazy' );

#===================================
sub _build_on_fatal {
#===================================
    my $self = shift;
    return sub {
        warn("Fatal bulk error: @_");
    };
}

#===================================
sub add_action {
#===================================
    my $self      = shift;
    my $buffer    = $self->_buffer;
    my $max_size  = $self->max_size;
    my $max_count = $self->max_count;
    my $max_time  = $self->max_time;

    my $deferred = deferred;
    my @actions  = @_;

    my $weak_add;
    my $add = sub {
        while (@actions) {
            my @json = try {
                $self->_encode_action( splice( @actions, 0, 2 ) );
            }
            catch {
                $self->on_fatal->($_);
                $deferred->reject($_);
                ();
            };
            return unless @json;

            push @$buffer, @json;

            my $size = $self->_buffer_size;
            $size += length($_) + 1 for @json;
            $self->_buffer_size($size);

            my $count = $self->_buffer_count( $self->_buffer_count + 1 );

            next
                unless ( $max_size and $size >= $max_size )
                || ( $max_count and $count >= $max_count )
                || ( $max_time  and time >= $self->_last_flush + $max_time );

            return $self->flush->done( $weak_add,
                sub { $deferred->reject(@_) } );
        }
        return $deferred->resolve;

    };

    weaken( $weak_add = $add );
    $add->();
    return $deferred->promise;

}

#===================================
sub flush {
#===================================
    my $self = shift;

    my $size  = $self->_buffer_size;
    my $count = $self->_buffer_count;

    $self->_last_flush(time);

    unless ($size) {
        return deferred->resolve( { items => [] } )->promise;
    }

    my @items = ( @{ $self->_buffer } );
    $self->clear_buffer;

    if ( $self->verbose ) {
        local $| = 1;
        print ".";
    }

    my $promise
        = $self->es->bulk( %{ $self->_bulk_args }, body => \@items )->catch(
        sub {
            my $error = shift;
            if ( $error->is( 'Cxn', 'NoNodes' ) ) {
                push @{ $self->_buffer }, @items;
                $self->_buffer_size( $self->_buffer_size + $size );
                $self->_buffer_count( $self->_buffer_count + $count );
            }
            die $error;
        }
        );
    $promise->then( sub { $self->_report( \@items, @_ ) },
        sub { $self->on_fatal(@_) } );
    return $promise;
}

1;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



