in lib/Search/Elasticsearch/CxnPool/Async/Sniff.pm [67:135]
sub sniff {
#===================================
my $self = shift;
my $promise;
if ( $promise = $self->_current_sniff ) {
return $promise;
}
my $deferred = deferred;
my $cxns = $self->cxns;
my $total = @$cxns;
my $done = 0;
my $current = 0;
my $done_seeds = 0;
$promise = $self->_current_sniff( $deferred->promise );
my ( @all, @skipped );
while ( 0 < $total-- ) {
my $cxn = $cxns->[ $self->next_cxn_num ];
if ( $cxn->is_dead ) {
push @skipped, $cxn;
}
else {
push @all, $cxn;
}
}
push @all, @skipped;
unless (@all) {
@all = $self->_seeds_as_cxns;
$done_seeds++;
}
my ( $weak_check_sniff, $cxn );
my $check_sniff = sub {
return if $done;
my ( $cxn, $nodes ) = @_;
if ( $nodes && $self->parse_sniff($nodes) ) {
$done++;
$self->_clear_sniff;
return $deferred->resolve();
}
unless ( @all || $done_seeds++ ) {
$self->logger->info("No live nodes available. Trying seed nodes.");
@all = $self->_seeds_as_cxns;
}
if ( my $cxn = shift @all ) {
return $cxn->sniff->done($weak_check_sniff);
}
if ( --$current == 0 ) {
$self->_clear_sniff;
$deferred->resolve();
}
};
weaken( $weak_check_sniff = $check_sniff );
for ( 1 .. $self->concurrent_sniff ) {
my $cxn = shift(@all) || last;
$current++;
$cxn->sniff->done($check_sniff);
}
return $promise;
}