lib/Search/Elasticsearch/Client/7_0/Async/Scroll.pm [37:220]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
sub BUILDARGS {
#===================================
    # Splits the constructor arguments into the attributes of the scroll
    # helper itself and the parameters that are forwarded to the search
    # request.
    #
    # Accepts whatever parse_params() accepts and returns a hash ref of
    # attributes. Throws a 'Param' error when neither on_results nor
    # on_result was supplied.
    my ( $class, $search ) = parse_params(@_);

    # Helper-specific keys are removed from the search parameters;
    # keys whose value is undefined are dropped altogether.
    my %attrs;
    for my $name (qw(es on_start on_result on_results on_error)) {
        my $value = delete $search->{$name};
        $attrs{$name} = $value if defined $value;
    }

    # The scroll timeout defaults to one minute and is sent with the search.
    $search->{scroll} ||= '1m';
    $attrs{scroll}        = $search->{scroll};
    $attrs{search_params} = $search;

    # on_result is stored as on_results with one_at_a_time switched on.
    if ( $attrs{on_result} ) {
        $attrs{on_results}    = delete $attrs{on_result};
        $attrs{one_at_a_time} = 1;
        return \%attrs;
    }

    throw( 'Param', 'Missing required param: on_results or on_result' )
        unless $attrs{on_results};

    return \%attrs;
}

#===================================
sub _build_on_error {
#===================================
    # Builds the default error handler: it emits the error as a warning
    # prefixed with "Scroll error: " and then rethrows the same error.
    my $default_handler = sub {
        my @error = @_;
        warn "Scroll error: @error";
        die @error;
    };
    return $default_handler;
}

#===================================
sub start {
#===================================
    # Runs the initial search, delivers the first page of results, then
    # keeps fetching pages via _fetch_loop. Returns the promise at the end
    # of that chain.
    my ($self) = @_;

    # Store a reference to ourselves in _guard; it is dropped again in the
    # final step below. NOTE(review): presumably this keeps the object
    # alive while the asynchronous chain is pending - confirm.
    $self->_set__guard($self);

    my $handle_first_page = sub { $self->_first_results(@_) };
    my $fetch_remaining   = sub { $self->_fetch_loop };

    # Pass any failure to on_error. If on_error returns instead of dying,
    # the error values become the return value of this handler.
    my $report_error = sub {
        $self->on_error->(@_);
        @_;
    };

    # Runs in the finally step: always finish the scroll and release the guard.
    my $cleanup = sub {
        $self->finish;
        $self->_clear__guard;
    };

    my $search = $self->es->search( $self->search_params );
    return $search->then($handle_first_page)->then($fetch_remaining)
        ->catch($report_error)->finally($cleanup);
}

#===================================
sub _first_results {
#===================================
    # Processes the response of the initial search request.
    #
    # Records the response metadata (total, max_score, aggregations,
    # facets, suggest, took), stores the scroll ID when there is something
    # to scroll through, invokes on_start (if set) with the scroll object,
    # and hands the first page of hits to _push_results.
    #
    # Returns whatever _push_results returns, or nothing when the first
    # page contains no hits.
    my ( $self, $results ) = @_;

    # hits.total may be a plain value or a structure carrying the count
    # under its "value" key.
    my $total = $results->{hits}{total};
    if ( ref $total ) {
        $total = $total->{value};
    }
    $self->_set_total($total);
    $self->_set_max_score( $results->{hits}{max_score} );
    $self->_set_aggregations( $results->{aggregations} );
    $self->_set_facets( $results->{facets} );
    $self->_set_suggest( $results->{suggest} );
    $self->_set_took( $results->{took} );
    $self->_set_total_took( $results->{took} );

    # Tolerate a response without a hits array instead of dying on the
    # dereference below.
    my $hits = $results->{hits}{hits} || [];

    # hits.total is absent when total-hit tracking is disabled for the
    # search, so a false total alone does not mean "no results": a
    # non-empty first page also counts. Without this check the scroll
    # would be finished here and the hits already returned would never
    # be delivered.
    if ( $total || @$hits ) {
        $self->_set__scroll_id( $results->{_scroll_id} );
    }
    else {
        $self->finish;
    }

    $self->on_start && $self->on_start->($self);

    return unless @$hits;
    return $self->_push_results($hits);
}

