lib/Search/Elasticsearch/Cxn/Mojo.pm (74 lines of code) (raw):

# Licensed to Elasticsearch B.V. under one or more contributor # license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright # ownership. Elasticsearch B.V. licenses this file to you under # the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. package Search::Elasticsearch::Cxn::Mojo; use Mojo::UserAgent(); use Promises qw(deferred); use Try::Tiny; use Moo; with 'Search::Elasticsearch::Role::Cxn::Async'; with 'Search::Elasticsearch::Role::Cxn', 'Search::Elasticsearch::Role::Is_Async'; has 'connect_timeout' => ( is => 'ro', default => 2 ); use namespace::clean; #=================================== sub perform_request { #=================================== my ( $self, $params ) = @_; my $uri = $self->build_uri($params) . ''; my $method = $params->{method}; my %headers = ( %{ $self->default_headers } ); my @args = ( $method, $uri, \%headers ); my $data = $params->{data}; if ( defined $data ) { $headers{'Content-Type'} = $params->{mime_type}; $headers{'Content-Encoding'} = $params->{encoding} if $params->{encoding}; push @args, $data; } my $handle = $self->handle; $handle->connect_timeout( $self->connect_timeout ); $handle->request_timeout( $params->{timeout} || $self->request_timeout ); my $tx = $handle->build_tx(@args); my $deferred = deferred; $tx = $handle->start( $tx, sub { my ( $ua, $tx ) = @_; my $res = $tx->res; my $error; if ( $error = $res->error ) { $error = $error->{message} if ref $error eq 'HASH'; } my $headers = $res->headers->to_hash; $headers->{ lc($_) } = delete $headers->{$_} for keys %{$headers}; try { my ( $code, $response ) = $self->process_response( $params, # request ( $res->code || 500 ), # status $error, # reason $res->body, # content $headers, # headers ); $deferred->resolve( $code, $response ); } catch { $deferred->reject($_); }; } ); $deferred->promise; } #=================================== sub error_from_text { #=================================== local $_ = $_[2]; return /[Tt]imed out/ ? 'Timeout' : /SSL connect attempt failed/ ? 'SSL' : /Invalid argument/ ? 'Cxn' : 'Request'; } #=================================== sub _build_handle { #=================================== my $self = shift; my %args = %{ $self->handle_args }; if ( $self->is_https && $self->has_ssl_options ) { %args = ( %args, %{ $self->ssl_options } ); } return Mojo::UserAgent->new(%args); } 1; # ABSTRACT: An async Cxn implementation which uses Mojo::UserAgent =head1 DESCRIPTION Provides an async HTTP Cxn class based on L<Mojo::UserAgent>. The Mojo backend is fast, uses pure Perl, support proxies and https and provides persistent connections. This class does L<Search::Elasticsearch::Role::Cxn>, whose documentation provides more information, L<Search::Elasticsearch::Role::Async::Cxn>, and L<Search::Elasticsearch::Role::Is_Async>. =head1 CONFIGURATION =head2 C<connect_timeout> Unlike most HTTP backends, L<Mojo::UserAgent> accepts a separate C<connect_timeout> parameter, which defaults to C<2> seconds but can be reduced in an environment with low network latency. =head2 Inherited configuration From L<Search::Elasticsearch::Role::Cxn> =over =item * L<node|Search::Elasticsearch::Role::Cxn/"node"> =item * L<max_content_length|Search::Elasticsearch::Role::Cxn/"max_content_length"> =item * L<deflate|Search::Elasticsearch::Role::Cxn/"gzip"> =item * L<deflate|Search::Elasticsearch::Role::Cxn/"deflate"> =item * L<request_timeout|Search::Elasticsearch::Role::Cxn/"request_timeout"> =item * L<ping_timeout|Search::Elasticsearch::Role::Cxn/"ping_timeout"> =item * L<dead_timeout|Search::Elasticsearch::Role::Cxn/"dead_timeout"> =item * L<max_dead_timeout|Search::Elasticsearch::Role::Cxn/"max_dead_timeout"> =item * L<sniff_request_timeout|Search::Elasticsearch::Role::Cxn/"sniff_request_timeout"> =item * L<sniff_timeout|Search::Elasticsearch::Role::Cxn/"sniff_timeout"> =item * L<handle_args|Search::Elasticsearch::Role::Cxn/"handle_args"> =item * L<handle_args|Search::Elasticsearch::Role::Cxn/"default_qs_params"> =back =head1 SSL/TLS L<Search::Elasticsearch::Cxn::Mojo> does no validation of the remote host by default. This behaviour can be changed by passing the C<ssl_options> parameter with the C<ca>, C<cert>, and C<key> options. For instance, to check that the remote host has a trusted certificate, and to avoid man-in-the-middle attacks, you could do the following: use Search::Elasticsearch::Async; my $es = Search::Elasticsearch::Async->new( cxn => 'Mojo', nodes => [ "https://node1.mydomain.com:9200", "https://node2.mydomain.com:9200", ], ssl_options => { ca => '/path/to/cacert.pem' } ); If the remote server cannot be verified, an L<Search::Elasticsearch::Error|SSL error> will be thrown. If you want your client to present its own certificate to the remote server, then use: use Search::Elasticsearch::Async; my $es = Search::Elasticsearch::Async->new( cxn => 'Mojo', nodes => [ "https://node1.mydomain.com:9200", "https://node2.mydomain.com:9200", ], ssl_options => { ca => '/path/to/cacert.pem' cert => '/path/to/client.pem', key => '/path/to/client.pem' } ); =head1 METHODS =head2 C<perform_request()> $self->perform_request({ # required method => 'GET|HEAD|POST|PUT|DELETE', path => '/path/of/request', qs => \%query_string_params, # optional data => $body_as_string, mime_type => 'application/json', timeout => $timeout }) ->then(sub { my ($status,body) = @_; ...}) Sends the request to the associated Elasticsearch node and returns a C<$status> code and the decoded response C<$body>, or throws an error if the request failed. =head2 Inherited methods From L<Search::Elasticsearch::Role::Cxn> =over =item * L<scheme()|Search::Elasticsearch::Role::Cxn/"scheme()"> =item * L<is_https()|Search::Elasticsearch::Role::Cxn/"is_https()"> =item * L<userinfo()|Search::Elasticsearch::Role::Cxn/"userinfo()"> =item * L<default_headers()|Search::Elasticsearch::Role::Cxn/"default_headers()"> =item * L<max_content_length()|Search::Elasticsearch::Role::Cxn/"max_content_length()"> =item * L<build_uri()|Search::Elasticsearch::Role::Cxn/"build_uri()"> =item * L<host()|Search::Elasticsearch::Role::Cxn/"host()"> =item * L<port()|Search::Elasticsearch::Role::Cxn/"port()"> =item * L<uri()|Search::Elasticsearch::Role::Cxn/"uri()"> =item * L<is_dead()|Search::Elasticsearch::Role::Cxn/"is_dead()"> =item * L<is_live()|Search::Elasticsearch::Role::Cxn/"is_live()"> =item * L<next_ping()|Search::Elasticsearch::Role::Cxn/"next_ping()"> =item * L<ping_failures()|Search::Elasticsearch::Role::Cxn/"ping_failures()"> =item * L<mark_dead()|Search::Elasticsearch::Role::Cxn/"mark_dead()"> =item * L<mark_live()|Search::Elasticsearch::Role::Cxn/"mark_live()"> =item * L<force_ping()|Search::Elasticsearch::Role::Cxn/"force_ping()"> =item * L<pings_ok()|Search::Elasticsearch::Role::Cxn/"pings_ok()"> =item * L<sniff()|Search::Elasticsearch::Role::Cxn/"sniff()"> =item * L<process_response()|Search::Elasticsearch::Role::Cxn/"process_response()"> =back From L<Search::Elasticsearch::Role::Async::Cxn> =over =item * L<pings_ok()|Search::Elasticsearch::Role::Cxn/"pings_ok()"> =item * L<sniff()|Search::Elasticsearch::Role::Cxn/"sniff()"> =back =head1 SEE ALSO =over =item * L<Search::Elasticsearch::Role::Cxn::AEHTTP> =back