# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

package Avro::Schema;
use strict;
use warnings;

use Carp;
use JSON::MaybeXS ();
use Try::Tiny;

our $VERSION = '++MODULE_VERSION++';

my $json = JSON::MaybeXS->new->allow_nonref;

sub parse {
    my $schema      = shift;
    my $json_string = shift;
    my $names       = shift || {};
    my $namespace   = shift || "";

    my $struct = try {
        $json->decode($json_string);
    }
    catch {
        throw Avro::Schema::Error::Parse(
            "Cannot parse json string: $_"
        );
    };
    return $schema->parse_struct($struct, $names, $namespace);
}

sub to_string {
    my $class = shift;
    my $struct = shift;
    return $json->encode($struct);
}

sub parse_struct {
    my $schema = shift;
    my $struct = shift;
    my $names = shift || {};
    my $namespace = shift || "";

    ## 1.3.2 A JSON object
    if (ref $struct eq 'HASH') {
        my $type = $struct->{type}
            or throw Avro::Schema::Error::Parse("type is missing");
        if ( Avro::Schema::Primitive->is_type_valid($type) ) {
            return Avro::Schema::Primitive->new(type => $type);
        }
        ## XXX technically we shouldn't allow error type other than in
        ## a Protocol definition
        if ($type eq 'record' or $type eq 'error') {
            return Avro::Schema::Record->new(
                struct => $struct,
                names => $names,
                namespace => $namespace,
            );
        }
        elsif ($type eq 'enum') {
            return Avro::Schema::Enum->new(
                struct => $struct,
                names => $names,
                namespace => $namespace,
            );
        }
        elsif ($type eq 'array') {
            return Avro::Schema::Array->new(
                struct => $struct,
                names => $names,
                namespace => $namespace,
            );
        }
        elsif ($type eq 'map') {
            return Avro::Schema::Map->new(
                struct => $struct,
                names => $names,
                namespace => $namespace,
            );
        }
        elsif ($type eq 'fixed') {
            return Avro::Schema::Fixed->new(
                struct => $struct,
                names => $names,
                namespace => $namespace,
            );
        }
        else {
            throw Avro::Schema::Error::Parse("unknown type: $type");
        }
    }
    ## 1.3.2 A JSON array, representing a union of embedded types.
    elsif (ref $struct eq 'ARRAY') {
        return Avro::Schema::Union->new(
            struct => $struct,
            names => $names,
            namespace => $namespace,
        );
    }
    ## 1.3.2 A JSON string, naming a defined type.
    else {
        my $type = $struct;
        ## It's one of our custom defined type
        
        ## Short name provided, prepend the namespace
        if ( $type !~ /\./ ) {
            my $fulltype = $namespace . '.' . $type;
            if (exists $names->{$fulltype}) {
                return $names->{$fulltype};
            }
        }
        
        ## Fully-qualified name
        if (exists $names->{$type}) {
            return $names->{$type};
        }
        
        ## It's a primitive type
        return Avro::Schema::Primitive->new(type => $type);
    }
}

sub match {
    my $class = shift;
    my %param = @_;

    my $reader = $param{reader}
        or croak "missing reader schema";
    my $writer = $param{writer}
        or croak "missing writer schema";

    my $wtype = ref $writer ? $writer->type : $writer;
    my $rtype = ref $reader ? $reader->type : $reader;
    ## 1.3.2 either schema is a union
    return $wtype if $wtype eq 'union' or $rtype eq 'union';

    ## 1.3.2 both schemas have same primitive type
    return $wtype if $wtype eq $rtype
             && Avro::Schema::Primitive->is_type_valid($wtype);

    ## 1.3.2
    ## int is promotable to long, float, or double
    if ($wtype eq 'int' && (
        $rtype eq 'float' or $rtype eq 'long' or $rtype eq 'double'
    )) {
        return $rtype;
    }
    ## long is promotable to float or double
    if ($wtype eq 'long' && (
        $rtype eq 'float' or $rtype eq 'double'
    )) {
        return $rtype;
    }
    ## float is promotable to double
    if ($wtype eq 'float' && $rtype eq 'double') {
        return $rtype;
    }
    return 0 unless $rtype eq $wtype;

    ## 1.3.2 {subtype and/or names} match
    if ($rtype eq 'array') {
        return $wtype if $class->match(
            reader => $reader->items,
            writer => $writer->items,
        );
    }
    elsif ($rtype eq 'record') {
        return $wtype if $reader->fullname eq $writer->fullname;
    }
    elsif ($rtype eq 'map') {
        return $wtype if $class->match(
            reader => $reader->values,
            writer => $writer->values,
        );
    }
    elsif ($rtype eq 'fixed') {
        return $wtype if $reader->size     eq $writer->size
                      && $reader->fullname eq $writer->fullname;
    }
    elsif ($rtype eq 'enum') {
        return $wtype if $reader->fullname eq $writer->fullname;
    }
    return 0;
}