#===================================
sub _next_results {
#===================================
    # Processes the response of a follow-up scroll request: stores the
    # scroll ID from the response, adds its "took" value to the running
    # total, and either delivers the hits or finishes the scroll when the
    # page is empty. Returns the result of _push_results or of finish.
    my $self     = shift;
    my $response = shift;

    $self->_set__scroll_id( $response->{_scroll_id} );

    my $elapsed = $self->total_took + $response->{took};
    $self->_set_total_took($elapsed);

    my $batch = $response->{hits}{hits};
    if (@$batch) {
        return $self->_push_results($batch);
    }

    # An empty page ends the scroll.
    return $self->finish;
}

#===================================
sub _fetch_loop {
#===================================
    # Keeps issuing scroll requests until the scroll is marked as finished.
    # Returns a promise which is resolved once is_finished is true, and
    # rejected via the second callback passed to done() below.
    my $self = shift;
    my $d    = deferred;

    # $loop has to re-run itself after every page. It refers to itself
    # through $weak_loop, which is weakened below so that the closure does
    # not hold a strong reference to itself.
    my $weak_loop;
    my $loop = sub {
        if ( $self->is_finished ) {
            return $d->resolve;
        }
        # Request the next page and pass it to _next_results; afterwards
        # done() receives the loop itself as first callback and a handler
        # that rejects the outer promise as second callback.
        $self->scroll_request->then( sub { $self->_next_results(@_) } )
            ->done( $weak_loop, sub { $d->reject(@_) } );
    };
    weaken( $weak_loop = $loop );

    # Kick off the first iteration.
    $loop->();
    return $d->promise;
}

#===================================
sub _push_results {
#===================================
    # Feeds a page of hits to the on_results callback, using the iterator
    # built by _results_iterator from the arguments. Returns a promise
    # which is resolved once the iterator is exhausted or the scroll has
    # been finished.
    my $self       = shift;
    my $it         = $self->_results_iterator(@_);
    my $on_results = $self->on_results;

    my $deferred = deferred;

    # $process resumes itself through $weak_process, which is weakened
    # below so that the closure does not hold a strong reference to itself.
    my $weak_process;
    my $process = sub {
        while ( !$self->is_finished ) {
            # Stop as soon as the iterator yields nothing.
            my @results  = $it->() or last;
            my @response = $on_results->(@results);
            # When thenable() returns false for the callback's return
            # value, carry on with the next batch straight away.
            # NOTE(review): presumably thenable() returns a promise-like
            # object when the callback returned one - confirm.
            my $promise  = thenable(@response) or next;
            # Otherwise hand $process to done() as first callback and a
            # handler that rejects the outer promise as second callback,
            # and leave the loop.
            return $promise->done( $weak_process,
                sub { $deferred->reject(@_) } );
        }
        $deferred->resolve;
    };
    weaken( $weak_process = $process );
    $process->();
    return $deferred->promise;
}

#===================================
sub _results_iterator {
#===================================
    # Returns a closure which hands out the given hits (an array ref):
    # one hit per call when one_at_a_time is set, otherwise all remaining
    # hits in a single call. Once exhausted, the closure returns an empty
    # list. The caller's array is copied, not modified.
    my ( $self, $hits ) = @_;
    my @pending = @$hits;

    if ( $self->one_at_a_time ) {
        return sub { splice( @pending, 0, 1 ) };
    }

    return sub { splice(@pending) };
}

#===================================
sub finish {
#===================================
    # Marks the scroll as finished and, when a scroll ID is still held by
    # this process, asks the server to clear it. Returns a promise.
    #
    # Once the clear_scroll request has completed - successfully or not -
    # the on_start, on_results and on_error callbacks are released.
    my $self = shift;
    $self->_set_is_finished(1);

    my $scroll_id = $self->_scroll_id;
    $self->_clear_scroll_id;

    # Nothing to clear, or the object is being used from a different
    # process than the one recorded in _pid: return a resolved promise.
    if ( !$scroll_id || $self->_pid != $$ ) {
        my $d = deferred;
        $d->resolve();
        return $d->promise;
    }

    my %args = ( body => { scroll_id => $scroll_id } );

    my $release_callbacks = sub {
        $self->_clear_on_start;
        $self->_clear_on_results;
        $self->_clear_on_error;
    };

    # A failed clear_scroll is deliberately ignored, but the callbacks
    # must be released in that case as well; otherwise the closures stay
    # attached to the object whenever the request fails.
    return $self->es->clear_scroll(%args)->then(
        $release_callbacks,
        sub {
            $release_callbacks->();
            return;
        }
    );
}

1;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



