src/include/catalog/li_extras.py (231 lines of code) (raw):
#! /usr/local/bin/python
import sys
import datetime
import pdb
#pdb.set_trace()
# Test and declaration generator for linear_interpolation
# Utility
def kwot(s):
"""Single quote a string, doubling contained quotes as needed."""
return "'" + "''".join(s.split("'")) + "'"
# Following section is just fragile helper functions to produce triples
# of values (v0, v, v1) such that, for any choice of scale and transform
# (scale applied first, transform last):
#
# (v-v0)/(v1-v) = p1/p2
#
# The idea is that, by keeping p1 and p2 uniform in a test set, we can
# know the result of linear interpolation independently of the actual
# function linear_interpolate and include it in the regressions to make
# it easy to identify issues.
def int_triple(p1, p2, scale, transform):
return tuple( [x*scale + transform for x in [0, p1, p1+p2]] )
def num_triple(p1, p2, scale, transform):
return tuple( [str(x) for x in int_triple(p1, p2, scale, transform)] )
def date_triple(p1, p2, scale, transform, basedate):
i = int_triple(p1, p2, scale, transform)
g = basedate.toordinal()
d = [datetime.date.fromordinal(x + g) for x in i]
return tuple( [kwot(x.isoformat()) for x in d] )
def time_offset_by_minutes(t, d):
x = t.hour * 60 + t.minute + d
h = x / 60
m = x - 60 * h
h = h - 24 * (h / 24)
return datetime.time(hour = h, minute = m)
def time_triple(p1, p2, scale, transform, basetime):
i = int_triple(p1, p2, scale, transform)
t = [ time_offset_by_minutes(basetime, x) for x in i]
return tuple( [kwot(x.isoformat()) for x in t] )
def datetime_triple(p1, p2, scale, transform, basestamp):
i = int_triple(p1, p2, scale, transform)
d = [ datetime.timedelta(minutes=x) for x in i ]
return [ kwot( (basestamp + x).isoformat() ) for x in d]
def interval_triple(p1, p2, scale, transform, baseminutes):
i = int_triple(p1, p2, scale, transform+baseminutes)
return tuple( [ kwot(str(x) + ' minutes') for x in i ] )
# The following table drives tests and declarations per data type.
# The keys are type SQL type names that are known to cattulus.pl.
# The values are tuples of
# - a 3-tuple of values in SQL form (no casts): low, middle, high.
# - a 3-tuple of proportionally spread values like the first.
# - an appropriate C type declarator (beware Interval's pointer-ness).
# - position as in index into array of types.
#
# The position value in important to ensure OID consistency.
#
# The delta values fix the proportions of the 3-tuple values.
delta1 = 1
delta2 = 4
type_dict = {
'int8' : (
num_triple(delta1, delta2, 100, 100),
num_triple(delta1, delta2, 50, 2000),
'int64',
0),
'int4' : (
num_triple(delta1, delta2, 100, 100),
num_triple(delta1, delta2, 50, 2000),
'int32',
1),
'int2' : (
num_triple(delta1, delta2, 100, 100),
num_triple(delta1, delta2, 50, 2000),
'int2',
2),
'float8' : (
num_triple(delta1, delta2, 100, 100),
num_triple(delta1, delta2, 50, 2000),
'float8',
3),
'float4' : (
num_triple(delta1, delta2, 100, 100),
num_triple(delta1, delta2, 50, 2000),
'float4',
4),
'date' : (
date_triple(delta1, delta2, 10, 10, datetime.date(2001, 1, 1)),
date_triple(delta1, delta2, 10, 20, datetime.date(2010, 1, 1)),
'DateADT',
5),
'time' : (
time_triple(delta1, delta2, 5, 20, datetime.time(hour=10)),
time_triple(delta1, delta2, 10, 300, datetime.time(hour=10)),
'TimeADT',
6),
'timestamp' : (
datetime_triple(delta1, delta2, 1000, 2000, datetime.datetime(2010, 1, 1)),
datetime_triple(delta1, delta2, 5000, 1000, datetime.datetime(2012, 6, 1)),
'Timestamp',
7),
'timestamptz' : (
datetime_triple(delta1, delta2, 1000, 2000, datetime.datetime(2010, 1, 1)),
datetime_triple(delta1, delta2, 5000, 1000, datetime.datetime(2012, 6, 1)),
'TimestampTz',
8),
'interval' : (
interval_triple(delta1, delta2, 20, 10, 55),
interval_triple(delta1, delta2, 10, 20, 30),
'Interval',
9),
'numeric' : (
num_triple(delta1, delta2, 100, 100),
num_triple(delta1, delta2, 50, 2000),
'Numeric',
10),
}
# For OID assignment we choose to order the types and assign ascending
# OID values starting from a base. The caller is responsible for setting
# base and maintaining the order of assignment.
oid_base = 6072
def ordered_types():
"""List of types in canonical order of OID assignment."""
keys = type_dict.keys()
ordered_keys = [None for k in keys]
for key in keys:
pos = type_dict[key][3]
ordered_keys[pos] = key
assert not None in ordered_keys
return ordered_keys
# Convenient wrapper for one of the table's 3-tuples.
class Coordinate(object):
def __init__(self, ctype, lmhtuple):
self._type = ctype
(self._low, self._mid, self._high) = lmhtuple
def low(self):
return self._low + '::' + self._type
def mid(self):
return self._mid + '::' + self._type
def high(self):
return self._high + '::' + self._type
def linear_interpolate(x, y, x0, y0, x1, y1):
"""Format a call to linear_interpolate for the given arguments and expected result.
"""
fmt = """
select
linear_interpolate({x}, {x0}, {y0}, {x1}, {y1}),
{y} as answer,
{y} = linear_interpolate({x}, {x0}, {y0}, {x1}, {y1}) as match ;"""
return fmt.format(x=x, y=y, x0=x0, y0=y0, x1=x1, y1=y1)
def preliminary_tests(verbose):
""" Preliminary tests (SQL), optionally verbose, as '\n'-delimited string.
"""
prelim = """
-- A "generic" unsupported type.
select linear_interpolate('x'::text, 'x0'::text, 0, 'x1'::text, 1000);
select linear_interpolate(5, 0, 'y0'::text, 100, 'y1'::text);
-- Check that "divide by zero" returns null"""
prelim = prelim.split('\n')
fmt = """select linear_interpolate({x}, {x0}, {y0}, {x1}, {y1});"""
for t in type_dict:
a = Coordinate(t, type_dict[t][0])
o = Coordinate('int4', type_dict['int4'][1])
s = fmt.format(x=a.mid(), x0=a.low(), y0=o.low(), x1=a.low(), y1=o.high())
prelim = prelim + s.split('\n')
return '\n'.join(prelim)
def basic_tests(abscissa_type, ordinate_type, verbose):
"""Per-type tests (SQL), optionally verbose, as '\n'-delimited string.
"""
abscissa = Coordinate(abscissa_type, type_dict[abscissa_type][0])
ordinate = Coordinate(ordinate_type, type_dict[ordinate_type][1])
if verbose:
lst = [
'',
'\qecho',
'\qecho Check interpolation correctness: %s --> %s' % (abscissa_type, ordinate_type),
]
prolog = []
else:
lst = []
prolog = [
'',
'',
'-- Abscissa: %s, Ordinate: %s' % (abscissa_type, ordinate_type),
'-- Check correctness - all results should have match = t'
]
# Use the triples in all combinations to test outlying cases as well as "sweet
# spot" cases.
tst = linear_interpolate(
abscissa.mid(), ordinate.mid(),
abscissa.low(), ordinate.low(),
abscissa.high(), ordinate.high() )
lst = lst + tst.split('\n')
tst = linear_interpolate(
abscissa.mid(), ordinate.mid(),
abscissa.high(), ordinate.high(),
abscissa.low(), ordinate.low() )
lst = lst + tst.split('\n')
tst = linear_interpolate(
abscissa.low(), ordinate.low(),
abscissa.mid(), ordinate.mid(),
abscissa.high(), ordinate.high() )
lst = lst + tst.split('\n')
tst = linear_interpolate(
abscissa.low(), ordinate.low(),
abscissa.high(), ordinate.high(),
abscissa.mid(), ordinate.mid() )
lst = lst + tst.split('\n')
tst = linear_interpolate(
abscissa.high(), ordinate.high(),
abscissa.mid(), ordinate.mid(),
abscissa.low(), ordinate.low() )
lst = lst + tst.split('\n')
tst = linear_interpolate(
abscissa.high(), ordinate.high(),
abscissa.low(), ordinate.low(),
abscissa.mid(), ordinate.mid() )
lst = lst + tst.split('\n')
# Include one trivial case
tst = linear_interpolate(
abscissa.mid(), ordinate.mid(),
abscissa.mid(), ordinate.mid(),
abscissa.mid(), ordinate.mid() )
lst = lst + tst.split('\n')
return '\n'.join(prolog + lst)
def all_tests(verbose):
result = preliminary_tests(verbose)
for abscissa_type in type_dict:
result = result + basic_tests(abscissa_type, 'int4', verbose)
for ordinate_type in type_dict:
result = result + basic_tests('int4', ordinate_type, verbose)
return result
def regression_tests():
return all_tests(False)
def readable_tests():
return all_tests(True)
declared_description = 'linear interpolation: x, x0,y0, x1,y1'
def pg_proc_declarations():
template = """
CREATE FUNCTION linear_interpolate(
anyelement,
anyelement,
{T},
anyelement,
{T}
)
RETURNS {T}
LANGUAGE internal IMMUTABLE STRICT
AS 'linterp_{t}'
WITH (OID={oid}, DESCRIPTION="{desc}");"""
result = ['-- for cdb_pg/src/include/catalog/pg_proc.sql', '']
fmt = ' '.join([x.strip() for x in template.split('\n')])
next_oid = oid_base
all_types = ordered_types()
for sql_type in all_types:
c_type = type_dict[sql_type][2]
result = result + [fmt.format(T=sql_type, t=c_type, oid=str(next_oid), desc=declared_description)]
next_oid = next_oid + 1
return '\n'.join(result)
def upgrade_declarations():
upgrade_1 = """
CREATE FUNCTION @gpupgradeschemaname@.linear_interpolate(
anyelement,
anyelement,
{T},
anyelement,
{T}
)
RETURNS {T}
LANGUAGE internal IMMUTABLE STRICT
AS 'linterp_{t}'
WITH (OID={oid}, DESCRIPTION="{desc}");"""
upgrade_2 = """
COMMENT ON FUNCTION @gpupgradeschemaname@.linear_interpolate(
anyelement,
anyelement,
{T},
anyelement,
{T}
)
IS '{desc}';"""
result = ['-- for src/test/regress/data/upgradeXX/upg2_catupgrade_XXX.sql.in', '']
upg_1 = ' '.join([x.strip() for x in upgrade_1.split('\n')])
upg_2 = ' '.join([x.strip() for x in upgrade_2.split('\n')])
next_oid = oid_base
all_types = ordered_types()
for sql_type in all_types:
c_type = type_dict[sql_type][2]
result.append( upg_1.format(T=sql_type, t=c_type, oid=str(next_oid), desc=declared_description) )
result.append( upg_2.format(T=sql_type, t=c_type, oid=str(next_oid), desc=declared_description) )
result.append( '' )
next_oid = next_oid + 1
return '\n'.join(result)
#
# Interpret the arguments, write result to standard out.
def main():
argmap = {
'readable' : readable_tests,
'regression' : regression_tests,
'pg_proc' : pg_proc_declarations,
'upgrade' : upgrade_declarations
}
efmt = 'argument must be one of (%s), not "%s"' % (', '.join(argmap.keys()), "%s")
if len(sys.argv) == 1:
fn = argmap['readable']
elif len(sys.argv) == 2 and sys.argv[1] in argmap:
fn = argmap[sys.argv[1]]
else:
sys.exit(efmt % ' '.join(sys.argv[1:]))
print fn()
if __name__ == '__main__':
main()