boolean mapred_create_object()

in gpcontrib/gpmapreduce/src/mapred.c [2505:3806]


/*
 * mapred_create_object
 *
 * Build the SQL statement that creates the backend object described by
 * 'obj' and execute it on 'conn'.
 *
 *   conn - connection the generated statement is executed on
 *   doc  - owning document; supplies the name prefix (doc->prefix) and,
 *          when doc->errors is set, a buffer that collects errors that
 *          may be recoverable on a later pass
 *   obj  - the object to create; obj->created is set on success
 *
 * Returns obj->created.  This is true when the object was already created,
 * needed no statement at all (table inputs, outputs, internal functions),
 * or was created successfully.  It stays false when a REDUCE/TASK depends
 * on an object that has not been created yet, or when a QUERY input failed
 * with OBJ_DOES_NOT_EXIST; in both cases the caller is expected to retry
 * on a later pass.
 *
 * Any other SQL failure, and any malformed object definition, is reported
 * through XRAISE / XASSERT.
 *
 * Each statement is executed inside the savepoint "mapreduce_save" so that
 * a failure does not abort the enclosing transaction.
 *
 * Note: for FILE/GPFDIST inputs the location strings in obj->u.input.files
 * are modified in place while they are split into host and path.
 */
boolean mapred_create_object(PGconn *conn, mapred_document_t *doc,
							 mapred_object_t *obj)
{
	mapred_clist_t *clist    = NULL;
	mapred_plist_t *plist    = NULL;
	mapred_plist_t *plist2   = NULL;
	const char     *ckind    = NULL;
	buffer_t       *buffer   = NULL;
	buffer_t       *qbuffer  = NULL;
	PGresult       *result   = NULL;

	/* If the object was created in a prior pass, then do nothing */
	if (obj->created)
		return true;

	/* Otherwise attempt to create the object */
	XTRY
	{
		/* allocates 1024 bytes, extending by 1024 bytes if we run out */
		buffer = makebuffer(1024, 1024);

		switch (obj->kind)
		{
			case MAPRED_INPUT:
				XASSERT(obj->name);

				switch (obj->u.input.type)
				{
					case MAPRED_INPUT_TABLE:
						/* Nothing to actually create */
						obj->created = true;
						break;

					case MAPRED_INPUT_FILE:
					case MAPRED_INPUT_GPFDIST:
						XASSERT(obj->u.input.files);

						/*
						 *  CREATE EXTERNAL TABLE <prefix><name>(<columns>)
						 *    LOCATION('<scheme>://<host>/<path>', ...)
						 *    FORMAT '<format>' [( <options> )]
						 *    [SEGMENT REJECT LIMIT <n>];
						 */
						bufcat(&buffer, "CREATE EXTERNAL TABLE ");
						bufcat(&buffer, doc->prefix);
						bufcat(&buffer, obj->name);
						bufcat(&buffer, "(");
						for (plist = obj->u.input.columns;
							 plist;
							 plist = plist->next)
						{
							bufcat(&buffer, plist->name);
							bufcat(&buffer, " ");
							bufcat(&buffer, plist->type);
							if (plist->next)
								bufcat(&buffer, ", ");
						}
						bufcat(&buffer, ")\n");
						bufcat(&buffer, "  LOCATION(");
						for (clist = obj->u.input.files;
							 clist;
							 clist = clist->next)
						{
							char *domain_port, *path, *p = NULL;

							if (obj->u.input.type == MAPRED_INPUT_GPFDIST)
								bufcat(&buffer, "'gpfdist://");
							else
								bufcat(&buffer, "'file://");

							/*
							 * The general syntax of a URL is    scheme://domain:port/path?query_string#fragment_id
							 * clist->value should contain just           domain:port/path?query_string#fragment_id
							 */
							p = strchr(clist->value, '/');
							if (p == NULL)
							{
								mapred_obj_error(obj, "Failed to find '/' indicating start of path (%s)",
												 clist->value);
								XRAISE(MAPRED_PARSE_ERROR,
									   "Invalid INPUT source specification");
							}
							if (p == clist->value)
							{
								mapred_obj_error(obj, "Missing domain and port before '/' indicating start of path (%s)",
												 clist->value);
								XRAISE(MAPRED_PARSE_ERROR,
									   "Invalid INPUT source specification");
							}
							domain_port = clist->value;
							path        = p+1;

							/*
							 * Overwrite the / separating the domain:port from the path
							 * with a nul and move back one byte to check for a trailing ':'.
							 * We put the / back in when copying into the destination buffer.
							 * (p > clist->value is guaranteed by the check above, so
							 * stepping back one byte stays inside the string.)
							 */
							*p-- = '\0';

							if (strlen(path) < 1)
							{
								mapred_obj_error(obj, "Missing path after '/' (%s)",
												 clist->value);
								XRAISE(MAPRED_PARSE_ERROR,
									   "Invalid INPUT source specification");
							}

							/*
							 * We allow a trailing ':'  (e.g. host:/filepath)
							 * but we must not copy it into the external table url.
							 */
							if (*p == ':')
								*p = '\0';

							if (strlen(domain_port) < 1)
							{
								mapred_obj_error(obj, "Missing host before '/' (%s)",
												 clist->value);
								XRAISE(MAPRED_PARSE_ERROR,
									   "Invalid INPUT source specification");
							}

							bufcat(&buffer, domain_port);
							bufcat(&buffer, "/");
							bufcat(&buffer, path);
							if (clist->next)
								bufcat(&buffer, "',\n           ");
						}
						bufcat(&buffer, "')\n");
						if (obj->u.input.format == MAPRED_FORMAT_CSV)
							bufcat(&buffer, "  FORMAT 'CSV'");
						else
							bufcat(&buffer, "  FORMAT 'TEXT'");
						if (obj->u.input.delimiter ||
							obj->u.input.escape    ||
							obj->u.input.quote     ||
							obj->u.input.null)
						{
							bufcat(&buffer, " ( ");
							if (obj->u.input.delimiter)
							{
								bufcat(&buffer, "DELIMITER '");
								bufcat(&buffer, obj->u.input.delimiter);
								bufcat(&buffer, "' ");
							}
							if (obj->u.input.escape)
							{
								bufcat(&buffer, "ESCAPE '");
								bufcat(&buffer, obj->u.input.escape);
								bufcat(&buffer, "' ");
							}
							if (obj->u.input.quote)
							{
								bufcat(&buffer, "QUOTE '");
								bufcat(&buffer, obj->u.input.quote);
								bufcat(&buffer, "' ");
							}
							if (obj->u.input.null)
							{
								bufcat(&buffer, "NULL '");
								bufcat(&buffer, obj->u.input.null);
								bufcat(&buffer, "' ");
							}
							bufcat(&buffer, ")");
						}

						if (obj->u.input.error_limit > 0)
						{
							/* large enough for any positive int plus the nul */
							char intbuf[11];
							snprintf(intbuf, sizeof(intbuf), "%d",
									 obj->u.input.error_limit);

							bufcat(&buffer, "\n  SEGMENT REJECT LIMIT ");
							bufcat(&buffer, intbuf);
						}

						bufcat(&buffer, ";\n\n");
						break;

					case MAPRED_INPUT_EXEC:
						XASSERT(obj->u.input.desc);

						/*
						 *  CREATE EXTERNAL WEB TABLE <prefix><name>(<columns>)
						 *  EXECUTE '<desc>'
						 *    FORMAT '<format>' [( <options> )]
						 *    [SEGMENT REJECT LIMIT <n>];
						 */
						bufcat(&buffer, "CREATE EXTERNAL WEB TABLE ");
						bufcat(&buffer, doc->prefix);
						bufcat(&buffer, obj->name);
						bufcat(&buffer, "(");
						for (plist = obj->u.input.columns;
							 plist;
							 plist = plist->next)
						{
							bufcat(&buffer, plist->name);
							bufcat(&buffer, " ");
							bufcat(&buffer, plist->type);
							if (plist->next)
								bufcat(&buffer, ", ");
						}
						bufcat(&buffer, ")\n");
						bufcat(&buffer, "EXECUTE '");
						bufcat(&buffer, obj->u.input.desc);
						bufcat(&buffer, "'\n");
						if (obj->u.input.format == MAPRED_FORMAT_CSV)
							bufcat(&buffer, "  FORMAT 'CSV'");
						else
							bufcat(&buffer, "  FORMAT 'TEXT'");
						if (obj->u.input.delimiter ||
							obj->u.input.quote     ||
							obj->u.input.null)
						{
							bufcat(&buffer, " ( ");
							if (obj->u.input.delimiter)
							{
								bufcat(&buffer, "DELIMITER '");
								bufcat(&buffer, obj->u.input.delimiter);
								bufcat(&buffer, "' ");
							}
							if (obj->u.input.quote)
							{
								bufcat(&buffer, "QUOTE '");
								bufcat(&buffer, obj->u.input.quote);
								bufcat(&buffer, "' ");
							}
							if (obj->u.input.null)
							{
								bufcat(&buffer, "NULL '");
								bufcat(&buffer, obj->u.input.null);
								bufcat(&buffer, "' ");
							}
							bufcat(&buffer, ")");
						}

						if (obj->u.input.error_limit > 0)
						{
							/* large enough for any positive int plus the nul */
							char intbuf[11];
							snprintf(intbuf, sizeof(intbuf), "%d",
									 obj->u.input.error_limit);

							bufcat(&buffer, "\n  SEGMENT REJECT LIMIT ");
							bufcat(&buffer, intbuf);
						}
						bufcat(&buffer, ";\n\n");
						break;

					case MAPRED_INPUT_QUERY:
						XASSERT(obj->u.input.desc);

						/*
						 *  CREATE TEMPORARY VIEW <name> AS
						 *  <desc>;
						 */
						bufcat(&buffer, "CREATE TEMPORARY VIEW ");
						bufcat(&buffer, obj->name);
						bufcat(&buffer, " AS\n");
						bufcat(&buffer, obj->u.input.desc);
						bufcat(&buffer, ";\n\n");
						break;

					case MAPRED_INPUT_NONE:
					default:
						XASSERT(false);
				}
				if (global_print_flag || global_debug_flag)
					printf("-- INPUT %s\n", obj->name);
				break;

			case MAPRED_OUTPUT:
				/*
				 * Outputs have no backend objects created directly.
				 * For output tables we may issue a create table as
				 * select, but that occurs at run-time.
				 */
				obj->created = true;
				mapred_setup_columns(conn, obj);
				break;

				/*
				 * The function types have different defaults and generate
				 * slightly different error messages, but basically do the
				 * same thing.
				 */
			case MAPRED_MAPPER:
			case MAPRED_TRANSITION:
			case MAPRED_COMBINER:
			case MAPRED_FINALIZER:
				ckind = mapred_kind_name[obj->kind];

				XASSERT(obj->name);

				/*
				 * 'kind' specific initialization accomplished above, now handle
				 * the generic function creation.
				 */
				if (global_print_flag || global_debug_flag)
					printf("-- %s %s\n", ckind, obj->name);

				/*
				 * Nothing to do if we already looked up the function in the
				 * catalog.
				 */
				if (obj->internal)
				{
					obj->created = true;
					break;
				}

				/* Non-internal functions should have these defined */
				XASSERT(obj->u.function.body);
				XASSERT(obj->u.function.language);

				mapred_setup_columns(conn, obj);
				XASSERT(obj->u.function.parameters);
				XASSERT(obj->u.function.rtype.name);
				XASSERT(NULL == obj->u.function.internal_returns);

				/*
				 * fill in the buffer:
				 *
				 *    CREATE FUNCTION <name>(<parameters>)
				 *    RETURNS [SETOF] <rtype> LANGUAGE <lang> AS
				 *    $$
				 *    <body>
				 *    $$ [STRICT] [IMMUTABLE];
				 *
				 */
				bufcat(&buffer, "CREATE FUNCTION ");
				bufcat(&buffer, doc->prefix);
				bufcat(&buffer, obj->name);
				bufcat(&buffer, "(");

				/* Handle parameter list */
				for (plist = obj->u.function.parameters;
					 plist;
					 plist = plist->next)
				{
					bufcat(&buffer, plist->name);
					bufcat(&buffer, " ");
					bufcat(&buffer, plist->type);
					if (plist->next)
						bufcat(&buffer, ", ");
				}

				/* Handle Return clause */
				bufcat(&buffer, ")\nRETURNS ");
				if (obj->u.function.mode == MAPRED_MODE_MULTI)
					bufcat(&buffer, "SETOF ");
				bufcat(&buffer, obj->u.function.rtype.name);

				/*
				 * Handle LANGUAGE clause, every language but 'C' and 'SQL'
				 * has 'pl' prefixing it
				 */
				if (!strcasecmp("C", obj->u.function.language) ||
					!strcasecmp("SQL", obj->u.function.language) ||
					!strncasecmp("PL", obj->u.function.language, 2))
				{
					bufcat(&buffer, " LANGUAGE ");
					bufcat(&buffer, obj->u.function.language);
				}
				else
				{
					bufcat(&buffer, " LANGUAGE pl");
					bufcat(&buffer, obj->u.function.language);
				}

				/* python only has an untrusted form */
				if (!strcasecmp("python", obj->u.function.language))
					bufcat(&buffer, "u");


				bufcat(&buffer, " AS ");

				/*
				 * Handle procedural language specific formatting for the
				 * function definition.
				 *
				 * C language functions are defined using the two parameter
				 * form:  AS "library", "function".
				 *
				 * Perl functions append the yaml file line number via a
				 *  #line declaration.
				 *
				 * Python functions try to append the yaml file line number
				 * by inserting a bunch of newlines.  (only works for runtime
				 * errors, not compiletime errors).
				 */
				if (!strcasecmp("C", obj->u.function.language))
				{
					bufcat(&buffer, "$$");
					bufcat(&buffer, obj->u.function.library);
					bufcat(&buffer, "$$, $$");
					bufcat(&buffer, obj->u.function.body);
					bufcat(&buffer, "$$");
				}
				else if (!strncasecmp("plperl", obj->u.function.language, 6) ||
						 !strncasecmp("perl",   obj->u.function.language, 4))
				{
					char lineno[10];
					snprintf(lineno, sizeof(lineno), "%d", obj->u.function.lineno);
					bufcat(&buffer, "$$\n#line ");
					bufcat(&buffer, lineno);
					bufcat(&buffer, "\n");
					bufcat(&buffer, obj->u.function.body);
					/* make sure the closing $$ starts on its own line */
					if (buffer->buffer[buffer->position-1] != '\n')
						bufcat(&buffer, "\n");
					bufcat(&buffer, "$$");
				}
				else if (!strncasecmp("plpython", obj->u.function.language, 8) ||
						 !strncasecmp("python",   obj->u.function.language, 6))
				{
					/*
					 * Python very stubborn about not letting you manually
					 * adjust line number.  So instead we take the stupid route
					 * and just insert N newlines.
					*/
					int i;
					bufcat(&buffer, "$$\n");
					for (i = 1; i < obj->u.function.lineno-2; i++)
						bufcat(&buffer, "\n");
					bufcat(&buffer, obj->u.function.body);
					/* make sure the closing $$ starts on its own line */
					if (buffer->buffer[buffer->position-1] != '\n')
						bufcat(&buffer, "\n");
					bufcat(&buffer, "$$");
				}
				else
				{
					/* Some generic other language, take our best guess */
					bufcat(&buffer, "$$");
					bufcat(&buffer, obj->u.function.body);
					bufcat(&buffer, "$$");
				}

				/* Handle options */
				if (obj->u.function.flags & mapred_function_strict)
					bufcat(&buffer, " STRICT");
				if (obj->u.function.flags & mapred_function_immutable)
					bufcat(&buffer, " IMMUTABLE");

				/* All done */
				bufcat(&buffer, ";\n\n");
				break;

			case MAPRED_REDUCER:
			{
				mapred_object_t *transition = obj->u.reducer.transition.object;
				mapred_object_t *combiner   = obj->u.reducer.combiner.object;
				mapred_object_t *finalizer  = obj->u.reducer.finalizer.object;
				char *state;

				XASSERT(obj->name);
				XASSERT(transition);
				XASSERT(transition->name);

				/*
				 * If the reducer depends on an object that hasn't been created
				 * then leave obj->created false (nothing is written to the
				 * buffer); it will be resolved during a second pass.
				 */
				if ((transition && !transition->created) ||
					(combiner   && !combiner->created)   ||
					(finalizer  && !finalizer->created))
				{
					if (global_print_flag && global_debug_flag)
						printf("-- deferring REDUCE %s\n", obj->name);
					break;
				}
				if (global_print_flag || global_debug_flag)
					printf("-- REDUCE %s\n", obj->name);

				/* Now, set things up to create the thing */
				mapred_setup_columns(conn, obj);
				plist = transition->u.function.parameters;
				XASSERT(plist);          /* state */
				XASSERT(plist->next);    /* parameters */

				if (obj->u.reducer.ordering)
					bufcat(&buffer, "CREATE ORDERED AGGREGATE ");
				else
					bufcat(&buffer, "CREATE AGGREGATE ");
				bufcat(&buffer, doc->prefix);
				bufcat(&buffer, obj->name);
				bufcat(&buffer, " (");

				/*
				 * Get the state type, and write out the aggregate parameters
				 * based on the parameter list of the transition function.
				 */
				plist = transition->u.function.parameters;
				state = plist->type;
				for (plist = plist->next; plist; plist = plist->next)
				{
					bufcat(&buffer, plist->type);
					if (plist->next)
						bufcat(&buffer, ", ");
				}
				bufcat(&buffer, ") (\n");
				bufcat(&buffer, "  stype = ");
				bufcat(&buffer, state);
				if (obj->u.reducer.initialize)
				{
					bufcat(&buffer, ",\n  initcond = '");
					bufcat(&buffer, obj->u.reducer.initialize);
					bufcat(&buffer, "'");
				}
				bufcat(&buffer, ",\n  sfunc = ");
				if (!transition->internal)
					bufcat(&buffer, doc->prefix);
				bufcat(&buffer, transition->name);
				if (combiner)
				{
					bufcat(&buffer, ",\n  combinefunc = ");
					if (!combiner->internal)
						bufcat(&buffer, doc->prefix);
					bufcat(&buffer, combiner->name);
				}

				/*
				 * To handle set returning finalizers the finalizer is pushed
				 * into the task definition rather than being placed in the
				 * uda where it belongs.
				 */
				/*
				 if (obj->u.reducer.finalizer.name)
				 {
				    bufcat(&buffer, ",\n  finalfunc = ");
				    bufcat(&buffer, obj->u.reducer.finalizer.name);
				 }
				*/

				bufcat(&buffer, "\n);\n\n");
				break;
			}

			case MAPRED_TASK:
			case MAPRED_EXECUTION:
			{
				mapred_object_t *input    = obj->u.task.input.object;
				mapred_object_t *mapper   = obj->u.task.mapper.object;
				mapred_object_t *reducer  = obj->u.task.reducer.object;
				mapred_plist_t  *columns  = NULL;
				mapred_plist_t  *ingrouping = NULL;
				mapred_plist_t  *newitem  = NULL;
				mapred_plist_t  *grouping = NULL;
				mapred_plist_t  *last     = NULL;
				mapred_plist_t  *scan     = NULL;
				buffer_t        *swap;

				if (!obj->u.task.execute)
					XASSERT(obj->name);
				XASSERT(obj->u.task.input.name);

				/*
				 * qbuffer holds the query built so far; each pipeline stage
				 * writes into buffer and the two are then swapped.
				 */
				if (!qbuffer)
					qbuffer = makebuffer(1024, 1024);
				else
					bufreset(qbuffer);

				/*
				 * If the task depends on an object that hasn't been created then
				 * leave obj->created false (nothing is written to the buffer);
				 * it will be resolved during a second pass.
				 */
				if ((input   && !input->created)  ||
					(mapper  && !mapper->created) ||
					(reducer && !reducer->created))
				{
					if (global_print_flag && global_debug_flag)
					{
						if (obj->u.task.execute)
							printf("-- deferring EXECUTION\n");
						else
							printf("-- deferring TASK %s\n", obj->name);
					}
					break;
				}

				if (global_print_flag || global_debug_flag)
				{
					if (obj->u.task.execute)
						printf("-- EXECUTION\n");
					else
						printf("-- TASK %s\n", obj->name);
				}

				/*
				 * 1) Handle the INPUT, two cases:
				 *   1a) There is no MAP/REDUCE:  "SELECT * FROM <input>"
				 *   1b) There is a MAP and/or REDUCE:  "<input>"
				 */
				mapred_setup_columns(conn, obj);
				if (!obj->u.task.mapper.name && !obj->u.task.reducer.name)
				{
					/* Allocate the buffer for the input. */
					if (input->u.input.type == MAPRED_INPUT_TABLE)
					{
						bufcat(&qbuffer, "SELECT * FROM ");
						bufcat(&qbuffer, input->u.input.desc);
					}
					else
					{
						bufcat(&qbuffer, "SELECT * FROM ");
						bufcat(&qbuffer, input->name);
					}
				}
				else
				{
					/* Input is just the name or description of the input */
					if (input->u.input.type == MAPRED_INPUT_TABLE)
						bufcat(&qbuffer, input->u.input.desc);
					else
						bufcat(&qbuffer, input->name);
				}

				/*
				 * How we get the columns depends a bit on the input.
				 * Is the input actually an "MAPRED_INPUT" object, or is it
				 * a "MAPRED_TASK" object?
				 */
				switch (input->kind)
				{
					case MAPRED_INPUT:
						columns = input->u.input.columns;
						break;

					case MAPRED_TASK:
						columns    = input->u.task.returns;
						ingrouping = input->u.task.grouping;

						if (!columns)
						{
							mapred_obj_error(obj, "Unable to determine return "
											 "columns for TASK '%s'",
											 obj->u.task.input.name);
							XRAISE(MAPRED_PARSE_INTERNAL, NULL);
						}
						break;

					default:
						mapred_obj_error(obj, "SOURCE '%s' is not an INPUT or "
										 "TASK object",
										 obj->u.task.input.name);
						XRAISE(MAPRED_PARSE_ERROR, "Object creation Error");
						break;
				}
				XASSERT(columns);

				/*
				 * 2) Handle the MAPPER, two cases
				 *  2a) The Mapper returns a generated ADT that needs extraction
				 *           "SELECT key(m), ...
				 *            FROM (SELECT <map(...) as m FROM <input>) mapsubq
				 *  2b) The Mapper returns a single column:
				 *           "SELECT <map>(...) FROM <input>"
				 */
				XASSERT(mapper || !obj->u.task.mapper.name);
				if (mapper)
				{
					plist = mapper->u.function.returns;
					plist2 = mapper->u.function.internal_returns;
					XASSERT(plist);

					if (plist->next)
					{ /* 2a */
						bufcat(&buffer, "SELECT ");
						for (; plist; plist = plist->next )
						{
							/*
							 * internal_returns belongs to the mapper, so it
							 * is the mapper (not the task) that decides
							 * whether to use it.  This mirrors the finalizer
							 * handling further down.
							 */
							if( mapper->internal )
							{
								XASSERT(plist2 != NULL);
								bufcat(&buffer, plist2->name);
								plist2 = plist2->next;
							}
							else
							{
								bufcat(&buffer, plist->name);
							}
							bufcat(&buffer, "(m)");
							bufcat(&buffer, " as ");
							bufcat(&buffer, plist->name);
							if (plist->next)
								bufcat(&buffer, ", ");
						}
						bufcat(&buffer, "\nFROM (");
					}

					/* shared code */
					bufcat(&buffer, "SELECT ");
					if (!mapper->internal)
						bufcat(&buffer, doc->prefix);
					bufcat(&buffer, mapper->name);
					bufcat(&buffer, "(");
					plist = mapper->u.function.parameters;
					for (; plist; plist = plist->next)
					{
						/*
						 * In each search below 'scan' is left non-NULL when
						 * a match was found, which skips the later searches.
						 */

						/* Check if this parameter is one of the input columns */
						for (scan = columns; scan; scan = scan->next)
							if (!strcasecmp(plist->name, scan->name))
							{
								bufcat(&buffer, plist->name);
								break;
							}

						/* Task inputs also need to scan the grouping columns */
						if (!scan)
							for (scan = ingrouping; scan; scan = scan->next)
								if (!strcasecmp(plist->name, scan->name))
								{
									bufcat(&buffer, plist->name);
									break;
								}

						/* Check if this parameter is in global_plist */
						if (!scan)
							for (scan = global_plist; scan; scan = scan->next)
								if (!strcasecmp(plist->name, scan->name))
								{
									/*
									 * (HACK)
									 * Note that global_plist overloads the
									 * plist structure using the "type" field
									 * to store "value".
									 * (HACK)
									 */
									bufcat(&buffer, "'");
									bufcat(&buffer, scan->type);
									bufcat(&buffer, "'::");
									bufcat(&buffer, plist->type);
									break;
								}


						/*
						 * If we couldn't find it issue a warning and
						 * set to NULL
						 */
						if (!scan)
						{
							if (global_verbose_flag)
								fprintf(stderr, "       ");
							fprintf(stderr,
									"WARNING: unset parameter - "
									"%s(%s => NULL)\n",
									mapper->name, plist->name);
							bufcat(&buffer, "NULL::");
							bufcat(&buffer, plist->type);
						}

						/* Add a comma if there is another parameter */
						if (plist->next)
							bufcat(&buffer, ", ");
					}
					bufcat(&buffer, ") as ");

					/* break into cases again */
					plist = mapper->u.function.returns;
					if (plist->next)
					{  /* 2a */
						bufcat(&buffer, "m FROM ");
						bufcat(&buffer, qbuffer->buffer);
						bufcat(&buffer, ") mapxq\n");
						/*
						 * Need to work this through, it seems that m is true
						 * whenever a column is null, which is not the desired
						 * behavior.
						 *
						 * Look more closely at "grep" code for why we want it,
						 * and "oreilly" code for why we don't.
						 *
						 * For the moment the compromise is that we do it only
						 * for SINGLE mode functions, since MULTI mode can
						 * control its own filtering without this.
						 */
						if (mapper->u.function.mode != MAPRED_MODE_MULTI)
							bufcat(&buffer, "WHERE m is not null");
					}
					else
					{
						bufcat(&buffer, plist->name);
						bufcat(&buffer, " FROM ");
						bufcat(&buffer, qbuffer->buffer);
					}

					/*
					 * Swap the buffer into the qbuffer for input as the next
					 * stage of the query pipeline.
					 */
					swap = qbuffer;
					qbuffer = buffer;
					buffer = swap;
					bufreset(buffer);

					/* Columns are now the output of the mapper */
					columns  = mapper->u.function.returns;
					ingrouping = NULL;
				}

				/*
				 * 3) Handle the Reducer, several sub-cases:
				 */
				if (obj->u.task.reducer.name)
				{
					/*
					 * Step 1:   Determine grouping columns
					 *    Find which columns are returned from the previous
					 *    stage that are NOT parameters to the reducer.
					 */
					grouping = last = NULL;
					if (!reducer)
					{
						/*
						 * We have a reducer, but it isn't listed in the YAML.
						 * How to work out parameter handling still needs to
						 * be worked out.  For now we just assume that this
						 * sort of function always takes a single "value" column
						 * and returns a "value" column.
						 *
						 * The list built here is owned by this function and
						 * is released once the view text has been generated.
						 */
						for (plist = columns; plist; plist = plist->next)
						{
							if (strcasecmp(plist->name, "value"))
							{
								if (grouping)
								{
									last->next =
										mapred_malloc(sizeof(mapred_plist_t));
									last = last->next;
								}
								else
								{
									grouping =
										mapred_malloc(sizeof(mapred_plist_t));
									last = grouping;
								}
								last->name = plist->name;
								last->type = plist->type;
								last->next = NULL;
							}
						}
					}
					else
					{   /* The reducer exists in the YAML */

						/* We precalculated the grouping columns */
						grouping = obj->u.task.grouping;
					}

					/* Fill in the buffer */
					bufcat(&buffer, "SELECT ");
					for (plist = grouping; plist; plist = plist->next)
					{
						bufcat(&buffer, plist->name);
						bufcat(&buffer, ", ");
					}

					/* Call the aggregation function */
					if (reducer && !reducer->internal)
						bufcat(&buffer, doc->prefix);
					bufcat(&buffer, obj->u.task.reducer.name);
					bufcat(&buffer, "(");

					if (reducer)
					{
						plist = reducer->u.reducer.parameters;
						for (; plist; plist = plist->next)
						{
							/* Check if parameter is one of the input columns */
							for (scan = columns; scan; scan = scan->next)
								if (!strcasecmp(plist->name, scan->name))
								{
									bufcat(&buffer, plist->name);
									break;
								}

							/* Task inputs need to scan the grouping columns */
							if (!scan)
								for (scan = ingrouping; scan; scan = scan->next)
									if (!strcasecmp(plist->name, scan->name))
									{
										bufcat(&buffer, plist->name);
										break;
									}

							/* Check if this parameter is in global_plist */
							if (!scan)
							{
								for (scan = global_plist;
									 scan;
									 scan = scan->next)
								{
									if (!strcasecmp(plist->name, scan->name))
									{
										/*
										 * (HACK)
										 * Note that global_plist overloads the
										 * plist structure using the "type"
										 * field to store "value".
										 * (HACK)
										 */
										bufcat(&buffer, "'");
										bufcat(&buffer, scan->type);
										bufcat(&buffer, "'::");
										bufcat(&buffer, plist->type);
										break;
									}
								}
							}

							/*
							 * If we couldn't find it issue a warning
							 * and set to NULL
							 */
							if (!scan)
							{
								if (global_verbose_flag)
									fprintf(stderr, "       ");
								fprintf(stderr,
										"WARNING: unset parameter - "
										"%s(%s => NULL)\n",
										reducer->name, plist->name);
								bufcat(&buffer, "NULL::");
								bufcat(&buffer, plist->type);
							}
							if (plist->next)
								bufcat(&buffer, ", ");
						}

						/* Handle ORDERING, if specified */
						clist = reducer->u.reducer.ordering;
						if (clist)
							bufcat(&buffer, " ORDER BY ");
						for(; clist; clist = clist->next)
						{
							bufcat(&buffer, clist->value);
							if (clist->next)
								bufcat(&buffer, ", ");
						}

					}
					else
					{
						/*
						 * non-yaml reducer always takes "value" as the
						 * input column.
						 *
						 * There is no parameter list here: 'plist' is NULL
						 * after the grouping loop above and must not be
						 * dereferenced in this branch.
						 */

						/* Check if "value" is one of the input columns */
						for (scan = columns; scan; scan = scan->next)
							if (!strcasecmp(scan->name, "value"))
							{
								bufcat(&buffer, "value");
								break;
							}

						/* Task inputs also need to scan the grouping columns */
						if (!scan)
							for (scan = ingrouping; scan; scan = scan->next)
								if (!strcasecmp(scan->name, "value"))
								{
									bufcat(&buffer, "value");
									break;
								}

						/* Check if this parameter is in global_plist */
						if (!scan)
							for (scan = global_plist; scan; scan = scan->next)
								if (!strcasecmp(scan->name, "value"))
								{
									/*
									 * (HACK)
									 * Note that global_plist overloads the
									 * plist structure using the "type" field
									 * to store "value".
									 * (HACK)
									 *
									 * No declared parameter type is available
									 * for a non-yaml reducer, so the literal
									 * is emitted without a cast.
									 */
									bufcat(&buffer, "'");
									bufcat(&buffer, scan->type);
									bufcat(&buffer, "'");
									break;
								}

						if (!scan)
						{
							if (global_verbose_flag)
								fprintf(stderr, "       ");
							fprintf(stderr,
									"WARNING: unset parameter - "
									"%s(value => NULL)\n",
									obj->u.task.reducer.name);
							bufcat(&buffer, "NULL");
						}
					}

					bufcat(&buffer, ") as ");

					if (reducer)
					{
						plist = reducer->u.reducer.returns;
						XASSERT(plist);  /* Need to have a return! */

						/*
						 * If the reducer has a finalizer we push it outside of
						 * the context of the UDA so that we can properly handle
						 * set returning/column returning functions.
						 */
						if (reducer->u.reducer.finalizer.name)
							bufcat(&buffer, "r");
						else
							bufcat(&buffer, plist->name);
					}
					else
					{
						/*
						 * non-yaml reducer always return a single column
						 * named "value"
						 */
						bufcat(&buffer, "value");
					}
					bufcat(&buffer, "\nFROM ");
					if (mapper)
					{
						bufcat(&buffer, "(");
						bufcat(&buffer, qbuffer->buffer);
						bufcat(&buffer, ") mapsubq");
					}
					else
					{
						bufcat(&buffer, qbuffer->buffer);
					}
					if (grouping)
					{
						bufcat(&buffer, "\nGROUP BY ");
						for (plist = grouping; plist; plist = plist->next)
						{
							bufcat(&buffer, plist->name);
							if (plist->next)
								bufcat(&buffer, ", ");
						}
					}

					/*
					 * Swap the buffer into the qbuffer for input as the next
					 * stage of the query pipeline.
					 */
					swap = qbuffer;
					qbuffer = buffer;
					buffer = swap;
					bufreset(buffer);

					/*
					 * If the reducer had a finalizer we push it into another
					 * nested subquery since user defined aggregates aren't
					 * allowed to return sets.
					 *
					 * NOTE: this code mostly duplicates the MAP code above
					 */
					if (reducer && reducer->u.reducer.finalizer.name)
					{
						mapred_object_t *finalizer;

						finalizer = reducer->u.reducer.finalizer.object;
						XASSERT(finalizer);  /* FIXME */
						XASSERT(finalizer->u.function.returns);

						/*
						 * If the finalizer returns multiple columns then we
						 * need an extra layer of wrapping to extract them.
						 */
						plist = finalizer->u.function.returns;
						if (plist->next)
						{
							bufcat(&buffer, "SELECT ");

							/* the grouping columns */
							for (plist = grouping;
								 plist;
								 plist = plist->next)
							{
								bufcat(&buffer, plist->name);
								bufcat(&buffer, ", ");
							}
							plist2 = finalizer->u.function.internal_returns;
							for (plist = finalizer->u.function.returns;
								 plist;
								 plist = plist->next)
							{
								if( finalizer->internal )
								{
									XASSERT( plist2 != NULL );
									bufcat(&buffer, plist2->name);
									plist2 = plist2->next;
								}
								else
								{
									bufcat(&buffer, plist->name);
								}

								bufcat(&buffer, "(r)");
								bufcat(&buffer, " as ");
								bufcat(&buffer, plist->name);
								if (plist->next)
									bufcat(&buffer, ", ");
							}
							bufcat(&buffer, "\nFROM (");
						}

						/*
						 * Call the function on the returned state from
						 * the reducer.
						 */
						bufcat(&buffer, "SELECT ");

						/* grouping columns */
						for (plist = grouping;
							 plist;
							 plist = plist->next)
						{
							bufcat(&buffer, plist->name);
							bufcat(&buffer, ", ");
						}

						if (!finalizer->internal)
							bufcat(&buffer, doc->prefix);
						bufcat(&buffer, finalizer->name);
						bufcat(&buffer, "(r) as ");

						/* break into cases again */
						plist = finalizer->u.function.returns;
						if (plist->next)
							bufcat(&buffer, "r");
						else
							bufcat(&buffer, plist->name);
						bufcat(&buffer, " FROM (");
						bufcat(&buffer, qbuffer->buffer);
						bufcat(&buffer, ") redxq\n");

						/*
						 * If we have that extra layer of wrapping
						 * then close it off
						 */
						if (finalizer->u.function.returns->next)
							bufcat(&buffer, ") redsubq\n");

						/*
						 * Swap the buffer into the qbuffer for input as the next
						 * stage of the query pipeline.
						 */
						swap = qbuffer;
						qbuffer = buffer;
						buffer = swap;
						bufreset(buffer);
					}
				}


				/*
				 * 4) Handle the final transform into the view definition:
				 *        "CREATE TEMPORARY VIEW . AS .;"
				 */
				bufcat(&buffer, "CREATE TEMPORARY VIEW ");
				bufcat(&buffer, obj->name);
				bufcat(&buffer, " AS\n");
				bufcat(&buffer, qbuffer->buffer);
				bufcat(&buffer, ";\n\n");

				/*
				 * The grouping list is only allocated locally for a reducer
				 * that is not defined in the YAML; in every other case it
				 * points at a list owned by the task object and must be left
				 * alone.  Release the whole local list starting at its head.
				 */
				if (obj->u.task.reducer.name && !reducer)
				{
					plist = grouping;
					while (plist)
					{
						newitem = plist;
						plist = plist->next;
						mapred_free(newitem);
					}
				}
				break;
			}

			case MAPRED_ADT:
				XASSERT(obj->name);
				mapred_setup_columns(conn, obj);

				/*
				 * ADT's have generated names that already include the
				 * document prefix
				 */
				bufcat(&buffer, "CREATE TYPE ");
				bufcat(&buffer, obj->name);
				bufcat(&buffer, " as (");
				for (plist = obj->u.adt.returns; plist; plist = plist->next)
				{
					bufcat(&buffer, plist->name);
					bufcat(&buffer, " ");
					bufcat(&buffer, plist->type);
					if (plist->next)
						bufcat(&buffer, ", ");
				}
				bufcat(&buffer, ");\n\n");
				break;

			default:
				XASSERT(false);
		}

		/* An empty buffer means there is nothing to execute on this pass */
		if (buffer->position > 0)
		{

			/*
			 * In print-only mode we do everything but run the queries
			 * ie, we still create and destroy objects.
			 */
			if (global_print_flag || global_debug_flag)
				printf("%s", buffer->buffer);

			/*
			 * Try to create the object, but failure should not terminate
			 * the transaction, so wrap it in a savepoint.
			 *
			 * Every PGresult returned by PQexec must be released with
			 * PQclear; PQclear accepts NULL.  The main 'result' is released
			 * in XFINALLY so that the XRAISE path is covered as well.
			 */
			PQclear(PQexec(conn, "SAVEPOINT mapreduce_save"));
			result = PQexec(conn, buffer->buffer);
			if (PQresultStatus(result) == PGRES_COMMAND_OK)
			{
				obj->created = true;
				PQclear(PQexec(conn, "RELEASE SAVEPOINT mapreduce_save"));
			}
			else
			{
				/* Both fields point into 'result' and may be NULL */
				char *error   = PQresultErrorField(result, PG_DIAG_SQLSTATE);
				char *primary = PQresultErrorField(result,
												   PG_DIAG_MESSAGE_PRIMARY);

				/* rollback to savepoint */
				PQclear(PQexec(conn, "ROLLBACK TO SAVEPOINT mapreduce_save"));
				PQclear(PQexec(conn, "RELEASE SAVEPOINT mapreduce_save"));

				/*
				 * If we have an "object does not exist" error from a SQL input
				 * then it may just be a dependency issue, so we don't error
				 * right away.  A missing SQLSTATE is never treated as
				 * recoverable.
				 */
				if (obj->kind != MAPRED_INPUT ||
					obj->u.input.type != MAPRED_INPUT_QUERY ||
					error == NULL ||
					strcmp(error, OBJ_DOES_NOT_EXIST))
				{
					if (global_verbose_flag)
						fprintf(stderr, "     - ");
					fprintf(stderr, "%s", PQresultErrorMessage(result));
					XRAISE(MAPRED_SQL_ERROR, "Object creation Failure");
				}

				/* Fall back to the full message if no primary is present */
				if (primary == NULL)
					primary = PQresultErrorMessage(result);

				if (global_verbose_flag)
					fprintf(stderr, "       Error: %s\n", primary);

				/*
				 * If it is an error that we think we can recover from then we don't
				 * log the error immediately, but write it to a buffer in the event
				 * that recovery wasn't successful.
				 */
				if (doc->errors)
				{
					if (global_verbose_flag)
						bufcat(&doc->errors, "  - ");
					bufcat(&doc->errors, "Error: ");
					bufcat(&doc->errors, (char*) mapred_kind_name[obj->kind]);
					if (obj->name)
					{
						bufcat(&doc->errors, " '");
						bufcat(&doc->errors, obj->name);
						bufcat(&doc->errors, "'");
					}
					bufcat(&doc->errors, ": ");
					bufcat(&doc->errors, primary);
					if (obj->line > 0)
					{
						char numbuf[64];
						snprintf(numbuf, sizeof(numbuf),
								 ", at line %d", obj->line);
						bufcat(&doc->errors, numbuf);
					}
					bufcat(&doc->errors, "\n");
				}
			}
		}

		/*
		 * INPUTS setup columns AFTER creation.
		 * All other objects handle it above prior to creation.
		 */
		if (obj->kind == MAPRED_INPUT && obj->created)
			mapred_setup_columns(conn, obj);
	}
	XFINALLY
	{
		if (result)
			PQclear(result);
		if (buffer)
			mapred_free(buffer);
		if (qbuffer)
			mapred_free(qbuffer);
	}
	XTRY_END;

	return obj->created;
}