void lookup_function_in_catalog()

in gpcontrib/gpmapreduce/src/mapred.c [315:1019]


void lookup_function_in_catalog(PGconn *conn, mapred_document_t *doc,
								mapred_object_t *obj)
{
	PGresult			*result	 = NULL;
	PGresult			*result2 = NULL;
	mapred_plist_t		*plist, *plist2;
	mapred_plist_t		*newitem = NULL;
	mapred_plist_t		*returns = NULL;
	buffer_t			*buffer	 = NULL;
	char				*tmp1	 = NULL;
	char				*tmp2	 = NULL;
	char				*tmp3	 = NULL;
#define STR_LEN 50
	char				str[STR_LEN];
	int					i, nargs;

	XASSERT(doc);
	XASSERT(obj);
	XASSERT(obj->kind == MAPRED_MAPPER     ||
			obj->kind == MAPRED_TRANSITION ||
			obj->kind == MAPRED_COMBINER   ||
			obj->kind == MAPRED_FINALIZER);

	obj->internal = true;
	obj->u.function.internal_returns = NULL;

	XTRY
	{
		buffer = makebuffer(1024, 1024);

		/* Try to lookup the specified function */
		bufcat(&buffer,
			   "SELECT proretset, prorettype::regtype, pronargs,\n"
			   "       proargnames, proargmodes, \n"
			   "       (proargtypes::regtype[])[0:pronargs] as proargtypes,\n"
			   "       proallargtypes::regtype[],\n");

		/*
		 * If we have return types defined in the yaml then we want to resolve
		 * them to their authorative names for comparison purposes.
		 */
		if (obj->u.function.returns)
		{
			bufcat(&buffer, "       ARRAY[");
			for (plist = obj->u.function.returns; plist; plist = plist->next)
			{
				if (plist->type)
				{
					bufcat(&buffer, "'");
					bufcat(&buffer, plist->type);
					bufcat(&buffer, "'::regtype");
				}
				else
				{
					/* If we don't know the type, punt */
					bufcat(&buffer, "'-'::regtype");
				}
				if (plist->next)
					bufcat(&buffer, ", ");
			}
			bufcat(&buffer, "] as yaml_rettypes\n");
		}
		else
		{
			bufcat(&buffer, "       null::regtype[] as yaml_rettypes\n");
		}

		bufcat(&buffer,
			   "FROM   pg_proc\n"
			   "WHERE  prokind = 'f'\n"
			   "  AND  proname = lower('");
		bufcat(&buffer, obj->name);
		bufcat(&buffer, "')\n");

		/* Fill in the known parameter types */
		nargs = 0;
		if (obj->u.function.parameters)
		{
			bufcat(&buffer, "  AND  (proargtypes::regtype[])[0:pronargs] = ARRAY[");
			for (plist = obj->u.function.parameters; plist; plist = plist->next)
			{
				nargs++;
				bufcat(&buffer, "'");
				bufcat(&buffer, plist->type);
				bufcat(&buffer, "'::regtype");
				if (plist->next)
					bufcat(&buffer, ", ");
			}
			snprintf(str, STR_LEN, "]\n  AND pronargs=%d\n", nargs);
			bufcat(&buffer, str);
		}

		/* Run the SQL */
		if (global_print_flag || global_debug_flag)
			printf("%s", buffer->buffer);
		result = PQexec(conn, buffer->buffer);
		bufreset(buffer);

		if (PQresultStatus(result) != PGRES_TUPLES_OK)
		{
			/*
			 * The SQL statement failed:
			 * Most likely scenario is a bad datatype causing the regtype cast
			 * to fail.
			 */
			char *code  = PQresultErrorField(result, PG_DIAG_SQLSTATE);
			char *error = PQresultErrorMessage(result);

			printf("errcode=\"%s\"\n", code);  /* Todo: validate expected error code */

			mapred_obj_error(obj, "SQL Error resolving function: \n  %s",
							 error);
			XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
		}
		else if (PQntuples(result) == 0)
		{
			/* No such function */
			mapred_obj_error(obj, "No such function");
			XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
		}
		else if (PQntuples(result) > 1)
		{
			XASSERT(!obj->u.function.parameters);
			mapred_obj_error(obj, "Ambiguous function, supply a function "
							 "prototype for disambiguation");
			XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
		}
		else
		{
			char		*value;
			int          len;
			boolean		 retset;
			int          nargs;
			char		*argtypes	 = NULL;
			char		*argnames	 = "";
			char		*argmodes	 = "";
			char		*allargtypes = "";
			char		*rettype	 = NULL;
			char        *yaml_rettypes = "";
			char        *type, *typetokens = NULL;
			char        *name, *nametokens = NULL;
			char        *mode, *modetokens = NULL;
			boolean     name_end, mode_end;

			value = PQgetvalue(result, 0, 0);   /* Column 0: proretset */
			retset = (value[0] == 't');
			value = PQgetvalue(result, 0, 1);   /* Column 1: prorettype */
			rettype = value;
			value = PQgetvalue(result, 0, 2);   /* Column 2: pronargs */
			nargs = (int) strtol(value, (char **) NULL, 10);

			/*
			 * Arrays are formatted as:  "{value,value,...}"
			 * of which we only want "value,value, ..."
			 * so find the part of the string between the braces
			 */
			if (!PQgetisnull(result, 0, 3))   /* Column 3: proargnames */
			{
				value = PQgetvalue(result, 0, 3);
				argnames = value+1;
				len = strlen(argnames);
				if (len > 0)
					argnames[len-1] = '\0';
			}

			if (!PQgetisnull(result, 0, 4))   /* Column 4: proargmodes */
			{
				value = PQgetvalue(result, 0, 4);
				argmodes = value+1;
				len = strlen(argmodes);
				if (len > 0)
					argmodes[len-1] = '\0';
			}
			if (!PQgetisnull(result, 0, 5))   /* Column 5: proargtypes */
			{
				value = PQgetvalue(result, 0, 5);
				argtypes = value+1;
				len = strlen(argtypes);
				if (len > 0)
					argtypes[len-1] = '\0';
			}
			if (!PQgetisnull(result, 0, 6))   /* Column 6: proallargtypes */
			{
				value = PQgetvalue(result, 0, 6);
				allargtypes = value+1;
				len = strlen(allargtypes);
				if (len > 0)
					allargtypes[len-1] = '\0';
			}
			if (!PQgetisnull(result, 0, 7))   /* Column 7: yaml_rettypes */
			{
				value = PQgetvalue(result, 0, 7);
				yaml_rettypes = value+1;
				len = strlen(yaml_rettypes);
				if (len > 0)
					yaml_rettypes[len-1] = '\0';
			}

			/*
			 * These constraints should all be enforced in the catalog, so
			 * if something is wrong then it's a coding error above.
			 */
			XASSERT(rettype);
			XASSERT(argtypes);
			XASSERT(nargs >= 0);

			/*
			 * If we just derived the parameters from the catalog then we
			 * need complete our internal metadata.
			 */
			plist = NULL;
			if (!obj->u.function.parameters)
			{
				/* strtok is destructive and we need to preserve the original
				 * string, so we make some annoying copies prior to strtok.
				 */
				tmp1 = copyscalar(argtypes);
				tmp2 = copyscalar(argnames);
				tmp3 = copyscalar(argmodes);

				type = strtok_r(tmp1, ",", &typetokens);
				name = strtok_r(tmp2, ",", &nametokens);
				mode = strtok_r(tmp3, ",", &modetokens);

				/*
				 * Name and mode are used for IN/OUT parameters and may not be
				 * present.  In the event that they are we are looking for:
				 *   - the "i" (in) arguments
				 *   - the "b" (inout) arguments
				 * we skip over:
				 *   - the "o" (out) arguments.
				 *   - the "t" (table out) arguments.
				 *
				 * Further it is possible for some of the arguments to be named
				 * and others to be unnamed.  The unnamed arguments will show
				 * up as "" (two quotes, not an empty string) if there is an
				 * argnames defined.
				 *
				 * If argmodes is not defined then all names in proargnames
				 * refer to input arguments.
				 */

				while (mode && strcmp(mode, "i") && strcmp(mode, "b"))
				{
					name = strtok_r(NULL, ",", &nametokens);
					mode = strtok_r(NULL, ",", &modetokens);
				}
				name_end = (NULL == name);
				mode_end = (NULL == mode);

				i = 0;
				while (type)
				{
					/* Keep track of which parameter we are on */
					i++;
					XASSERT(i <= nargs);

					/*
					 * If a name was not specified by the user, and was not
					 * specified by the in/out parameters then we assign it a
					 * default name.
					 */
					if (!name)
					{
						/* single argument functions always default to "value" */
						if (i == 1 && nargs == 1)
							name = (char*) "value";

						/* Base name on default parameter names for the first
						 * two arguments */
						else if (i <= 2)
							name = (char*) default_parameter_names[obj->kind][i-1];

						/*
						 * If we still didn't decide on a name, make up
						 * something useless.
						 */
						if (!name)
						{
							snprintf(str, STR_LEN, "parameter%d", i);
							name = str;
						}
					}

					if (!plist)
					{
						plist = mapred_malloc(sizeof(mapred_plist_t));
						plist->name = copyscalar(name);
						plist->type = copyscalar(type);
						plist->next = (mapred_plist_t *) NULL;
						obj->u.function.parameters = plist;
					}
					else
					{
						plist->next = mapred_malloc(sizeof(mapred_plist_t));
						plist = plist->next;
						plist->name = copyscalar(name);
						plist->type = copyscalar(type);
						plist->next = (mapred_plist_t *) NULL;
					}

					/* Procede to the next parameter */
					type = strtok_r(NULL, ",", &typetokens);
					if (!name_end)
					{
						name = strtok_r(NULL, ",", &nametokens);
						name_end = (NULL == name);
					}
					if (!mode_end)
					{
						mode = strtok_r(NULL, ",", &modetokens);
						mode_end = (NULL == mode);
					}
					while (mode && strcmp(mode, "i") && strcmp(mode, "b"))
					{
						if (!name_end)
						{
							name = strtok_r(NULL, ",", &nametokens);
							name_end = (NULL == name);
						}
						if (!mode_end)
						{
							mode = strtok_r(NULL, ",", &modetokens);
							mode_end = (NULL == mode);
						}
					}
				}

				mapred_free(tmp1);
				mapred_free(tmp2);
				mapred_free(tmp3);
				tmp1 = NULL;
				tmp2 = NULL;
				tmp3 = NULL;
			}

			/*
			 * Check that the number of parameters received is appropriate.
			 * This would be better moved to a generalized validation routine.
			 */
			switch (obj->kind)
			{

				case MAPRED_MAPPER:
					/*
					 * It would probably be possible to start supporting zero
					 * argument mappers, but:
					 *   1) It would require more modifications
					 *   2) Doesn't currently have a known use case
					 *   3) Has easy workarounds
					 */
					if (nargs < 1)
					{
						mapred_obj_error(obj, "Transition functions require "
										 "two or more parameters");
						XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
					}
					break;

				case MAPRED_TRANSITION:
					if (nargs < 2)
					{
						mapred_obj_error(obj, "Transition functions require "
										 "two or more parameters");
						XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
					}
					if (retset)
					{
						mapred_obj_error(obj, "Transition functions cannot "
										 "be table functions");
						XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
					}
					break;

				case MAPRED_COMBINER:
					if (nargs != 2)
					{
						mapred_obj_error(obj, "Consolidate functions require "
										 "exactly two parameters");
						XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
					}
					if (retset)
					{
						mapred_obj_error(obj, "Consolidate functions cannot "
										 "be table functions");
						XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
					}
					break;

				case MAPRED_FINALIZER:
					if (nargs != 1)
					{
						mapred_obj_error(obj, "Finalize functions require "
										 "exactly one parameter");
						XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
					}
					break;

				default:
					XASSERT(false);
			}

			/* Fill in return type information */
			if (retset)
				obj->u.function.mode = MAPRED_MODE_MULTI;
			else
				obj->u.function.mode = MAPRED_MODE_SINGLE;

			/*
			 * Determine the return type information, there are 3 primary
			 * subcases:
			 *
			 *  1) Function is defined with OUT/TABLE parameters.
			 *  2) Function returns a simple type.
			 *  3) Function returns a complex type.
			 *  4) Return type is void [error]
			 */
			plist = returns = NULL;
			if (argmodes && strlen(argmodes) > 0)
			{

				/* strtok is destructive and we need to preserve the original
				 * string, so we make some annoying copies prior to strtok.
				 */
				tmp1 = copyscalar(allargtypes);
				tmp2 = copyscalar(argnames);
				tmp3 = copyscalar(argmodes);

				type = strtok_r(tmp1, ",", &typetokens);
				name = strtok_r(tmp2, ",", &nametokens);
				mode = strtok_r(tmp3, ",", &modetokens);

				i = 1;
				while (mode)
				{
					while (mode &&
						   strcmp(mode, "o") &&
						   strcmp(mode, "b") &&
						   strcmp(mode, "t"))
					{
						/* skip input parameters */
						type = strtok_r(NULL, ",", &typetokens);
						name = strtok_r(NULL, ",", &nametokens);
						mode = strtok_r(NULL, ",", &modetokens);
					}
					if (mode)
					{
						XASSERT(type);

						newitem = mapred_malloc(sizeof(mapred_plist_t));

						/*
						 * Note we haven't made local copies of these, we will
						 * do this after resolution when validating against any
						 * RETURNS defined in the yaml, if any.
						 */

						if( NULL != name &&
								0 != strcmp(name, "") &&
								0 != strcmp(name, "\"\"") )
						{
							/*if name defined in db, just use it*/
							newitem->name = copyscalar(name);
						}
						else
						{
							/*else just obey the default name in db*/
							snprintf( str, STR_LEN, "column%d", i);
							newitem->name = copyscalar(str);
						}

						newitem->type = copyscalar(type);

						newitem->next = NULL;

						if (plist)
							plist->next = newitem;
						else
							returns = newitem;
						plist = newitem;
						++i;
					}
					type = strtok_r(NULL, ",", &typetokens);
					name = strtok_r(NULL, ",", &nametokens);
					mode = strtok_r(NULL, ",", &modetokens);
				}

				mapred_free(tmp1);
				mapred_free(tmp2);
				mapred_free(tmp3);
				tmp1 = NULL;
				tmp2 = NULL;
				tmp3 = NULL;
			}

			/*
			 * If the arguments were not defined in the function definition then
			 * we check to see if this was a complex type by looking up the type
			 * information in pg_attribute.
			 */
			if (!returns)
			{
				bufcat(&buffer,
					   "SELECT attname, atttypid::regtype\n"
					   "FROM   pg_attribute a\n"
					   "JOIN   pg_class c on (a.attrelid = c.oid)\n"
					   "WHERE  not a.attisdropped\n"
					   "  AND  a.attnum > 0\n"
					   "  AND  c.reltype = '");
				bufcat(&buffer, rettype);
				bufcat(&buffer,
					   "'::regtype\n"
					   "ORDER BY -attnum");
				result2 = PQexec(conn, buffer->buffer);
				bufreset(buffer);

				if (PQresultStatus(result2) != PGRES_TUPLES_OK)
				{
					char *error = PQresultErrorMessage(result);

					mapred_obj_error(obj, "Error resolving function: %s", error);
					XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
				}
				else if (PQntuples(result2) > 0)
				{
					/* We have a complex type, build the return list */
					for (i = 0; i < PQntuples(result2); i++)
					{
						name = PQgetvalue(result2, i, 0);
						type = PQgetvalue(result2, i, 1);

						newitem = mapred_malloc(sizeof(mapred_plist_t));
						newitem->name = copyscalar(name);
						newitem->type = copyscalar(type);
						newitem->next = returns;
						returns = newitem;
					}
				}
			}

			/*
			 * If the return types were not defined in either the argument list
			 * nor the catalog then we assume it is a simple type.
			 */
			if (!returns)
			{
				/* Check against "void" which is a special return type that
				 * means there is no return value - which we don't support for
				 * mapreduce.
				 */
				if (!strcmp(rettype, "void"))
				{
					mapred_obj_error(obj, "Function returns void");
					XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
				}
				returns = mapred_malloc(sizeof(mapred_plist_t));
				returns->type = copyscalar(rettype);
				returns->name = NULL;
				returns->next = NULL;
			}

			/*
			 * We now should have a returns list, compare it against the RETURNS
			 * list given in the yaml.  The yaml overrides return names, but can
			 * not override return types.  If the return types are incompatible
			 * raise an error.
			 */
			obj->u.function.internal_returns = returns;
			if (obj->u.function.returns)
			{
				/*
				 * The first thing to do is normalize the given return types
				 * with their formal names.  This will, for example turn a type
				 * like "float8" => "double precision".  The input name might
				 * be correct (float8) but we need it represented as the formal
				 * name so that we can compare against the formal name we got
				 * when we looked up the function in the catalog.
				 */
				plist = obj->u.function.returns;
				type = strtok_r(yaml_rettypes, ",", &typetokens);
				while (plist)
				{
					XASSERT(type);  /* should be an equal number */

					/*
					 * If we have a type specified replace it with the one we
					 * resolved from the select stmt, otherwise just keep it
					 * as NULL and fill it in during the compare against what
					 * was in the catalog.
					 */
					if (plist->type)
					{
						mapred_free(plist->type);

						/*
						 * When in an array the typname may get wrapped in
						 * double quotes, if so we need to strip them back out.
						 */
						if (type[0] == '"')
						{
							plist->type = copyscalar(type+1);
							plist->type[strlen(plist->type)-1] = '\0';
						}
						else
						{
							plist->type = copyscalar(type);
						}
					}

					plist = plist->next;
					type = strtok_r(NULL, ",", &typetokens);
				}


				/* Compare against actual function return types */
				plist = obj->u.function.returns;
				plist2 = returns;
				while (plist && plist2)
				{
					XASSERT(plist->name);   /* always defined in YAML */
					XASSERT(plist2->type);  /* always defined in SQL */

					/*
					 * In the YAML it is possible to have a name without a type,
					 * if that is the case then simply take the SQL type.
					 */
					if (!plist->type)
						plist->type = copyscalar(plist2->type);
					else if (strcmp(plist->type, plist2->type))
						break;
					plist  = plist->next;
					plist2 = plist2->next;
				}
				if (plist || plist2)
				{
					mapred_obj_error(obj, "RETURN parameter '%s %s' != '%s %s'",
									 plist ? plist->name : "\"\"",
									 plist ? plist->type : "-",
									 plist2 ? (plist2->name ? plist2->name : plist->name) : "\"\"",
									 plist2 ? plist2->type : "-");
					XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
				}
			}
			else
			{
				obj->u.function.returns = returns;
				i = 0;
				for (plist = returns; plist; plist = plist->next)
				{
					XASSERT(plist->type);
					i++;
					plist->type = copyscalar(plist->type);

					/*
					 * if plist->name is not null and empty string,
					 * then use that name
					 */
					if (plist->name &&
							0 != strcmp(plist->name, "") &&
							0 != strcmp(plist->name, "\"\"") )
					{
						plist->name = copyscalar(plist->name);
					}
					/*
					 * else We need generate a name anyway
					 */
					else
					{
						/*
						 * Manufacture a name for a column based on default
						 * naming rules.
						 */
						name = (char*) NULL;
						if (i <= 2)
							name = (char*) default_return_names[obj->kind][i-1];
						if (!name)
						{
							snprintf(str, STR_LEN, "parameter%d", i);
							name = str;
						}
						plist->name = copyscalar(name);
					}
				}
			}
		}
	}
	XFINALLY
	{
		if (result)
			PQclear(result);
		if (result2)
			PQclear(result2);
		if (buffer)
			mapred_free(buffer);

		if (tmp1)
			mapred_free(tmp1);
		if (tmp2)
			mapred_free(tmp2);
		if (tmp3)
			mapred_free(tmp3);

	}
	XTRY_END;
}