# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

package Search::Elasticsearch::CxnPool::Async::Sniff;

use Moo;
with 'Search::Elasticsearch::Role::CxnPool::Sniff',
    'Search::Elasticsearch::Role::Is_Async';

use Scalar::Util qw(weaken);
use Promises qw(deferred);
use Search::Elasticsearch::Util qw(new_error);

use namespace::clean;
has 'concurrent_sniff' => ( is => 'rw', default => 4 );
has '_current_sniff'   => ( is => 'rw', clearer => '_clear_sniff' );

#===================================
sub next_cxn {
#===================================
    my ( $self, $no_sniff ) = @_;

    return $self->sniff->then( sub { $self->next_cxn('no_sniff') } )
        if $self->next_sniff <= time() && !$no_sniff;

    my $cxns  = $self->cxns;
    my $total = @$cxns;
    my $cxn;

    while ( 0 < $total-- ) {
        $cxn = $cxns->[ $self->next_cxn_num ];
        last if $cxn->is_live;
        undef $cxn;
    }

    my $deferred = deferred;

    if ($cxn) {
        $deferred->resolve($cxn);
    }
    else {
        $deferred->reject(
            new_error(
                "NoNodes",
                "No nodes are available: [" . $self->cxns_seeds_str . ']'
            )
        );
    }
    return $deferred->promise;
}

#===================================
sub sniff {
#===================================
    my $self = shift;

    my $promise;
    if ( $promise = $self->_current_sniff ) {
        return $promise;
    }

    my $deferred   = deferred;
    my $cxns       = $self->cxns;
    my $total      = @$cxns;
    my $done       = 0;
    my $current    = 0;
    my $done_seeds = 0;
    $promise = $self->_current_sniff( $deferred->promise );

    my ( @all, @skipped );

    while ( 0 < $total-- ) {
        my $cxn = $cxns->[ $self->next_cxn_num ];
        if ( $cxn->is_dead ) {
            push @skipped, $cxn;
        }
        else {
            push @all, $cxn;
        }
    }

    push @all, @skipped;
    unless (@all) {
        @all = $self->_seeds_as_cxns;
        $done_seeds++;
    }

    my ( $weak_check_sniff, $cxn );
    my $check_sniff = sub {

        return if $done;
        my ( $cxn, $nodes ) = @_;
        if ( $nodes && $self->parse_sniff($nodes) ) {
            $done++;
            $self->_clear_sniff;
            return $deferred->resolve();
        }

        unless ( @all || $done_seeds++ ) {
            $self->logger->info("No live nodes available. Trying seed nodes.");
            @all = $self->_seeds_as_cxns;
        }

        if ( my $cxn = shift @all ) {
            return $cxn->sniff->done($weak_check_sniff);
        }
        if ( --$current == 0 ) {
            $self->_clear_sniff;
            $deferred->resolve();
        }
    };
    weaken( $weak_check_sniff = $check_sniff );

    for ( 1 .. $self->concurrent_sniff ) {
        my $cxn = shift(@all) || last;
        $current++;
        $cxn->sniff->done($check_sniff);
    }

    return $promise;
}

#===================================
sub _seeds_as_cxns {
#===================================
    my $self    = shift;
    my $factory = $self->cxn_factory;
    return map { $factory->new_cxn($_) } @{ $self->seed_nodes };
}

1;

# ABSTRACT: An async CxnPool for connecting to a local cluster with a dynamic node list

=head1 SYNOPSIS


    $e = Search::Elasticsearch::Async->new(
        cxn_pool => 'Async::Sniff',
        nodes    => [
            'search1:9200',
            'search2:9200'
        ],
    );

=head1 DESCRIPTION

The L<Async::Sniff|Search::Elasticsearch::CxnPool::Async::Sniff> connection
pool should be used when you B<do> have direct access to the Elasticsearch
cluster, eg when your web servers and Elasticsearch servers are on the same
network. The nodes that you specify are used to I<discover> the cluster,
which is then I<sniffed> to find the current list of live nodes that the
cluster knows about.

This sniff process is repeated regularly, or whenever a node fails,
to update the list of healthy nodes.  So if you add more nodes to your
cluster, they will be auto-discovered during a sniff.

If all sniffed nodes fail, then it falls back to sniffing the original
I<seed> nodes that you specified in C<new()>.

