in lib/Search/Elasticsearch/Client/7_0/Async/Bulk.pm [42:90]
sub add_action {
#===================================
my $self = shift;
my $buffer = $self->_buffer;
my $max_size = $self->max_size;
my $max_count = $self->max_count;
my $max_time = $self->max_time;
my $deferred = deferred;
my @actions = @_;
my $weak_add;
my $add = sub {
while (@actions) {
my @json = try {
$self->_encode_action( splice( @actions, 0, 2 ) );
}
catch {
$self->on_fatal->($_);
$deferred->reject($_);
();
};
return unless @json;
push @$buffer, @json;
my $size = $self->_buffer_size;
$size += length($_) + 1 for @json;
$self->_buffer_size($size);
my $count = $self->_buffer_count( $self->_buffer_count + 1 );
next
unless ( $max_size and $size >= $max_size )
|| ( $max_count and $count >= $max_count )
|| ( $max_time and time >= $self->_last_flush + $max_time );
return $self->flush->done( $weak_add,
sub { $deferred->reject(@_) } );
}
return $deferred->resolve;
};
weaken( $weak_add = $add );
$add->();
return $deferred->promise;
}