void mapred_resolve_object()

in gpcontrib/gpmapreduce/src/mapred.c [1185:1547]


/*
 * Prepend 'member' to the front of the document's object list.
 *
 * Callers in this file resolve the newly added object explicitly right after
 * adding it (presumably because a caller already walking doc->objects would
 * not revisit the head of the list).
 */
static void mapred_resolve_prepend_object(mapred_document_t *doc,
										  mapred_object_t *member)
{
	mapred_olist_t *node;

	node = mapred_malloc(sizeof(mapred_olist_t));
	node->object = member;
	node->next = doc->objects;
	doc->objects = node;
}

/*
 * Create a zeroed placeholder object of the given kind, named 'name', for a
 * reference that did not match any object in the document, and prepend it to
 * doc->objects.  The placeholder copies the line number of 'origin' (the
 * object that holds the reference).
 *
 * The caller must store the returned object in its reference and then
 * resolve it.
 */
static mapred_object_t *mapred_resolve_make_stub(mapred_document_t *doc,
												 int kind,
												 const char *name,
												 const mapred_object_t *origin)
{
	mapred_object_t *stub;
	size_t           len = strlen(name) + 1;   /* include the NUL */

	stub = mapred_malloc(sizeof(mapred_object_t));
	memset(stub, 0, sizeof(mapred_object_t));
	stub->kind = kind;
	stub->name = mapred_malloc(len);
	stub->line = origin->line;
	memcpy(stub->name, name, len);

	mapred_resolve_prepend_object(doc, stub);
	return stub;
}

/*
 * Resolve the references held by a single document object, creating any
 * implicit objects it depends on.
 *
 *   conn       - database connection, passed to lookup_function_in_catalog()
 *                for functions that have no LANGUAGE (internal functions).
 *   doc        - the document being resolved; objects created here are added
 *                to doc->objects.
 *   obj        - the object to resolve.
 *   exec_count - counter incremented each time an anonymous TASK/EXECUTION
 *                is given a generated "<prefix>run_<n>" name.
 *
 * Resolution failures are reported with mapred_obj_error() and raised via
 * XRAISE(MAPRED_PARSE_ERROR, ...).  TASK/EXECUTION objects are resolved at
 * most once (mapred_task_resolved); reaching a task again while it is still
 * being resolved (mapred_task_resolving) is reported as infinite recursion.
 */
