sub sniff()

in lib/Search/Elasticsearch/CxnPool/Async/Sniff.pm [67:135]


sub sniff {
#===================================
    my $self = shift;

    my $promise;
    if ( $promise = $self->_current_sniff ) {
        return $promise;
    }

    my $deferred   = deferred;
    my $cxns       = $self->cxns;
    my $total      = @$cxns;
    my $done       = 0;
    my $current    = 0;
    my $done_seeds = 0;
    $promise = $self->_current_sniff( $deferred->promise );

    my ( @all, @skipped );

    while ( 0 < $total-- ) {
        my $cxn = $cxns->[ $self->next_cxn_num ];
        if ( $cxn->is_dead ) {
            push @skipped, $cxn;
        }
        else {
            push @all, $cxn;
        }
    }

    push @all, @skipped;
    unless (@all) {
        @all = $self->_seeds_as_cxns;
        $done_seeds++;
    }

    my ( $weak_check_sniff, $cxn );
    my $check_sniff = sub {

        return if $done;
        my ( $cxn, $nodes ) = @_;
        if ( $nodes && $self->parse_sniff($nodes) ) {
            $done++;
            $self->_clear_sniff;
            return $deferred->resolve();
        }

        unless ( @all || $done_seeds++ ) {
            $self->logger->info("No live nodes available. Trying seed nodes.");
            @all = $self->_seeds_as_cxns;
        }

        if ( my $cxn = shift @all ) {
            return $cxn->sniff->done($weak_check_sniff);
        }
        if ( --$current == 0 ) {
            $self->_clear_sniff;
            $deferred->resolve();
        }
    };
    weaken( $weak_check_sniff = $check_sniff );

    for ( 1 .. $self->concurrent_sniff ) {
        my $cxn = shift(@all) || last;
        $current++;
        $cxn->sniff->done($check_sniff);
    }

    return $promise;
}