lib/Search/Elasticsearch/Client/8_0/Async/Scroll.pm [37:220]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
sub BUILDARGS {
#===================================
    # Splits the constructor arguments into the attributes of the scroll
    # helper itself and the parameters that are forwarded to the search
    # request.
    #
    # Accepts whatever parse_params() accepts and returns a hash ref of
    # attributes. Throws a 'Param' error when neither on_results nor
    # on_result was supplied.
    my ( $class, $search ) = parse_params(@_);

    # Helper-specific keys are removed from the search parameters;
    # keys whose value is undefined are dropped altogether.
    my %attrs;
    for my $name (qw(es on_start on_result on_results on_error)) {
        my $value = delete $search->{$name};
        $attrs{$name} = $value if defined $value;
    }

    # The scroll timeout defaults to one minute and is sent with the search.
    $search->{scroll} ||= '1m';
    $attrs{scroll}        = $search->{scroll};
    $attrs{search_params} = $search;

    # on_result is stored as on_results with one_at_a_time switched on.
    if ( $attrs{on_result} ) {
        $attrs{on_results}    = delete $attrs{on_result};
        $attrs{one_at_a_time} = 1;
        return \%attrs;
    }

    throw( 'Param', 'Missing required param: on_results or on_result' )
        unless $attrs{on_results};

    return \%attrs;
}

#===================================
sub _build_on_error {
#===================================
    # Builds the default error handler: it emits the error as a warning
    # prefixed with "Scroll error: " and then rethrows the same error.
    my $default_handler = sub {
        my @error = @_;
        warn "Scroll error: @error";
        die @error;
    };
    return $default_handler;
}

#===================================
sub start {
#===================================
    # Runs the initial search, delivers the first page of results, then
    # keeps fetching pages via _fetch_loop. Returns the promise at the end
    # of that chain.
    my ($self) = @_;

    # Store a reference to ourselves in _guard; it is dropped again in the
    # final step below. NOTE(review): presumably this keeps the object
    # alive while the asynchronous chain is pending - confirm.
    $self->_set__guard($self);

    my $handle_first_page = sub { $self->_first_results(@_) };
    my $fetch_remaining   = sub { $self->_fetch_loop };

    # Pass any failure to on_error. If on_error returns instead of dying,
    # the error values become the return value of this handler.
    my $report_error = sub {
        $self->on_error->(@_);
        @_;
    };

    # Runs in the finally step: always finish the scroll and release the guard.
    my $cleanup = sub {
        $self->finish;
        $self->_clear__guard;
    };

    my $search = $self->es->search( $self->search_params );
    return $search->then($handle_first_page)->then($fetch_remaining)
        ->catch($report_error)->finally($cleanup);
}

#===================================
sub _first_results {
#===================================
    # Processes the response of the initial search request.
    #
    # Records the response metadata (total, max_score, aggregations,
    # facets, suggest, took), stores the scroll ID when there is something
    # to scroll through, invokes on_start (if set) with the scroll object,
    # and hands the first page of hits to _push_results.
    #
    # Returns whatever _push_results returns, or nothing when the first
    # page contains no hits.
    my ( $self, $results ) = @_;

    # hits.total may be a plain value or a structure carrying the count
    # under its "value" key.
    my $total = $results->{hits}{total};
    if ( ref $total ) {
        $total = $total->{value};
    }
    $self->_set_total($total);
    $self->_set_max_score( $results->{hits}{max_score} );
    $self->_set_aggregations( $results->{aggregations} );
    $self->_set_facets( $results->{facets} );
    $self->_set_suggest( $results->{suggest} );
    $self->_set_took( $results->{took} );
    $self->_set_total_took( $results->{took} );

    # Tolerate a response without a hits array instead of dying on the
    # dereference below.
    my $hits = $results->{hits}{hits} || [];

    # hits.total is absent when total-hit tracking is disabled for the
    # search, so a false total alone does not mean "no results": a
    # non-empty first page also counts. Without this check the scroll
    # would be finished here and the hits already returned would never
    # be delivered.
    if ( $total || @$hits ) {
        $self->_set__scroll_id( $results->{_scroll_id} );
    }
    else {
        $self->finish;
    }

    $self->on_start && $self->on_start->($self);

    return unless @$hits;
    return $self->_push_results($hits);
}