package Avro::Schema::Base;
our @ISA = qw/Avro::Schema/;
use Carp;

sub new {
    my $class = shift;
    my %param = @_;

    my $type = $param{type};
    if (!$type) {
        my ($t) = $class =~ /::([^:]+)$/;
        $type = lc ($t);
    }
    my $schema = bless {
        type => $type,
    }, $class;
    return $schema;
}

sub type {
    my $schema = shift;
    return $schema->{type};
}

sub to_string {
    my $schema = shift;
    my $known_names = shift || {};
    return Avro::Schema->to_string($schema->to_struct($known_names));
}

package Avro::Schema::Primitive;
our @ISA = qw/Avro::Schema::Base/;
use Carp;
use Config;
use Regexp::Common qw/number/;

my %PrimitiveType = map { $_ => 1 } qw/
    null
    boolean
    int
    long
    float
    double
    bytes
    string
/;

my %Singleton = ( );

## FIXME: useless lazy generation
sub new {
    my $class = shift;
    my %param = @_;

    my $type = $param{type}
        or croak "Schema must have a type";

    throw Avro::Schema::Error::Parse("Not a primitive type $type")
        unless $class->is_type_valid($type);

    if (! exists $Singleton{ $type } ) {
        my $schema = $class->SUPER::new( type => $type );
        $Singleton{ $type } = $schema;
    }
    return $Singleton{ $type };
}

sub is_type_valid {
    return $PrimitiveType{ $_[1] || "" };
}

## Returns true or false wheter the given data is valid for
## this schema
sub is_data_valid {
    my $schema = shift;
    my $data = shift;
    my $type = $schema->{type};

    if ($type eq 'null') {
        return ! defined $data;
    }

    # undef isn't valid for any other types
    return 0 unless defined $data;

    if ($type eq 'int') {
        no warnings;
        my $packed_int = pack "l", $data;
        my $unpacked_int = unpack "l", $packed_int;
        return $unpacked_int eq $data ? 1 : 0;
    }
    if ($type eq 'long') {
        if ($Config{use64bitint}) {
            return 0 unless defined $data;
            my $packed_int = pack "q", $data;
            my $unpacked_int = unpack "q", $packed_int;
            return $unpacked_int eq $data ? 1 : 0;

        }
        else {
            require Math::BigInt;
            my $int = eval { Math::BigInt->new($data) };
            if ($@) {
                warn "probably a unblessed ref: $@";
                return 0;
            }
            return 0 if $int->is_nan;
            my $max = Math::BigInt->new( "0x7FFF_FFFF_FFFF_FFFF" );
            return $int->bcmp($max) <= 0 ? 1 : 0;
        }
    }
    if ($type eq 'float' or $type eq 'double') {
        $data =~ /^$RE{num}{real}$/ ? return 1 : 0;
    }
    if ($type eq 'bytes') {
        return 0 if ref $data;
        return 1 unless utf8::is_utf8($data) and $data =~ /[^\x00-\xFF]/;
    }
    if ($type eq 'string') {
        return 1 unless ref $data;
    }
    if ($type eq 'boolean') {
        return 1 if JSON::PP::is_bool($data);
        return 0 if ref $data; # sometimes risky
        return $data =~ m{^(?:yes|no|y|n|t|f|true|false|0|1)$}i;
    }
    return 0;
}

sub to_struct {
    my $schema = shift;
    return $schema->type;
}

package Avro::Schema::Named;
our @ISA = qw/Avro::Schema::Base/;
use Scalar::Util;

my %NamedType = map { $_ => 1 } qw/
    record
    enum
    fixed
/;

