void mapred_setup_columns()

in gpcontrib/gpmapreduce/src/mapred.c [1563:2014]


void mapred_setup_columns(PGconn *conn, mapred_object_t *obj)
{
	mapred_object_t *sub;
	PGresult        *result;

	/* switch based on object type */
	switch (obj->kind)
	{
		case MAPRED_ADT:
			break;

		case MAPRED_INPUT:

			/*
			 * Should be called after creation, otherwise catalog queries
			 * could fail.
			 */
			XASSERT(obj->created);

			/* setup the column list for database defined inputs */
			if (obj->u.input.type == MAPRED_INPUT_TABLE ||
				obj->u.input.type == MAPRED_INPUT_QUERY)
			{
				/*
				 * This gets the ordered list of columns for the first
				 * input of the given name in the user's search path.
				 */
				buffer_t *buffer = makebuffer(1024, 1024);
				bufcat(&buffer,
					   "SELECT  attname, "
					   "        pg_catalog.format_type(atttypid, atttypmod)\n"
					   "FROM    pg_catalog.pg_attribute\n"
					   "WHERE   attnum > 0 AND attrelid = lower('");
				if (obj->u.input.type == MAPRED_INPUT_TABLE)
					bufcat(&buffer, obj->u.input.desc);
				else
					bufcat(&buffer, obj->name);
				bufcat(&buffer,
					   "')::regclass\n"
					   "ORDER BY   -attnum;\n\n");

				if (global_debug_flag)
					printf("%s", buffer->buffer);

				result = PQexec(conn, buffer->buffer);
				mapred_free(buffer);

				if (PQresultStatus(result) == PGRES_TUPLES_OK &&
					PQntuples(result) > 0)
				{
					mapred_plist_t *newitem;
					int i;

					/* Destroy any previous default values we setup */
					mapred_destroy_plist(&obj->u.input.columns);

					/*
					 * The columns were sorted reverse order above so
					 * the list can be generated back -> front
					 */
					for (i = 0; i < PQntuples(result); i++)
					{
						char *name = PQgetvalue(result, i, 0);
						char *type = PQgetvalue(result, i, 1);
						
						size_t name_len = strlen(name) + 1;
						size_t type_len = strlen(type) + 1;
						/* Add the column to the list */
						newitem = mapred_malloc(sizeof(mapred_plist_t));
						newitem->name = mapred_malloc(name_len);
						strncpy(newitem->name, name, name_len);
						newitem->type = mapred_malloc(type_len);
						strncpy(newitem->type, type, type_len);
						newitem->next = obj->u.input.columns;
						obj->u.input.columns = newitem;
					}
				}
				else
				{
					char *error = PQresultErrorField(result, PG_DIAG_SQLSTATE);
					char *name;

					if (obj->u.input.type == MAPRED_INPUT_TABLE)
						name = obj->u.input.desc;
					else
						name = obj->name;

					if (PQresultStatus(result) == PGRES_TUPLES_OK)
					{
						mapred_obj_error(obj, "Table '%s' contains no rows", name);
					}
					else if (!strcmp(error, OBJ_DOES_NOT_EXIST) ||
							 !strcmp(error, SCHEMA_DOES_NOT_EXIST) )
					{
						mapred_obj_error(obj, "Table '%s' not found", name);
					}
					else
					{
						mapred_obj_error(obj, "Table '%s' unknown error: %s", name, error);
					}
					XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
				}
				PQclear(result);
			}
			break;


		case MAPRED_OUTPUT:
			break;

		case MAPRED_MAPPER:
		case MAPRED_TRANSITION:
		case MAPRED_COMBINER:
		case MAPRED_FINALIZER:
			XASSERT(obj->u.function.parameters);
			XASSERT(obj->u.function.returns);
			break;


		case MAPRED_REDUCER:
		{
			mapred_object_t *transition = obj->u.reducer.transition.object;

			XASSERT(transition);
			XASSERT(transition->u.function.parameters);
			obj->u.reducer.parameters =
				transition->u.function.parameters->next;

			/*
			 * Use the return result of:
			 *   1) The finalizer
			 *   2) The combiner, or
			 *   3) The transition
			 *
			 * in that order, if the return is not derivable then
			 * fall into the default value of a single text column
			 * named "value"
			 */
			if (obj->u.reducer.finalizer.name)
				sub = obj->u.reducer.finalizer.object;
			else if (obj->u.reducer.combiner.name)
				sub = obj->u.reducer.combiner.object;
			else
				sub = obj->u.reducer.transition.object;

			if (sub)
				obj->u.reducer.returns = sub->u.function.returns;

			if (!obj->u.reducer.returns)
			{
				/*
				 * If unable to determine the returns based on the reducer
				 * components (generally due to use of SQL functions) then
				 * use the default of a single text column named "value".
				 */
				obj->u.reducer.returns = mapred_malloc(sizeof(mapred_plist_t));
				obj->u.reducer.returns->name = "value";
				obj->u.reducer.returns->type = "text";
				obj->u.reducer.returns->next = NULL;
			}
			break;
		}


		case MAPRED_TASK:
		case MAPRED_EXECUTION:
		{
			mapred_plist_t *scan;
			mapred_plist_t *last = NULL;

			/*
			 * The input must either be an INPUT or a TASK
			 */
			sub = obj->u.task.input.object;
			switch (sub->kind)
			{
				case MAPRED_INPUT:
					obj->u.task.parameters = sub->u.input.columns;
					break;

				case MAPRED_TASK:
					/* union the input tasks returns and grouping */
					for (scan = sub->u.task.grouping;
						 scan;
						 scan = scan->next)
					{
						if (!last)
						{
							obj->u.task.parameters =
								mapred_malloc(sizeof(mapred_plist_t));
							last = obj->u.task.parameters;
						}
						else
						{
							last->next =
								mapred_malloc(sizeof(mapred_plist_t));
							last = last->next;
						}
						last->name = scan->name;
						last->type = scan->type;
						last->next = NULL;
					}
					for (scan = sub->u.task.returns;
						 scan;
						 scan = scan->next)
					{
						if (!last)
						{
							obj->u.task.parameters =
								mapred_malloc(sizeof(mapred_plist_t));
							last = obj->u.task.parameters;
						}
						else
						{
							last->next =
								mapred_malloc(sizeof(mapred_plist_t));
							last = last->next;
						}
						last->name = scan->name;
						last->type = scan->type;
						last->next = NULL;
					}
					break;

				default:
					/* Should have already been validated */
					XASSERT(false);
			}

			if (obj->u.task.mapper.name)
			{
				sub = obj->u.task.mapper.object;
				if (!sub)
				{
					/* FIXME: Lookup function in database */
					/* for now... do nothing */
				}
				else
				{
					/* Allow any function type */
					switch (sub->kind)
					{
						case MAPRED_MAPPER:
						case MAPRED_TRANSITION:
						case MAPRED_COMBINER:
						case MAPRED_FINALIZER:
							break;

						default:
							/* Should have already been validated */
							XASSERT(false);
					}
				}
			}

			if (obj->u.task.reducer.name)
			{
				mapred_clist_t *keys;
				mapred_plist_t *source;

				/*
				 * The grouping columns for a task are the columns produced
				 * by the input/mapper that are not consumed by the reducer.
				 *
				 * A special exception is made for a column named "key" which
				 * is always a grouping column.
				 *
				 * FIXME: deal with non-yaml map functions
				 *
				 * FIXME: deal with KEY specifications
				 */
				if (obj->u.task.mapper.object)
					source = obj->u.task.mapper.object->u.function.returns;
				else
					source = obj->u.task.parameters;

				sub = obj->u.task.reducer.object;
				if (!sub)
				{
					/*
					 * The output of a built in function is defined to be
					 * "value", with an input of "value", everything else
					 * is defined to be a grouping column.
					 */
					last = NULL;
					for (scan = source; scan; scan = scan->next)
					{
						if (strcasecmp(scan->name, "value"))
						{
							if (!last)
							{
								obj->u.task.grouping =
									mapred_malloc(sizeof(mapred_plist_t));
								last = obj->u.task.grouping;
							}
							else
							{
								last->next =
									mapred_malloc(sizeof(mapred_plist_t));
								last = last->next;
							}
							last->name = scan->name;
							last->type = scan->type;
							last->next = NULL;
						}
					}
				}
				else
				{
					/* Validate Reducer */
					XASSERT(sub->kind == MAPRED_REDUCER);

					/*
					 * source is the set of input columns that the reducer has
					 * to work with.
					 *
					 * Loop the reducer "keys" clause to determine what keys are
					 * present.
					 */
					last = NULL;
					for (keys = sub->u.reducer.keys; keys; keys = keys->next)
					{
						/*
						 * If there is a '*' in the keys then it catches all
						 * unreferenced columns.
						 */
						if (keys->value[0] == '*' && keys->value[1] == '\0')
						{
							/*
							 * Add all sources not found in either parameters,
							 * or explicitly mentioned in keys
							 */
							for (scan = source; scan; scan = scan->next)
							{
								mapred_plist_t *pscan;
								mapred_clist_t *kscan;

								for (pscan = sub->u.reducer.parameters;
									 pscan;
									 pscan = pscan->next)
								{
									if (!strcasecmp(scan->name, pscan->name))
										break;
								}
								if (pscan)
									continue;   /* found in parameters */
								for (kscan = sub->u.reducer.keys;
									 kscan;
									 kscan = kscan->next)
								{
									if (!strcasecmp(scan->name, kscan->value))
										break;
								}
								if (kscan)
									continue;   /* found in keys */

								/* we have an unmatched source, add to grouping */
								if (!last)
								{
									obj->u.task.grouping =
										mapred_malloc(sizeof(mapred_plist_t));
									last = obj->u.task.grouping;
								}
								else
								{
									last->next =
										mapred_malloc(sizeof(mapred_plist_t));
									last = last->next;
								}
								last->name = scan->name;
								last->type = scan->type;
								last->next = NULL;
							}
						}
						else
						{
							/* Look for the referenced key in the source list */
							for (scan = source; scan; scan = scan->next)
								if (!strcasecmp(keys->value, scan->name))
								{
									/* we have a match, add the key to grouping */
									if (!last)
									{
										obj->u.task.grouping =
											mapred_malloc(sizeof(mapred_plist_t));
										last = obj->u.task.grouping;
									}
									else
									{
										last->next =
											mapred_malloc(sizeof(mapred_plist_t));
										last = last->next;
									}
									last->name = scan->name;
									last->type = scan->type;
									last->next = NULL;
									break;
								}
						}
					}
				}
			}

			/*
			 * If there is a reducer then the "returns" columns are the
			 * output of the reducer, and must be unioned with the grouping
			 * columns for final output.
			 *
			 * If there is no reducer then the returns columns are the
			 * returns columns of the mapper or the input
			 */
			if (obj->u.task.reducer.name)
			{
				/*
				 * If it is a built in function then we'll just fall into the
				 * default of a single text column named "value".
				 */
				sub = obj->u.task.reducer.object;
				if (sub)
					obj->u.task.returns = sub->u.reducer.returns;

			}
			else if (obj->u.task.mapper.name)
			{
				sub = obj->u.task.mapper.object;
				if (sub)
					obj->u.task.returns = sub->u.function.returns;
			}
			else
			{
				obj->u.task.returns = obj->u.task.parameters;
			}

			if (!obj->u.task.returns)
			{
				/*
				 * If unable to determine the returns based on the reducer
				 * components (generally due to use of SQL functions) then
				 * use the default of a single text column named "value".
				 */
				obj->u.task.returns = mapred_malloc(sizeof(mapred_plist_t));
				obj->u.task.returns->name = "value";
				obj->u.task.returns->type = "text";
				obj->u.task.returns->next = NULL;
			}
			break;
		}

		default:
			XASSERT(false);
	}
}