in gpcontrib/gpmapreduce/src/mapred.c [1185:1547]
/*
 * mapred_resolve_object
 *	  Resolve the references held by one document object, creating any
 *	  helper objects that the resolution requires.
 *
 * conn		  - database connection; handed to lookup_function_in_catalog()
 *				for functions that were declared without a language.
 * doc		  - the document being processed.  doc->objects may gain new
 *				entries, and doc->prefix is used to build generated names.
 * obj		  - the object to resolve.
 * exec_count - counter that is pre-incremented to name anonymous
 *				executions ("<prefix>run_<n>").
 *
 * What is done depends on obj->kind:
 *	 OUTPUT / ADT	 nothing.
 *	 INPUT			 FILE/GPFDIST/EXEC inputs get a companion QUERY input
 *					 inserted directly after them in doc->objects.
 *	 function kinds	 catalog lookup for language-less functions, otherwise
 *					 the return type is worked out (possibly creating an
 *					 ADT object).
 *	 REDUCER		 transition/combiner/finalizer references are resolved;
 *					 names with no matching object get a placeholder
 *					 function object.
 *	 TASK/EXECUTION	 input, mapper, reducer and output references are
 *					 resolved and type-checked, with a guard against
 *					 recursive task definitions.
 *
 * Resolution failures are reported with mapred_obj_error() followed by
 * XRAISE(MAPRED_PARSE_ERROR, ...).  An unknown obj->kind trips XASSERT.
 */