sub new {
    my $class = shift;
    my %param = @_;

    my $schema = $class->SUPER::new(%param);

    my $names     = $param{names}  || {};
    my $struct    = $param{struct} || {};
    my $name      = $struct->{name};
    unless (defined $name && length $name) {
        throw Avro::Schema::Error::Parse( "Missing name for $class" );
    }
    my $namespace = $struct->{namespace};
    unless (defined $namespace && length $namespace) {
        $namespace = $param{namespace};
    }

    $schema->set_names($namespace, $name);
    $schema->add_name($names);

    return $schema;
}

sub is_type_valid {
    return $NamedType{ $_[1] || "" };
}

sub set_names {
    my $schema = shift;
    my ($namespace, $name) = @_;

    my @parts = split /\./, ($name || ""), -1;
    if (@parts > 1) {
        $name = pop @parts;
        $namespace = join ".", @parts;
        if (grep { ! length $_ } @parts) {
            throw Avro::Schema::Error::Name(
                "name '$name' is not a valid name"
            );
        }
    }

    ## 1.3.2 The name portion of a fullname, and record field names must:
    ## * start with [A-Za-z_]
    ## * subsequently contain only [A-Za-z0-9_]
    my $type = $schema->{type};
    unless (length $name && $name =~ m/^[A-Za-z_][A-Za-z0-9_]*$/) {
        throw Avro::Schema::Error::Name(
            "name '$name' is not valid for $type"
        );
    }
    if (defined $namespace && length $namespace) {
        for (split /\./, $namespace, -1) {
            unless ($_ && /^[A-Za-z_][A-Za-z0-9_]*$/) {
                throw Avro::Schema::Error::Name(
                    "namespace '$namespace' is not valid for $type"
                );
            }
        }
    }
    $schema->{name} = $name;
    $schema->{namespace} = $namespace;
}

sub add_name {
    my $schema = shift;
    my ($names) = @_;

    my $name = $schema->fullname;
    if ( exists $names->{ $name } ) {
        throw Avro::Schema::Error::Parse( "Name $name is already defined" );
    }
    $names->{$name} = $schema;
    Scalar::Util::weaken( $names->{$name} );
    return;
}

sub fullname {
    my $schema = shift;
    return join ".",
        grep { defined $_ && length $_ }
        map { $schema->{$_ } }
        qw/namespace name/;
}

sub namespace {
    my $schema = shift;
    return $schema->{namespace};
}

package Avro::Schema::Record;
our @ISA = qw/Avro::Schema::Named/;
use Scalar::Util;

my %ValidOrder = map { $_ => 1 } qw/ascending descending ignore/;

sub new {
    my $class = shift;
    my %param = @_;

    my $names  = $param{names} ||= {};
    my $schema = $class->SUPER::new(%param);

    my $fields = $param{struct}{fields}
        or throw Avro::Schema::Error::Parse("Record must have Fields");

    throw Avro::Schema::Error::Parse("Record.Fields must me an array")
        unless ref $fields eq 'ARRAY';

    my $namespace = $schema->namespace;

    my @fields;
    for my $field (@$fields) {
        my $f = Avro::Schema::Field->new($field, $names, $namespace);
        push @fields, $f;
    }
    $schema->{fields} = \@fields;
    return $schema;
}

sub to_struct {
    my $schema = shift;
    my $known_names = shift || {};
    ## consider that this record type is now known (will serialize differently)
    my $fullname = $schema->fullname;
    if ($known_names->{ $fullname }++) {
        return $fullname;
    }
    return {
        type => $schema->{type},
        name => $fullname,
        fields => [
            map { $_->to_struct($known_names) } @{ $schema->{fields} }
        ],
    };
}

sub fields {
    my $schema = shift;
    return $schema->{fields};
}

sub fields_as_hash {
    my $schema = shift;
    unless (exists $schema->{_fields_as_hash}) {
        $schema->{_fields_as_hash} = {
            map { $_->{name} => $_ } @{ $schema->{fields} }
        };
    }
    return $schema->{_fields_as_hash};
}

sub is_data_valid {
    my $schema = shift;
    my $data = shift;
    for my $field (@{ $schema->{fields} }) {
        my $key = $field->{name};
        return 0 unless $field->is_data_valid($data->{$key});
    }
    return 1;
}

package Avro::Schema::Field;

sub to_struct {
    my $field = shift;
    my $known_names = shift || {};
    my $type = $field->{type}->to_struct($known_names);
    return { name => $field->{name}, type => $type };
}

