in gpcontrib/gpmapreduce/src/mapred.c [315:1019]
/*
 * lookup_function_in_catalog
 *
 * Resolve a function object that the YAML document refers to by name against
 * the pg_proc catalog, and complete the object's metadata from what is found.
 *
 *   conn - open libpq connection used to run the catalog queries
 *   doc  - the owning document (only asserted non-NULL here)
 *   obj  - a MAPPER, TRANSITION, COMBINER or FINALIZER object; modified in
 *          place
 *
 * On success:
 *   - obj->internal is set to true
 *   - obj->u.function.parameters is filled in from the catalog when the YAML
 *     did not supply a prototype
 *   - obj->u.function.mode is MAPRED_MODE_MULTI for set-returning functions,
 *     MAPRED_MODE_SINGLE otherwise
 *   - obj->u.function.internal_returns holds the return columns derived from
 *     the catalog (OUT/INOUT/TABLE arguments, the attributes of a composite
 *     return type, or a single scalar type)
 *   - obj->u.function.returns is either validated against that list (YAML
 *     names win, types must match) or, when the YAML had no RETURNS, set to
 *     that very same list
 *
 * On failure the problem is reported through mapred_obj_error() and
 * MAPRED_PARSE_ERROR is raised with XRAISE.  The query results, the SQL
 * buffer and the temporary token strings are released in XFINALLY on every
 * path.
 */