void mapred_resolve_object(PGconn *conn, mapred_document_t *doc,
						   mapred_object_t *obj, int *exec_count)
{
	mapred_olist_t	*newlist;
	mapred_object_t *sub;		/* sub-object */
	size_t			 len;

	switch (obj->kind)
	{
		/* Objects with no dependencies */
		case MAPRED_OUTPUT:
		case MAPRED_ADT:
			break;

		case MAPRED_INPUT:
			/*
			 * For FILE/GPFDIST/EXEC inputs we will create a name-prefixed
			 * version of the object to prevent name collisions, and then
			 * create a second temporary view over the external table to
			 * support access to the input by "name".  This involves creating
			 * a second copy of the input which we place directly after the
			 * original input in the document object list.
			 */
			if (obj->u.input.type == MAPRED_INPUT_GPFDIST ||
				obj->u.input.type == MAPRED_INPUT_FILE ||
				obj->u.input.type == MAPRED_INPUT_EXEC)
			{
				mapred_object_t *newinput;
				mapred_olist_t	*parent;

				newinput = mapred_malloc(sizeof(mapred_object_t));
				memset(newinput, 0, sizeof(mapred_object_t));
				newinput->kind = MAPRED_INPUT;

				/* The companion input carries the original, unprefixed name */
				len = strlen(obj->name) + 1;
				newinput->name = mapred_malloc(len);
				memcpy(newinput->name, obj->name, len);

				/*
				 * "select * from " is 14 characters; 16 leaves room for it
				 * and the terminating NUL.
				 */
				newinput->u.input.type = MAPRED_INPUT_QUERY;
				len = strlen(doc->prefix) + strlen(obj->name) + 16;
				newinput->u.input.desc = mapred_malloc(len);
				snprintf(newinput->u.input.desc, len,
						 "select * from %s%s",
						 doc->prefix, obj->name);

				/*
				 * Find parent input in the doclist and add the new object
				 * immediately after it.
				 */
				for (parent = doc->objects;
					 parent && parent->object != obj;
					 parent = parent->next)
					;
				XASSERT(parent);

				newlist = mapred_malloc(sizeof(mapred_olist_t));
				newlist->object = newinput;
				newlist->next = parent->next;
				parent->next = newlist;
			}
			break;

		case MAPRED_MAPPER:
		case MAPRED_TRANSITION:
		case MAPRED_COMBINER:
		case MAPRED_FINALIZER:
			/*
			 * If the function is an internal function then we try to resolve
			 * the function by looking it up in the catalog.
			 */
			obj->u.function.internal_returns = NULL;
			obj->internal = false;
			if (!obj->u.function.language)
			{
				obj->internal = true;
				lookup_function_in_catalog(conn, doc, obj);
			}
			/*
			 * A function that has a language but no RETURNS list is not
			 * handled here.  NOTE(review): presumably such definitions are
			 * rejected before resolution -- confirm against the parser.
			 */
			else if (!obj->u.function.returns)
			{
				XASSERT(false);
			}
			/*
			 * The function types may manufacture a dependency on an adt,
			 * but have no other dependencies.  More than one entry in the
			 * returns list means a generated "<prefix><name>_rtype" ADT.
			 */
			else if (obj->u.function.returns->next)
			{
				sub = mapred_malloc(sizeof(mapred_object_t));
				memset(sub, 0, sizeof(mapred_object_t));
				sub->kind = MAPRED_ADT;

				/* "_rtype" is 6 characters, plus the terminating NUL */
				len = strlen(doc->prefix) + strlen(obj->name) + 7;
				sub->name = mapred_malloc(len);
				snprintf(sub->name, len, "%s%s_rtype",
						 doc->prefix, obj->name);
				sub->u.adt.returns = obj->u.function.returns;
				obj->u.function.rtype.name = sub->name;
				obj->u.function.rtype.object = sub;

				/* Add the ADT to the list of document objects */
				newlist = mapred_malloc(sizeof(mapred_olist_t));
				newlist->object = sub;
				newlist->next = doc->objects;
				doc->objects = newlist;

				/* And resolve the sub-object */
				mapred_resolve_object(conn, doc, sub, exec_count);
			}
			else
			{
				/* Single return column: use its type name directly */
				obj->u.function.rtype.name = obj->u.function.returns->type;
				obj->u.function.rtype.object = NULL;
			}
			break;

		case MAPRED_REDUCER:
		{
			/*
			 * If we have a function, but no object then we assume that it is
			 * a database function.  Create a dummy object to handle this
			 * case.  The same treatment is applied in turn to the
			 * transition, combiner and finalizer references.
			 */
			mapred_resolve_ref(doc->objects, &obj->u.reducer.transition);
			if (obj->u.reducer.transition.name &&
				!obj->u.reducer.transition.object)
			{
				len = strlen(obj->u.reducer.transition.name) + 1;
				sub = mapred_malloc(sizeof(mapred_object_t));
				memset(sub, 0, sizeof(mapred_object_t));
				sub->kind = MAPRED_TRANSITION;
				sub->name = mapred_malloc(len);
				sub->line = obj->line;
				memcpy(sub->name, obj->u.reducer.transition.name, len);

				newlist = mapred_malloc(sizeof(mapred_olist_t));
				newlist->object = sub;
				newlist->next = doc->objects;
				doc->objects = newlist;
				obj->u.reducer.transition.object = sub;

				/* And resolve the sub-object */
				mapred_resolve_object(conn, doc, sub, exec_count);
			}

			mapred_resolve_ref(doc->objects, &obj->u.reducer.combiner);
			if (obj->u.reducer.combiner.name &&
				!obj->u.reducer.combiner.object)
			{
				len = strlen(obj->u.reducer.combiner.name) + 1;
				sub = mapred_malloc(sizeof(mapred_object_t));
				memset(sub, 0, sizeof(mapred_object_t));
				sub->kind = MAPRED_COMBINER;
				sub->name = mapred_malloc(len);
				sub->line = obj->line;
				memcpy(sub->name, obj->u.reducer.combiner.name, len);

				newlist = mapred_malloc(sizeof(mapred_olist_t));
				newlist->object = sub;
				newlist->next = doc->objects;
				doc->objects = newlist;
				obj->u.reducer.combiner.object = sub;

				/* And resolve the sub-object */
				mapred_resolve_object(conn, doc, sub, exec_count);
			}

			mapred_resolve_ref(doc->objects, &obj->u.reducer.finalizer);
			if (obj->u.reducer.finalizer.name &&
				!obj->u.reducer.finalizer.object)
			{
				len = strlen(obj->u.reducer.finalizer.name) + 1;
				sub = mapred_malloc(sizeof(mapred_object_t));
				memset(sub, 0, sizeof(mapred_object_t));
				sub->kind = MAPRED_FINALIZER;
				sub->name = mapred_malloc(len);
				sub->line = obj->line;
				memcpy(sub->name, obj->u.reducer.finalizer.name, len);

				newlist = mapred_malloc(sizeof(mapred_olist_t));
				newlist->object = sub;
				newlist->next = doc->objects;
				doc->objects = newlist;
				obj->u.reducer.finalizer.object = sub;

				/* And resolve the sub-object */
				mapred_resolve_object(conn, doc, sub, exec_count);
			}
			break;
		}

		case MAPRED_TASK:
		case MAPRED_EXECUTION:
		{
			/*
			 * Resolving a task may require recursion to resolve other
			 * tasks to work out parameter lists.  We keep track of
			 * our resolution state in order to detect potential
			 * infinite recursion issues.
			 *
			 * Note that an already-resolved task returns immediately,
			 * skipping the debug output at the end of this function.
			 */
			if (obj->u.task.flags & mapred_task_resolved)
				return;

			/* Assign a name to anonymous executions */
			if (!obj->name)
			{
				XASSERT(obj->u.task.execute);

				/* 10 characters for max int digits, 4 for "run_" */
				len = strlen(doc->prefix) + 16;
				obj->name = mapred_malloc(len);
				snprintf(obj->name, len, "%srun_%d",
						 doc->prefix, ++(*exec_count));
			}

			/* Check for infinite recursion */
			if (obj->u.task.flags & mapred_task_resolving)
			{
				mapred_obj_error(obj, "Infinite recursion detected while "
								 "trying to resove TASK");
				XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
			}
			obj->u.task.flags |= mapred_task_resolving;

			/* Validate object types */
			if (obj->u.task.input.name)
			{
				mapred_resolve_ref(doc->objects, &obj->u.task.input);
				sub = obj->u.task.input.object;

				/* If we can't find the input, throw an error */
				if (!sub)
				{
					/* Can't find INPUT object */
					mapred_obj_error(obj, "SOURCE '%s' not found in document",
									 obj->u.task.input.name);
					XRAISE(MAPRED_PARSE_ERROR, "Object Resolution Failure");
				}

				/*
				 * The input must either be an INPUT or a TASK
				 */
				switch (sub->kind)
				{
					case MAPRED_INPUT:
						break;

					case MAPRED_TASK:
						/* This objects input is the sub objects output */
						mapred_resolve_object(conn, doc, sub, exec_count);
						break;

					/* Otherwise generate an error */
					default:
						/* SOURCE wasn't an INPUT */
						mapred_obj_error(obj, "SOURCE '%s' is neither an INPUT nor a TASK",
										 obj->u.task.input.name);
						XRAISE(MAPRED_PARSE_ERROR, "Object Resolution Failure");
				}
			}

			if (obj->u.task.mapper.name)
			{
				mapred_resolve_ref(doc->objects, &obj->u.task.mapper);
				sub = obj->u.task.mapper.object;
				if (!sub)
				{
					/* Create an internal map function */
					len = strlen(obj->u.task.mapper.name) + 1;
					sub = mapred_malloc(sizeof(mapred_object_t));
					memset(sub, 0, sizeof(mapred_object_t));
					sub->kind = MAPRED_MAPPER;
					sub->name = mapred_malloc(len);
					sub->line = obj->line;
					memcpy(sub->name, obj->u.task.mapper.name, len);

					newlist = mapred_malloc(sizeof(mapred_olist_t));
					newlist->object = sub;
					newlist->next = doc->objects;
					doc->objects = newlist;
					obj->u.task.mapper.object = sub;

					/* And resolve the sub-object */
					mapred_resolve_object(conn, doc, sub, exec_count);
				}
				else
				{
					/* Allow any function type */
					switch (sub->kind)
					{
						case MAPRED_MAPPER:
						case MAPRED_TRANSITION:
						case MAPRED_COMBINER:
						case MAPRED_FINALIZER:
							break;

						default:
							mapred_obj_error(obj, "MAP '%s' is not a MAP object",
											 obj->u.task.mapper.name);
							XRAISE(MAPRED_PARSE_ERROR, "Object Resolution Failure");
					}
				}
			}

			if (obj->u.task.reducer.name)
			{
				mapred_resolve_ref(doc->objects, &obj->u.task.reducer);
				sub = obj->u.task.reducer.object;
				if (!sub)
				{
					/* FIXME: non-yaml reducers */
				}
				else if (sub->kind == MAPRED_REDUCER)
				{	/* Validate Reducer */
					mapred_resolve_object(conn, doc, sub, exec_count);
				}
				else
				{	/* It's an object, but not a REDUCER */
					mapred_obj_error(obj, "REDUCE '%s' is not a REDUCE object",
									 obj->u.task.reducer.name);
					XRAISE(MAPRED_PARSE_ERROR, "Object Resolution Failure");
				}
			}

			if (obj->u.task.output.name)
			{
				mapred_resolve_ref(doc->objects, &obj->u.task.output);
				sub = obj->u.task.output.object;
				if (sub && sub->kind != MAPRED_OUTPUT)
				{
					mapred_obj_error(obj, "TARGET '%s' is not an OUTPUT object",
									 obj->u.task.output.name);
					XRAISE(MAPRED_PARSE_ERROR, "Object Resolution Failure");
				}
				if (!sub && obj->u.task.output.name)
				{
					mapred_obj_error(obj, "TARGET '%s' is not defined in "
									 "document",
									 obj->u.task.output.name);
					XRAISE(MAPRED_PARSE_ERROR, "Object Resolution Failure");
				}
			}

			/*
			 * Clear the resolving bit and set the resolved bit.  The mask
			 * must be built with bitwise NOT (~): logical NOT (!) of a
			 * non-zero flag is 0 and would wipe every other bit in flags.
			 */
			obj->u.task.flags &= ~mapred_task_resolving;
			obj->u.task.flags |= mapred_task_resolved;
			break;
		}

		default:
			XASSERT(false);
	}

	if (global_debug_flag)
		mapred_obj_debug(obj);
}