sub new {
    my $class = shift;
    my ($struct, $names, $namespace) = @_;

    my $name = $struct->{name};
    throw Avro::Schema::Error::Parse("Record.Field.name is required")
        unless defined $name && length $name;

    my $type = $struct->{type};
    throw Avro::Schema::Error::Parse("Record.Field.name is required")
        unless defined $type && length $type;

    $type = Avro::Schema->parse_struct($type, $names, $namespace);
    my $field = { name => $name, type => $type };
    #TODO: find where to weaken precisely
    #Scalar::Util::weaken($struct->{type});

    if (exists $struct->{default}) {
        my $is_valid = $type->is_data_valid($struct->{default});
        my $t = $type->type;
        throw Avro::Schema::Error::Parse(
            "default value doesn't validate $t: '$struct->{default}'"
        ) unless $is_valid;

        ## small Perlish special case
        if ($type eq 'boolean') {
            $field->{default} = $struct->{default} ? 1 : 0;
        }
        else {
            $field->{default} = $struct->{default};
        }
    }
    if (my $order = $struct->{order}) {
        throw Avro::Schema::Error::Parse(
            "Order '$order' is not valid'"
        ) unless $ValidOrder{$order};
        $field->{order} = $order;
    }
    return bless $field, $class;
}

sub is_data_valid {
    my $field = shift;
    my $data = shift;
    return 1 if $field->{type}->is_data_valid($data);
    return 0;
}

package Avro::Schema::Enum;
our @ISA = qw/Avro::Schema::Named/;

sub new {
    my $class = shift;
    my %param = @_;
    my $schema = $class->SUPER::new(%param);
    my $struct = $param{struct}
        or throw Avro::Schema::Error::Parse("Enum instantiation");
    my $symbols = $struct->{symbols} || [];

    unless (@$symbols) {
        throw Avro::Schema::Error::Parse("Enum needs at least one symbol");
    }
    my %symbols;
    my $pos = 0;
    for (@$symbols) {
        if (ref $_) {
            throw Avro::Schema::Error::Parse(
                "Enum.symbol should be a string"
            );
        }
        throw Avro::Schema::Error::Parse("Duplicate symbol in Enum")
            if exists $symbols{$_};

        $symbols{$_} = $pos++;
    }
    $schema->{hash_symbols} = \%symbols;
    return $schema;
}

sub is_data_valid {
    my $schema = shift;
    my $data = shift;
    return 1 if defined $data && exists $schema->{hash_symbols}{$data};
    return 0;
}

sub symbols {
    my $schema = shift;
    unless (exists $schema->{symbols}) {
        my $sym = $schema->{hash_symbols};
        $schema->{symbols} = [ sort { $sym->{$a} <=> $sym->{$b} } keys %$sym ];
    }
    return $schema->{symbols};
}

sub symbols_as_hash {
    my $schema = shift;
    return $schema->{hash_symbols} || {};
}

sub to_struct {
    my $schema = shift;
    my $known_names = shift || {};

    my $fullname = $schema->fullname;
    if ($known_names->{ $fullname }++) {
        return $fullname;
    }
    return {
        type => 'enum',
        name => $schema->fullname,
        symbols => [ @{ $schema->symbols } ],
    };
}

package Avro::Schema::Array;
our @ISA = qw/Avro::Schema::Base/;

sub new {
    my $class = shift;
    my %param = @_;
    my $schema = $class->SUPER::new(%param);

    my $struct = $param{struct}
        or throw Avro::Schema::Error::Parse("Enum instantiation");

    my $items = $struct->{items}
        or throw Avro::Schema::Error::Parse("Array must declare 'items'");

    $items = Avro::Schema->parse_struct($items, $param{names}, $param{namespace});
    $schema->{items} = $items;
    return $schema;
}

sub is_data_valid {
    my $schema = shift;
    my $default = shift;
    return 1 if $default && ref $default eq 'ARRAY';
    return 0;
}

sub items {
    my $schema = shift;
    return $schema->{items};
}

sub to_struct {
    my $schema = shift;
    my $known_names = shift || {};

    return {
        type => 'array',
        items => $schema->{items}->to_struct($known_names),
    };
}

package Avro::Schema::Map;
our @ISA = qw/Avro::Schema::Base/;