void lookup_function_in_catalog(PGconn *conn, mapred_document_t *doc,
								mapred_object_t *obj)
{
	PGresult	   *result  = NULL;
	PGresult	   *result2 = NULL;
	mapred_plist_t *plist, *plist2;
	mapred_plist_t *newitem = NULL;
	mapred_plist_t *returns = NULL;
	buffer_t	   *buffer  = NULL;
	char		   *tmp1    = NULL;
	char		   *tmp2    = NULL;
	char		   *tmp3    = NULL;
#define STR_LEN 50
	char			str[STR_LEN];
	int				i, nargs;

	XASSERT(doc);
	XASSERT(obj);
	XASSERT(obj->kind == MAPRED_MAPPER ||
			obj->kind == MAPRED_TRANSITION ||
			obj->kind == MAPRED_COMBINER ||
			obj->kind == MAPRED_FINALIZER);

	obj->internal = true;
	obj->u.function.internal_returns = NULL;

	XTRY
	{
		buffer = makebuffer(1024, 1024);

		/*
		 * Try to lookup the specified function.
		 *
		 * NOTE(review): obj->name and the YAML supplied type names are
		 * spliced into the SQL text below without escaping, so a value that
		 * contains a single quote breaks (or alters) the statement.  Consider
		 * quoting them with PQescapeLiteral().
		 */
		bufcat(&buffer,
			   "SELECT proretset, prorettype::regtype, pronargs,\n"
			   " proargnames, proargmodes, \n"
			   " (proargtypes::regtype[])[0:pronargs] as proargtypes,\n"
			   " proallargtypes::regtype[],\n");

		/*
		 * If we have return types defined in the yaml then we want to resolve
		 * them to their authoritative names for comparison purposes.
		 */
		if (obj->u.function.returns)
		{
			bufcat(&buffer, " ARRAY[");
			for (plist = obj->u.function.returns; plist; plist = plist->next)
			{
				if (plist->type)
				{
					bufcat(&buffer, "'");
					bufcat(&buffer, plist->type);
					bufcat(&buffer, "'::regtype");
				}
				else
				{
					/* If we don't know the type, punt */
					bufcat(&buffer, "'-'::regtype");
				}
				if (plist->next)
					bufcat(&buffer, ", ");
			}
			bufcat(&buffer, "] as yaml_rettypes\n");
		}
		else
		{
			bufcat(&buffer, " null::regtype[] as yaml_rettypes\n");
		}

		bufcat(&buffer,
			   "FROM pg_proc\n"
			   "WHERE prokind = 'f'\n"
			   " AND proname = lower('");
		bufcat(&buffer, obj->name);
		bufcat(&buffer, "')\n");

		/* Fill in the known parameter types */
		nargs = 0;
		if (obj->u.function.parameters)
		{
			bufcat(&buffer, " AND (proargtypes::regtype[])[0:pronargs] = ARRAY[");
			for (plist = obj->u.function.parameters; plist; plist = plist->next)
			{
				nargs++;
				bufcat(&buffer, "'");
				bufcat(&buffer, plist->type);
				bufcat(&buffer, "'::regtype");
				if (plist->next)
					bufcat(&buffer, ", ");
			}
			snprintf(str, STR_LEN, "]\n AND pronargs=%d\n", nargs);
			bufcat(&buffer, str);
		}

		/* Run the SQL */
		if (global_print_flag || global_debug_flag)
			printf("%s", buffer->buffer);
		result = PQexec(conn, buffer->buffer);
		bufreset(buffer);

		if (PQresultStatus(result) != PGRES_TUPLES_OK)
		{
			/*
			 * The SQL statement failed:
			 * Most likely scenario is a bad datatype causing the regtype cast
			 * to fail.
			 */
			char *code  = PQresultErrorField(result, PG_DIAG_SQLSTATE);
			char *error = PQresultErrorMessage(result);

			/* PQresultErrorField may return NULL; never hand that to %s */
			printf("errcode=\"%s\"\n", code ? code : "(null)"); /* Todo: validate expected error code */
			mapred_obj_error(obj, "SQL Error resolving function: \n %s",
							 error);
			XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
		}
		else if (PQntuples(result) == 0)
		{
			/* No such function */
			mapred_obj_error(obj, "No such function");
			XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
		}
		else if (PQntuples(result) > 1)
		{
			/* A full prototype pins pronargs and the types, so it cannot be ambiguous */
			XASSERT(!obj->u.function.parameters);
			mapred_obj_error(obj, "Ambiguous function, supply a function "
							 "prototype for disambiguation");
			XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
		}
		else
		{
			char	*value;
			int		 len;
			boolean	 retset;
			char	*argtypes      = NULL;
			char	*argnames      = "";
			char	*argmodes      = "";
			char	*allargtypes   = "";
			char	*rettype       = NULL;
			char	*yaml_rettypes = "";
			char	*type, *typetokens = NULL;
			char	*name, *nametokens = NULL;
			char	*mode, *modetokens = NULL;
			boolean	 name_end, mode_end;

			value = PQgetvalue(result, 0, 0);	/* Column 0: proretset */
			retset = (value[0] == 't');
			value = PQgetvalue(result, 0, 1);	/* Column 1: prorettype */
			rettype = value;
			value = PQgetvalue(result, 0, 2);	/* Column 2: pronargs */
			/* from here on nargs is the catalog's count, not the YAML's */
			nargs = (int) strtol(value, (char **) NULL, 10);

			/*
			 * Arrays are formatted as: "{value,value,...}"
			 * of which we only want "value,value, ..."
			 * so find the part of the string between the braces.
			 * The closing brace is overwritten in place inside the result.
			 */
			if (!PQgetisnull(result, 0, 3))		/* Column 3: proargnames */
			{
				value = PQgetvalue(result, 0, 3);
				argnames = value+1;
				len = strlen(argnames);
				if (len > 0)
					argnames[len-1] = '\0';
			}
			if (!PQgetisnull(result, 0, 4))		/* Column 4: proargmodes */
			{
				value = PQgetvalue(result, 0, 4);
				argmodes = value+1;
				len = strlen(argmodes);
				if (len > 0)
					argmodes[len-1] = '\0';
			}
			if (!PQgetisnull(result, 0, 5))		/* Column 5: proargtypes */
			{
				value = PQgetvalue(result, 0, 5);
				argtypes = value+1;
				len = strlen(argtypes);
				if (len > 0)
					argtypes[len-1] = '\0';
			}
			if (!PQgetisnull(result, 0, 6))		/* Column 6: proallargtypes */
			{
				value = PQgetvalue(result, 0, 6);
				allargtypes = value+1;
				len = strlen(allargtypes);
				if (len > 0)
					allargtypes[len-1] = '\0';
			}
			if (!PQgetisnull(result, 0, 7))		/* Column 7: yaml_rettypes */
			{
				value = PQgetvalue(result, 0, 7);
				yaml_rettypes = value+1;
				len = strlen(yaml_rettypes);
				if (len > 0)
					yaml_rettypes[len-1] = '\0';
			}

			/*
			 * These constraints should all be enforced in the catalog, so
			 * if something is wrong then it's a coding error above.
			 */
			XASSERT(rettype);
			XASSERT(argtypes);
			XASSERT(nargs >= 0);

			/*
			 * If we just derived the parameters from the catalog then we
			 * need to complete our internal metadata.
			 */
			plist = NULL;
			if (!obj->u.function.parameters)
			{
				/* strtok is destructive and we need to preserve the original
				 * string, so we make some annoying copies prior to strtok.
				 */
				tmp1 = copyscalar(argtypes);
				tmp2 = copyscalar(argnames);
				tmp3 = copyscalar(argmodes);
				type = strtok_r(tmp1, ",", &typetokens);
				name = strtok_r(tmp2, ",", &nametokens);
				mode = strtok_r(tmp3, ",", &modetokens);

				/*
				 * Name and mode are used for IN/OUT parameters and may not be
				 * present.  In the event that they are we are looking for:
				 *   - the "i" (in) arguments
				 *   - the "b" (inout) arguments
				 * we skip over:
				 *   - the "o" (out) arguments.
				 *   - the "t" (table out) arguments.
				 *
				 * Further it is possible for some of the arguments to be named
				 * and others to be unnamed.  The unnamed arguments will show
				 * up as "" (two quotes, not an empty string) if there is an
				 * argnames defined.
				 *
				 * If argmodes is not defined then all names in proargnames
				 * refer to input arguments.
				 */
				while (mode && strcmp(mode, "i") && strcmp(mode, "b"))
				{
					name = strtok_r(NULL, ",", &nametokens);
					mode = strtok_r(NULL, ",", &modetokens);
				}
				name_end = (NULL == name);
				mode_end = (NULL == mode);

				i = 0;
				while (type)
				{
					/* Keep track of which parameter we are on */
					i++;
					XASSERT(i <= nargs);

					/*
					 * If a name was not specified by the user, and was not
					 * specified by the in/out parameters then we assign it a
					 * default name.
					 */
					if (!name)
					{
						/* single argument functions always default to "value" */
						if (i == 1 && nargs == 1)
							name = (char*) "value";
						/* Base name on default parameter names for the first
						 * two arguments */
						else if (i <= 2)
							name = (char*) default_parameter_names[obj->kind][i-1];

						/*
						 * If we still didn't decide on a name, make up
						 * something useless.
						 */
						if (!name)
						{
							snprintf(str, STR_LEN, "parameter%d", i);
							name = str;
						}
					}

					/* Append a new node to the parameter list */
					if (!plist)
					{
						plist = mapred_malloc(sizeof(mapred_plist_t));
						plist->name = copyscalar(name);
						plist->type = copyscalar(type);
						plist->next = (mapred_plist_t *) NULL;
						obj->u.function.parameters = plist;
					}
					else
					{
						plist->next = mapred_malloc(sizeof(mapred_plist_t));
						plist = plist->next;
						plist->name = copyscalar(name);
						plist->type = copyscalar(type);
						plist->next = (mapred_plist_t *) NULL;
					}

					/* Proceed to the next parameter */
					type = strtok_r(NULL, ",", &typetokens);
					if (name_end)
					{
						/*
						 * The catalog has no more names.  Forget the default
						 * we made up for this parameter, otherwise every
						 * following parameter would inherit it instead of
						 * getting a default of its own.
						 */
						name = NULL;
					}
					else
					{
						name = strtok_r(NULL, ",", &nametokens);
						name_end = (NULL == name);
					}
					if (!mode_end)
					{
						mode = strtok_r(NULL, ",", &modetokens);
						mode_end = (NULL == mode);
					}

					/* Skip over any output-only arguments */
					while (mode && strcmp(mode, "i") && strcmp(mode, "b"))
					{
						if (!name_end)
						{
							name = strtok_r(NULL, ",", &nametokens);
							name_end = (NULL == name);
						}
						if (!mode_end)
						{
							mode = strtok_r(NULL, ",", &modetokens);
							mode_end = (NULL == mode);
						}
					}
				}

				mapred_free(tmp1);
				mapred_free(tmp2);
				mapred_free(tmp3);
				tmp1 = NULL;
				tmp2 = NULL;
				tmp3 = NULL;
			}

			/*
			 * Check that the number of parameters received is appropriate.
			 * This would be better moved to a generalized validation routine.
			 */
			switch (obj->kind)
			{
				case MAPRED_MAPPER:
					/*
					 * It would probably be possible to start supporting zero
					 * argument mappers, but:
					 *   1) It would require more modifications
					 *   2) Doesn't currently have a known use case
					 *   3) Has easy workarounds
					 */
					if (nargs < 1)
					{
						mapred_obj_error(obj, "Map functions require "
										 "one or more parameters");
						XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
					}
					break;

				case MAPRED_TRANSITION:
					if (nargs < 2)
					{
						mapred_obj_error(obj, "Transition functions require "
										 "two or more parameters");
						XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
					}
					if (retset)
					{
						mapred_obj_error(obj, "Transition functions cannot "
										 "be table functions");
						XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
					}
					break;

				case MAPRED_COMBINER:
					if (nargs != 2)
					{
						mapred_obj_error(obj, "Consolidate functions require "
										 "exactly two parameters");
						XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
					}
					if (retset)
					{
						mapred_obj_error(obj, "Consolidate functions cannot "
										 "be table functions");
						XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
					}
					break;

				case MAPRED_FINALIZER:
					if (nargs != 1)
					{
						mapred_obj_error(obj, "Finalize functions require "
										 "exactly one parameter");
						XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
					}
					break;

				default:
					XASSERT(false);
			}

			/* Fill in return type information */
			if (retset)
				obj->u.function.mode = MAPRED_MODE_MULTI;
			else
				obj->u.function.mode = MAPRED_MODE_SINGLE;

			/*
			 * Determine the return type information, there are 4 cases:
			 *
			 *   1) Function is defined with OUT/TABLE parameters.
			 *   2) Function returns a simple type.
			 *   3) Function returns a complex type.
			 *   4) Return type is void [error]
			 */
			plist = returns = NULL;
			if (argmodes && strlen(argmodes) > 0)
			{
				/* strtok is destructive and we need to preserve the original
				 * string, so we make some annoying copies prior to strtok.
				 */
				tmp1 = copyscalar(allargtypes);
				tmp2 = copyscalar(argnames);
				tmp3 = copyscalar(argmodes);
				type = strtok_r(tmp1, ",", &typetokens);
				name = strtok_r(tmp2, ",", &nametokens);
				mode = strtok_r(tmp3, ",", &modetokens);

				i = 1;
				while (mode)
				{
					while (mode &&
						   strcmp(mode, "o") &&
						   strcmp(mode, "b") &&
						   strcmp(mode, "t"))
					{
						/* skip input parameters */
						type = strtok_r(NULL, ",", &typetokens);
						name = strtok_r(NULL, ",", &nametokens);
						mode = strtok_r(NULL, ",", &modetokens);
					}

					if (mode)
					{
						XASSERT(type);
						newitem = mapred_malloc(sizeof(mapred_plist_t));

						/*
						 * The list node owns private copies of the name and
						 * the type, so it stays valid after the token
						 * buffers are released below.
						 */
						if (NULL != name &&
							0 != strcmp(name, "") &&
							0 != strcmp(name, "\"\""))
						{
							/* if name defined in db, just use it */
							newitem->name = copyscalar(name);
						}
						else
						{
							/* else just obey the default name in db */
							snprintf(str, STR_LEN, "column%d", i);
							newitem->name = copyscalar(str);
						}
						newitem->type = copyscalar(type);
						newitem->next = NULL;

						if (plist)
							plist->next = newitem;
						else
							returns = newitem;
						plist = newitem;
						++i;
					}

					type = strtok_r(NULL, ",", &typetokens);
					name = strtok_r(NULL, ",", &nametokens);
					mode = strtok_r(NULL, ",", &modetokens);
				}

				mapred_free(tmp1);
				mapred_free(tmp2);
				mapred_free(tmp3);
				tmp1 = NULL;
				tmp2 = NULL;
				tmp3 = NULL;
			}

			/*
			 * If the arguments were not defined in the function definition then
			 * we check to see if this was a complex type by looking up the type
			 * information in pg_attribute.
			 */
			if (!returns)
			{
				bufcat(&buffer,
					   "SELECT attname, atttypid::regtype\n"
					   "FROM pg_attribute a\n"
					   "JOIN pg_class c on (a.attrelid = c.oid)\n"
					   "WHERE not a.attisdropped\n"
					   " AND a.attnum > 0\n"
					   " AND c.reltype = '");
				bufcat(&buffer, rettype);
				bufcat(&buffer,
					   "'::regtype\n"
					   "ORDER BY -attnum");
				result2 = PQexec(conn, buffer->buffer);
				bufreset(buffer);

				if (PQresultStatus(result2) != PGRES_TUPLES_OK)
				{
					/* report the error of the query that actually failed */
					char *error = PQresultErrorMessage(result2);

					mapred_obj_error(obj, "Error resolving function: %s", error);
					XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
				}
				else if (PQntuples(result2) > 0)
				{
					/*
					 * We have a complex type, build the return list.  Rows
					 * arrive in descending attnum order and each one is
					 * pushed on the front, so the list ends up ascending.
					 */
					for (i = 0; i < PQntuples(result2); i++)
					{
						name = PQgetvalue(result2, i, 0);
						type = PQgetvalue(result2, i, 1);
						newitem = mapred_malloc(sizeof(mapred_plist_t));
						newitem->name = copyscalar(name);
						newitem->type = copyscalar(type);
						newitem->next = returns;
						returns = newitem;
					}
				}
			}

			/*
			 * If the return types were not defined in either the argument list
			 * nor the catalog then we assume it is a simple type.
			 */
			if (!returns)
			{
				/* Check against "void" which is a special return type that
				 * means there is no return value - which we don't support for
				 * mapreduce.
				 */
				if (!strcmp(rettype, "void"))
				{
					mapred_obj_error(obj, "Function returns void");
					XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
				}
				returns = mapred_malloc(sizeof(mapred_plist_t));
				returns->type = copyscalar(rettype);
				returns->name = NULL;
				returns->next = NULL;
			}

			/*
			 * We now should have a returns list, compare it against the RETURNS
			 * list given in the yaml.  The yaml overrides return names, but can
			 * not override return types.  If the return types are incompatible
			 * raise an error.
			 */
			obj->u.function.internal_returns = returns;
			if (obj->u.function.returns)
			{
				/*
				 * The first thing to do is normalize the given return types
				 * with their formal names.  This will, for example turn a type
				 * like "float8" => "double precision".  The input name might
				 * be correct (float8) but we need it represented as the formal
				 * name so that we can compare against the formal name we got
				 * when we looked up the function in the catalog.
				 */
				plist = obj->u.function.returns;
				type = strtok_r(yaml_rettypes, ",", &typetokens);
				while (plist)
				{
					XASSERT(type);	/* should be an equal number */

					/*
					 * If we have a type specified replace it with the one we
					 * resolved from the select stmt, otherwise just keep it
					 * as NULL and fill it in during the compare against what
					 * was in the catalog.
					 */
					if (plist->type)
					{
						mapred_free(plist->type);

						/*
						 * When in an array the typname may get wrapped in
						 * double quotes, if so we need to strip them back out.
						 */
						if (type[0] == '"')
						{
							plist->type = copyscalar(type+1);
							len = strlen(plist->type);
							/* guard the index: a lone quote leaves nothing to strip */
							if (len > 0)
								plist->type[len-1] = '\0';
						}
						else
						{
							plist->type = copyscalar(type);
						}
					}
					plist = plist->next;
					type = strtok_r(NULL, ",", &typetokens);
				}

				/* Compare against actual function return types */
				plist = obj->u.function.returns;
				plist2 = returns;
				while (plist && plist2)
				{
					XASSERT(plist->name);	/* always defined in YAML */
					XASSERT(plist2->type);	/* always defined in SQL */

					/*
					 * In the YAML it is possible to have a name without a type,
					 * if that is the case then simply take the SQL type.
					 */
					if (!plist->type)
						plist->type = copyscalar(plist2->type);
					else if (strcmp(plist->type, plist2->type))
						break;
					plist = plist->next;
					plist2 = plist2->next;
				}

				/* Either a type mismatch or lists of different length */
				if (plist || plist2)
				{
					/*
					 * Either list may have run out first and names/types may
					 * be missing, so pick every printed field defensively.
					 */
					char *yaml_name = "\"\"";
					char *yaml_type = "-";
					char *sql_name  = "\"\"";
					char *sql_type  = "-";

					if (plist)
					{
						if (plist->name)
							yaml_name = plist->name;
						if (plist->type)
							yaml_type = plist->type;
					}
					if (plist2)
					{
						/* an unnamed SQL column borrows the YAML name, if any */
						if (plist2->name)
							sql_name = plist2->name;
						else if (plist && plist->name)
							sql_name = plist->name;
						if (plist2->type)
							sql_type = plist2->type;
					}

					mapred_obj_error(obj, "RETURN parameter '%s %s' != '%s %s'",
									 yaml_name, yaml_type, sql_name, sql_type);
					XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
				}
			}
			else
			{
				/*
				 * No RETURNS in the yaml: adopt the catalog derived list.  Its
				 * nodes already own private copies of their strings, so only
				 * missing or empty names need to be replaced.
				 */
				obj->u.function.returns = returns;
				i = 0;
				for (plist = returns; plist; plist = plist->next)
				{
					XASSERT(plist->type);
					i++;

					/* if plist->name is a usable name then keep it */
					if (plist->name &&
						0 != strcmp(plist->name, "") &&
						0 != strcmp(plist->name, "\"\""))
						continue;

					/*
					 * Manufacture a name for a column based on default
					 * naming rules.
					 */
					name = (char*) NULL;
					if (i <= 2)
						name = (char*) default_return_names[obj->kind][i-1];
					if (!name)
					{
						snprintf(str, STR_LEN, "parameter%d", i);
						name = str;
					}
					if (plist->name)
						mapred_free(plist->name);
					plist->name = copyscalar(name);
				}
			}
		}
	}
	XFINALLY
	{
		if (result)
			PQclear(result);
		if (result2)
			PQclear(result2);
		if (buffer)
			mapred_free(buffer);
		if (tmp1)
			mapred_free(tmp1);
		if (tmp2)
			mapred_free(tmp2);
		if (tmp3)
			mapred_free(tmp3);
	}
	XTRY_END;
}