#===================================
sub _next_results {
#===================================
    # Processes the response of a follow-up scroll request: stores the
    # scroll ID from the response, adds its "took" value to the running
    # total, and either delivers the hits or finishes the scroll when the
    # page is empty. Returns the result of _push_results or of finish.
    my $self     = shift;
    my $response = shift;

    $self->_set__scroll_id( $response->{_scroll_id} );

    my $elapsed = $self->total_took + $response->{took};
    $self->_set_total_took($elapsed);

    my $batch = $response->{hits}{hits};
    if (@$batch) {
        return $self->_push_results($batch);
    }

    # An empty page ends the scroll.
    return $self->finish;
}

#===================================
sub _fetch_loop {
#===================================
    # Keeps issuing scroll requests until the scroll is marked as finished.
    # Returns a promise which is resolved once is_finished is true, and
    # rejected via the second callback passed to done() below.
    my $self = shift;
    my $d    = deferred;

    # $loop has to re-run itself after every page. It refers to itself
    # through $weak_loop, which is weakened below so that the closure does
    # not hold a strong reference to itself.
    my $weak_loop;
    my $loop = sub {
        if ( $self->is_finished ) {
            return $d->resolve;
        }
        # Request the next page and pass it to _next_results; afterwards
        # done() receives the loop itself as first callback and a handler
        # that rejects the outer promise as second callback.
        $self->scroll_request->then( sub { $self->_next_results(@_) } )
            ->done( $weak_loop, sub { $d->reject(@_) } );
    };
    weaken( $weak_loop = $loop );

    # Kick off the first iteration.
    $loop->();
    return $d->promise;
}

#===================================
sub _push_results {
#===================================
    # Feeds a page of hits to the on_results callback, using the iterator
    # built by _results_iterator from the arguments. Returns a promise
    # which is resolved once the iterator is exhausted or the scroll has
    # been finished.
    my $self       = shift;
    my $it         = $self->_results_iterator(@_);
    my $on_results = $self->on_results;

    my $deferred = deferred;

    # $process resumes itself through $weak_process, which is weakened
    # below so that the closure does not hold a strong reference to itself.
    my $weak_process;
    my $process = sub {
        while ( !$self->is_finished ) {
            # Stop as soon as the iterator yields nothing.
            my @results  = $it->() or last;
            my @response = $on_results->(@results);
            # When thenable() returns false for the callback's return
            # value, carry on with the next batch straight away.
            # NOTE(review): presumably thenable() returns a promise-like
            # object when the callback returned one - confirm.
            my $promise  = thenable(@response) or next;
            # Otherwise hand $process to done() as first callback and a
            # handler that rejects the outer promise as second callback,
            # and leave the loop.
            return $promise->done( $weak_process,
                sub { $deferred->reject(@_) } );
        }
        $deferred->resolve;
    };
    weaken( $weak_process = $process );
    $process->();
    return $deferred->promise;
}

#===================================
sub _results_iterator {
#===================================
    # Returns a closure which hands out the given hits (an array ref):
    # one hit per call when one_at_a_time is set, otherwise all remaining
    # hits in a single call. Once exhausted, the closure returns an empty
    # list. The caller's array is copied, not modified.
    my ( $self, $hits ) = @_;
    my @pending = @$hits;

    if ( $self->one_at_a_time ) {
        return sub { splice( @pending, 0, 1 ) };
    }

    return sub { splice(@pending) };
}

#===================================
sub finish {
#===================================
    # Marks the scroll as finished and, when a scroll ID is still held by
    # this process, asks the server to clear it. Returns a promise.
    #
    # Once the clear_scroll request has completed - successfully or not -
    # the on_start, on_results and on_error callbacks are released.
    my $self = shift;
    $self->_set_is_finished(1);

    my $scroll_id = $self->_scroll_id;
    $self->_clear_scroll_id;

    # Nothing to clear, or the object is being used from a different
    # process than the one recorded in _pid: return a resolved promise.
    if ( !$scroll_id || $self->_pid != $$ ) {
        my $d = deferred;
        $d->resolve();
        return $d->promise;
    }

    my %args = ( body => { scroll_id => $scroll_id } );

    my $release_callbacks = sub {
        $self->_clear_on_start;
        $self->_clear_on_results;
        $self->_clear_on_error;
    };

    # A failed clear_scroll is deliberately ignored, but the callbacks
    # must be released in that case as well; otherwise the closures stay
    # attached to the object whenever the request fails.
    return $self->es->clear_scroll(%args)->then(
        $release_callbacks,
        sub {
            $release_callbacks->();
            return;
        }
    );
}

1;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