sub new {
    my $class = shift;
    my %param = @_;
    my $schema = $class->SUPER::new(%param);

    my $struct = $param{struct}
        or throw Avro::Schema::Error::Parse("Map instantiation");

    my $values = $struct->{values};
    unless (defined $values && length $values) {
        throw Avro::Schema::Error::Parse("Map must declare 'values'");
    }
    $values = Avro::Schema->parse_struct($values, $param{names}, $param{namespace});
    $schema->{values} = $values;

    return $schema;
}

sub is_data_valid {
    my $schema = shift;
    my $default = shift;
    return 1 if $default && ref $default eq 'HASH';
    return 0;
}

sub values {
    my $schema = shift;
    return $schema->{values};
}

sub to_struct {
    my $schema = shift;
    my $known_names = shift || {};

    return {
        type => 'map',
        values => $schema->{values}->to_struct($known_names),
    };
}

package Avro::Schema::Union;
our @ISA = qw/Avro::Schema::Base/;

sub new {
    my $class = shift;
    my %param = @_;
    my $schema = $class->SUPER::new(%param);
    my $union = $param{struct}
        or throw Avro::Schema::Error::Parse("Union.new needs a struct");

    my $names = $param{names} ||= {};

    my @schemas;
    my %seen_types;
    for my $struct (@$union) {
        my $sch = Avro::Schema->parse_struct($struct, $names, $param{namespace});
        my $type = $sch->type;

        ## 1.3.2 Unions may not contain more than one schema with the same
        ## type, except for the named types record, fixed and enum. For
        ## example, unions containing two array types or two map types are not
        ## permitted, but two types with different names are permitted.
        if (Avro::Schema::Named->is_type_valid($type)) {
            $type = $sch->fullname; # resolve Named types to their name
        }
        ## XXX: I could define &type_name doing the correct resolution for all classes
        if ($seen_types{ $type }++) {
            throw Avro::Schema::Error::Parse(
                "$type is present more than once in the union"
            )
        }
        ## 1.3.2 Unions may not immediately contain other unions.
        if ($type eq 'union') {
            throw Avro::Schema::Error::Parse(
                "Cannot embed unions in union"
            );
        }
        push @schemas, $sch;
    }
    $schema->{schemas} = \@schemas;

    return $schema;
}

sub schemas {
    my $schema = shift;
    return $schema->{schemas};
}

sub is_data_valid {    
    my $schema = shift;
    my $data = shift;
    for my $type ( @{ $schema->{schemas} } ) {
        if ( $type->is_data_valid($data) ) {
            return 1;
        }
    }
    return 0;
}

sub to_struct {
    my $schema = shift;
    my $known_names = shift || {};
    return [ map { $_->to_struct($known_names) } @{$schema->{schemas}} ];
}

package Avro::Schema::Fixed;
our @ISA = qw/Avro::Schema::Named/;

sub new {
    my $class = shift;
    my %param = @_;
    my $schema = $class->SUPER::new(%param);

    my $struct = $param{struct}
        or throw Avro::Schema::Error::Parse("Fixed instantiation");

    my $size = $struct->{size};
    unless (defined $size && length $size) {
        throw Avro::Schema::Error::Parse("Fixed must declare 'size'");
    }
    if (ref $size) {
        throw Avro::Schema::Error::Parse(
            "Fixed.size should be a scalar"
        );
    }
    unless ($size =~ m{^\d+$} && $size > 0) {
        throw Avro::Schema::Error::Parse(
            "Fixed.size should be a positive integer"
        );
    }
    # Cast into numeric so that it will be encoded as a JSON number
    $schema->{size} = $size + 0;

    return $schema;
}

sub is_data_valid {
    my ( $schema, $data ) = @_;

    return 0 if utf8::is_utf8($data) && $data =~ /[^\x00-\xFF]/;
    return $data && length($data) == $schema->{size};
}

sub size {
    my $schema = shift;
    return $schema->{size};
}

sub to_struct {
    my $schema = shift;
    my $known_names = shift || {};

    my $fullname = $schema->fullname;
    if ($known_names->{ $fullname }++) {
        return $fullname;
    }

    return {
        type => 'fixed',
        name => $fullname,
        size => $schema->{size},
    };
}

package Avro::Schema::Error::Parse;
use parent 'Error::Simple';

package Avro::Schema::Error::Name;
use parent 'Error::Simple';

package Avro::Schema::Error::Mismatch;
use parent 'Error::Simple';

1;
