lib/Search/Elasticsearch/Role/CxnPool.pm (87 lines of code) (raw):
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
package Search::Elasticsearch::Role::CxnPool;
use Moo::Role;
use Search::Elasticsearch::Util qw(parse_params);
use List::Util qw(shuffle);
use IO::Select();
use Time::HiRes qw(time sleep);
use Search::Elasticsearch::Util qw(to_list);
use namespace::clean;
requires qw(next_cxn schedule_check);
has 'cxn_factory' => ( is => 'ro', required => 1 );
has 'logger' => ( is => 'ro', required => 1 );
has 'serializer' => ( is => 'ro', required => 1 );
has 'current_cxn_num' => ( is => 'rwp', default => 0 );
has 'cxns' => ( is => 'rwp', default => sub { [] } );
has 'seed_nodes' => ( is => 'ro', required => 1 );
has 'retries' => ( is => 'rw', default => 0 );
has 'randomize_cxns' => ( is => 'ro', default => 1 );
#===================================
around BUILDARGS => sub {
#===================================
my $orig = shift;
my $params = $orig->(@_);
my @seed = grep {$_} to_list( delete $params->{nodes} || ('') );
@seed = $params->{cxn_factory}->default_host
unless @seed;
$params->{seed_nodes} = \@seed;
return $params;
};
#===================================
sub next_cxn_num {
#===================================
my $self = shift;
my $cxns = $self->cxns;
return unless @$cxns;
my $current = $self->current_cxn_num;
$self->_set_current_cxn_num( ( $current + 1 ) % @$cxns );
return $current;
}
#===================================
sub set_cxns {
#===================================
my $self = shift;
my $factory = $self->cxn_factory;
my @cxns = map { $factory->new_cxn($_) } @_;
@cxns = shuffle @cxns if $self->randomize_cxns;
$self->_set_cxns( \@cxns );
$self->_set_current_cxn_num(0);
$self->logger->infof( "Current cxns: %s",
[ map { $_->stringify } @cxns ] );
return;
}
#===================================
sub request_ok {
#===================================
my ( $self, $cxn ) = @_;
$cxn->mark_live;
$self->reset_retries;
}
#===================================
sub request_failed {
#===================================
my ( $self, $cxn, $error ) = @_;
if ( $error->is( 'Cxn', 'Timeout' ) ) {
$cxn->mark_dead if $self->should_mark_dead($error);
$self->schedule_check;
if ( $self->should_retry($error) ) {
my $retries = $self->retries( $self->retries + 1 );
return 1 if $retries < $self->_max_retries;
}
}
else {
$cxn->mark_live if $cxn;
}
$self->reset_retries;
return 0;
}
#===================================
sub should_retry {
#===================================
my ( $self, $error ) = @_;
return $error->is('Cxn');
}
#===================================
sub should_mark_dead {
#===================================
my ( $self, $error ) = @_;
return $error->is('Cxn');
}
#===================================
sub cxns_str {
#===================================
my $self = shift;
join ", ", map { $_->stringify } @{ $self->cxns };
}
#===================================
sub cxns_seeds_str {
#===================================
my $self = shift;
join ", ", ( map { $_->stringify } @{ $self->cxns } ),
@{ $self->seed_nodes };
}
#===================================
sub reset_retries { shift->retries(0) }
sub _max_retries {2}
#===================================
1;
__END__
#ABSTRACT: Provides common functionality to the CxnPool implementations
=head1 DESCRIPTION
See the CxnPool implementations:
=over
=item *
L<Search::Elasticsearch::CxnPool::Static>
=item *
L<Search::Elasticsearch::CxnPool::Sniff>
=item *
L<Search::Elasticsearch::CxnPool::Static::NoPing>
=back
=head1 CONFIGURATION
These configuration options should not be set by the user but are
documented here for completeness.
=head2 C<randomize_cxns>
By default, the order of cxns passed to L</set_cxns()> is randomized
before they are stored. Set C<randomize_cxns> to a false value to
disable.
=head1 METHODS
=head2 C<cxn_factory()>
$factory = $cxn_pool->cxn_factory
Returns the L<Search::Elasticsearch::Cxn::Factory> object for creating a new
C<$cxn> instance.
=head2 C<logger()>
$logger = $cxn_pool->logger
Returns the L<Search::Elasticsearch::Role::Logger>-based object, which
defaults to L<Search::Elasticsearch::Logger::LogAny>.
=head2 C<serializer()>
$serializer = $cxn_pool->serializer
Returns the L<Search::Elasticsearch::Role::Serializer>-based object,
which defaults to L<Search::Elasticsearch::Serializer::JSON>.
=head2 C<current_cxn_num()>
$num = $cxn_pool->current_cxn_num
Returns the current cxn number, which is an offset into
the array of cxns set by L</set_cxns()>.
=head2 C<cxns()>
\@cxns = $cxn_pool->cxns;
Returns the current list of L<Search::Elasticsearch::Role::Cxn>-based
cxn objects as set by L</set_cxns()>.
=head2 C<seed_nodes()>
\@seed_nodes = $cxn_pool->seed_nodes
Returns the list of C<nodes> originally specified when calling
L<Search::Elasticsearch/new()>.
=head2 C<next_cxn_num()>
$num = $cxn_pool->next_cxn_num;
Returns the number of the next connection, in round-robin fashion. Updates
the L</current_cxn_num()>.
=head2 C<set_cxns()>
$cxn_pool->set_cxns(@nodes);
Takes a list of nodes, converts them into L<Search::Elasticsearch::Role::Cxn>-based
objects and makes them accessible via L</cxns()>.
=head2 C<request_ok()>
$cxn_pool->request_ok($cxn);
Called when a request by the specified C<$cxn> object has completed successfully.
Marks the C<$cxn> as live.
=head2 C<request_failed()>
$should_retry = $cxn_pool->request_failed($cxn,$error);
Called when a request by the specified C<$cxn> object has failed. Returns
C<1> if the request should be retried or C<0> if it shouldn't.
=head2 C<should_retry()>
$bool = $cxn_pool->should_retry($error);
Examines the error to decide whether the request should be retried or not.
By default, only L<Search::Elasticsearch::Error/Search::Elasticsearch::Error::Cxn> errors
are retried.
=head2 C<should_mark_dead()>
$bool = $cxn_pool->should_mark_dead($error);
Examines the error to decide whether the C<$cxn> should be marked as dead or not.
By default, only L<Search::Elasticsearch::Error/Search::Elasticsearch::Error::Cxn> errors
cause a C<$cxn> to be marked as dead.
=head2 C<cxns_str()>
$str = $cxn_pool->cxns_str
Returns all L</cxns()> as a string for logging purposes.
=head2 C<cxns_seeds_str()>
$str = $cxn_pool->cxns_seeeds_str
Returns all L</cxns()> and L</seed_nodes()> as a string for logging purposes.
=head2 C<retries()>
$retries = $cxn_pool->retries
The number of times the current request has been retried.
=head2 C<reset_retries()>
$cxn_pool->reset_retries;
Called at the start of a new request to reset the retries count.