For L<HTTP Cxn classes|Search::Elasticsearch::Role::Cxn>, this module
will also dynamically detect the C<max_content_length> which the nodes
in the cluster will accept.

This class does L<Search::Elasticsearch::Role::CxnPool::Sniff> and
L<Search::Elasticsearch::Role::Is_Async>.

=head1 CONFIGURATION

=head2 C<nodes>

The list of nodes to use to discover the cluster.  Can accept a single node,
multiple nodes, and defaults to C<localhost:9200> if no C<nodes> are
specified. See L<Search::Elasticsearch::Role::Cxn/node> for details of the node
specification.

=head2 C<concurrent_sniff>

By default, this module will issue up to 4 concurrent sniff requests in parallel,
depending on how many nodes are known.  The first successful response is used
to set the new list of live nodes.  Set C<concurrent_sniff> to change the
maximum number of concurrent sniff requests.

=head2 See also

=over

=item *

L<Search::Elasticsearch::Role::Cxn/request_timeout>

=item *

L<Search::Elasticsearch::Role::Cxn/sniff_timeout>

=item *

L<Search::Elasticsearch::Role::Cxn/sniff_request_timeout>

=back

=head2 Inherited configuration

From L<Search::Elasticsearch::Role::CxnPool::Sniff>

=over

=item * L<sniff_interval|Search::Elasticsearch::Role::CxnPool::Sniff/"sniff_interval">

=item * L<sniff_max_content_length|Search::Elasticsearch::Role::CxnPool::Sniff/"sniff_max_content_length">

=back

From L<Search::Elasticsearch::Role::CxnPool>

=over

=item * L<randomize_cxns|Search::Elasticsearch::Role::CxnPool/"randomize_cxns">

=back

=head1 METHODS

=head2 C<next_cxn()>

    $cxn_pool->next_cxn
             -> then( sub { my $cxn = shift })

Returns the next available live node (in round robin fashion), or
throws a C<NoNodes> error if no nodes can be sniffed from the cluster.

=head2 C<sniff()>

    $cxn_pool->sniff->then(
        sub { "ok"     },
        sub { "not_ok" }
    );

Sniffs the cluster and returns a promise which is resolved on success, or
rejected on failure.

=head2 Inherited methods

From L<Search::Elasticsearch::Role::CxnPool::Sniff>

=over

=item * L<schedule_check()|Search::Elasticsearch::Role::CxnPool::Sniff/"schedule_check()">

=item * L<parse_sniff()|Search::Elasticsearch::Role::CxnPool::Sniff/"parse_sniff()">

=item * L<should_accept_node()|Search::Elasticsearch::Role::CxnPool::Sniff/"should_accept_node()">

=back

From L<Search::Elasticsearch::Role::CxnPool>

=over

=item * L<cxn_factory()|Search::Elasticsearch::Role::CxnPool/"cxn_factory()">

=item * L<logger()|Search::Elasticsearch::Role::CxnPool/"logger()">

=item * L<serializer()|Search::Elasticsearch::Role::CxnPool/"serializer()">

=item * L<current_cxn_num()|Search::Elasticsearch::Role::CxnPool/"current_cxn_num()">

=item * L<cxns()|Search::Elasticsearch::Role::CxnPool/"cxns()">

=item * L<seed_nodes()|Search::Elasticsearch::Role::CxnPool/"seed_nodes()">

=item * L<next_cxn_num()|Search::Elasticsearch::Role::CxnPool/"next_cxn_num()">

=item * L<set_cxns()|Search::Elasticsearch::Role::CxnPool/"set_cxns()">

=item * L<request_ok()|Search::Elasticsearch::Role::CxnPool/"request_ok()">

=item * L<request_failed()|Search::Elasticsearch::Role::CxnPool/"request_failed()">

=item * L<should_retry()|Search::Elasticsearch::Role::CxnPool/"should_retry()">

=item * L<should_mark_dead()|Search::Elasticsearch::Role::CxnPool/"should_mark_dead()">

=item * L<cxns_str()|Search::Elasticsearch::Role::CxnPool/"cxns_str()">

=item * L<cxns_seeds_str()|Search::Elasticsearch::Role::CxnPool/"cxns_seeds_str()">

=item * L<retries()|Search::Elasticsearch::Role::CxnPool/"retries()">

=item * L<reset_retries()|Search::Elasticsearch::Role::CxnPool/"reset_retries()">

=back