void mapred_resolve_object(PGconn *conn, mapred_document_t *doc,
						   mapred_object_t *obj, int *exec_count)
{
	mapred_olist_t  *newlist;
	mapred_object_t *sub;   /* sub-object */
	size_t           len;

	switch (obj->kind)
	{
		/* Objects with no dependencies */
		case MAPRED_OUTPUT:
		case MAPRED_ADT:
			break;

		case MAPRED_INPUT:
			/*
			 * For FILE/GPFDIST/EXEC inputs we will create a name-prefixed
			 * version of the object to prevent name collisions, and then
			 * create a second temporary view over the external table to
			 * support access to the input by "name".  This involves creating
			 * a second copy of the input which we place directly after the
			 * original input in the document object list.
			 */
			if (obj->u.input.type == MAPRED_INPUT_GPFDIST ||
				obj->u.input.type == MAPRED_INPUT_FILE    ||
				obj->u.input.type == MAPRED_INPUT_EXEC)
			{
				mapred_object_t *newinput;
				mapred_olist_t  *parent;

				newinput = mapred_malloc(sizeof(mapred_object_t));
				memset(newinput, 0, sizeof(mapred_object_t));
				newinput->kind = MAPRED_INPUT;
				len = strlen(obj->name) + 1;
				newinput->name = mapred_malloc(len);
				sprintf(newinput->name, "%s", obj->name);
				newinput->u.input.type = MAPRED_INPUT_QUERY;
				/* "select * from " is 14 chars, plus the NUL, fits in 16 */
				len = strlen(doc->prefix) + strlen(obj->name) + 16;
				newinput->u.input.desc = mapred_malloc(len);
				snprintf(newinput->u.input.desc, len,
						 "select * from %s%s",
						 doc->prefix, obj->name);

				/*
				 * Find parent input in the doclist and add the new object
				 * immediately after it.
				 */
				for (parent = doc->objects;
					 parent && parent->object != obj;
					 parent = parent->next);
				XASSERT(parent);
				newlist = mapred_malloc(sizeof(mapred_olist_t));
				newlist->object = newinput;
				newlist->next = parent->next;
				parent->next = newlist;
			}
			break;


		case MAPRED_MAPPER:
		case MAPRED_TRANSITION:
		case MAPRED_COMBINER:
		case MAPRED_FINALIZER:
			/*
			 * A function with no LANGUAGE is an internal function: resolve
			 * it by looking it up in the catalog.
			 */
			obj->u.function.internal_returns = NULL;
			obj->internal = false;

			if (!obj->u.function.language)
			{
				obj->internal = true;
				lookup_function_in_catalog(conn, doc, obj);
			}

			/*
			 * A non-internal function is expected to carry a RETURNS list by
			 * this point; reaching this branch is an internal error.
			 */
			else if (!obj->u.function.returns)
			{
				XASSERT(false);
			}

			/*
			 * More than one RETURNS entry: manufacture a dependent ADT
			 * object to serve as the return type.  This is the only
			 * dependency a function object can have.
			 */
			else if (obj->u.function.returns->next)
			{
				sub = mapred_malloc(sizeof(mapred_object_t));
				memset(sub, 0, sizeof(mapred_object_t));
				sub->kind = MAPRED_ADT;
				/* "_rtype" is 6 chars, plus the NUL */
				len = strlen(doc->prefix) + strlen(obj->name) + 7;
				sub->name = mapred_malloc(len);
				snprintf(sub->name, len, "%s%s_rtype",
						 doc->prefix, obj->name);
				sub->u.adt.returns = obj->u.function.returns;

				obj->u.function.rtype.name = sub->name;
				obj->u.function.rtype.object = sub;

				/* Add the ADT to the document and resolve it */
				mapred_resolve_prepend_object(doc, sub);
				mapred_resolve_object(conn, doc, sub, exec_count);
			}

			/* A single RETURNS entry: use its type directly */
			else
			{
				obj->u.function.rtype.name = obj->u.function.returns->type;
				obj->u.function.rtype.object = NULL;
			}
			break;


		case MAPRED_REDUCER:
			/*
			 * A TRANSITION/COMBINER/FINALIZER name that does not match any
			 * object in the document is assumed to be a database function:
			 * create a placeholder object for it and resolve that.
			 */
			mapred_resolve_ref(doc->objects, &obj->u.reducer.transition);
			if (obj->u.reducer.transition.name &&
				!obj->u.reducer.transition.object)
			{
				sub = mapred_resolve_make_stub(doc, MAPRED_TRANSITION,
											   obj->u.reducer.transition.name,
											   obj);
				obj->u.reducer.transition.object = sub;
				mapred_resolve_object(conn, doc, sub, exec_count);
			}

			mapred_resolve_ref(doc->objects, &obj->u.reducer.combiner);
			if (obj->u.reducer.combiner.name &&
				!obj->u.reducer.combiner.object)
			{
				sub = mapred_resolve_make_stub(doc, MAPRED_COMBINER,
											   obj->u.reducer.combiner.name,
											   obj);
				obj->u.reducer.combiner.object = sub;
				mapred_resolve_object(conn, doc, sub, exec_count);
			}

			mapred_resolve_ref(doc->objects, &obj->u.reducer.finalizer);
			if (obj->u.reducer.finalizer.name &&
				!obj->u.reducer.finalizer.object)
			{
				sub = mapred_resolve_make_stub(doc, MAPRED_FINALIZER,
											   obj->u.reducer.finalizer.name,
											   obj);
				obj->u.reducer.finalizer.object = sub;
				mapred_resolve_object(conn, doc, sub, exec_count);
			}
			break;


		case MAPRED_TASK:
		case MAPRED_EXECUTION:
			/*
			 * Resolving a task may require recursion to resolve other
			 * tasks to work out parameter lists.  We keep track of
			 * our resolution state in order to detect potential
			 * infinite recursion issues.
			 */
			if (obj->u.task.flags & mapred_task_resolved)
				return;

			/* Assign a name to anonymous executions */
			if (!obj->name)
			{
				XASSERT(obj->u.task.execute);

				/*
				 * "run_" is 4 chars, an int is at most 11 chars (sign plus
				 * 10 digits), plus the NUL: 16 in total.
				 */
				len = strlen(doc->prefix) + 16;
				obj->name = mapred_malloc(len);
				snprintf(obj->name, len, "%srun_%d",
						 doc->prefix, ++(*exec_count));
			}

			/* Check for infinite recursion */
			if (obj->u.task.flags & mapred_task_resolving)
			{
				mapred_obj_error(obj, "Infinite recursion detected while "
								 "trying to resove TASK");
				XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
			}
			obj->u.task.flags |= mapred_task_resolving;

			/* Validate the SOURCE: it must be an INPUT or a TASK */
			if (obj->u.task.input.name)
			{
				mapred_resolve_ref(doc->objects, &obj->u.task.input);
				sub = obj->u.task.input.object;

				if (!sub)
				{
					/* Can't find INPUT object */
					mapred_obj_error(obj, "SOURCE '%s' not found in document",
									 obj->u.task.input.name);
					XRAISE(MAPRED_PARSE_ERROR, "Object Resolution Failure");
				}

				switch (sub->kind)
				{
					case MAPRED_INPUT:
						break;

					case MAPRED_TASK:
						/* This object's input is the sub-object's output */
						mapred_resolve_object(conn, doc, sub, exec_count);
						break;

					default:
						/* SOURCE wasn't an INPUT */
						mapred_obj_error(obj, "SOURCE '%s' is neither an INPUT nor a TASK",
										 obj->u.task.input.name);
						XRAISE(MAPRED_PARSE_ERROR, "Object Resolution Failure");
				}
			}

			/*
			 * Validate the MAP: an unknown name becomes a placeholder MAPPER
			 * object; a known name must refer to some function object.
			 */
			if (obj->u.task.mapper.name)
			{
				mapred_resolve_ref(doc->objects, &obj->u.task.mapper);
				sub = obj->u.task.mapper.object;

				if (!sub)
				{
					/* Create an internal map function */
					sub = mapred_resolve_make_stub(doc, MAPRED_MAPPER,
												   obj->u.task.mapper.name,
												   obj);
					obj->u.task.mapper.object = sub;
					mapred_resolve_object(conn, doc, sub, exec_count);
				}
				else
				{
					/* Allow any function type */
					switch (sub->kind)
					{
						case MAPRED_MAPPER:
						case MAPRED_TRANSITION:
						case MAPRED_COMBINER:
						case MAPRED_FINALIZER:
							break;

						default:
							mapred_obj_error(obj, "MAP '%s' is not a MAP object",
											 obj->u.task.mapper.name);
							XRAISE(MAPRED_PARSE_ERROR, "Object Resolution Failure");
					}
				}
			}

			/* Validate the REDUCE: a known name must refer to a REDUCER */
			if (obj->u.task.reducer.name)
			{
				mapred_resolve_ref(doc->objects, &obj->u.task.reducer);
				sub = obj->u.task.reducer.object;

				if (!sub)
				{
					/* FIXME: non-yaml reducers */
				}
				else if (sub->kind == MAPRED_REDUCER)
				{
					mapred_resolve_object(conn, doc, sub, exec_count);
				}
				else
				{
					/* It's an object, but not a REDUCER */
					mapred_obj_error(obj, "REDUCE '%s' is not a REDUCE object",
									 obj->u.task.reducer.name);
					XRAISE(MAPRED_PARSE_ERROR, "Object Resolution Failure");
				}
			}

			/* Validate the TARGET: it must name an OUTPUT in the document */
			if (obj->u.task.output.name)
			{
				mapred_resolve_ref(doc->objects, &obj->u.task.output);
				sub = obj->u.task.output.object;

				if (!sub)
				{
					mapred_obj_error(obj, "TARGET '%s' is not defined in "
									 "document",
									 obj->u.task.output.name);
					XRAISE(MAPRED_PARSE_ERROR, "Object Resolution Failure");
				}
				else if (sub->kind != MAPRED_OUTPUT)
				{
					mapred_obj_error(obj, "TARGET '%s' is not an OUTPUT object",
									 obj->u.task.output.name);
					XRAISE(MAPRED_PARSE_ERROR, "Object Resolution Failure");
				}
			}

			/*
			 * Clear only the resolving bit and set the resolved bit.  This
			 * must be a bitwise complement: a logical '!' would evaluate to 0
			 * and wipe every other flag as well.
			 */
			obj->u.task.flags &= ~mapred_task_resolving;
			obj->u.task.flags |= mapred_task_resolved;
			break;

		default:
			XASSERT(false);
	}

	if (global_debug_flag)
		mapred_obj_debug(obj);
}