in gpcontrib/gpmapreduce/src/mapred.c [1563:2014]
void mapred_setup_columns(PGconn *conn, mapred_object_t *obj)
{
mapred_object_t *sub;
PGresult *result;
/* switch based on object type */
switch (obj->kind)
{
case MAPRED_ADT:
break;
case MAPRED_INPUT:
/*
* Should be called after creation, otherwise catalog queries
* could fail.
*/
XASSERT(obj->created);
/* setup the column list for database defined inputs */
if (obj->u.input.type == MAPRED_INPUT_TABLE ||
obj->u.input.type == MAPRED_INPUT_QUERY)
{
/*
* This gets the ordered list of columns for the first
* input of the given name in the user's search path.
*/
buffer_t *buffer = makebuffer(1024, 1024);
bufcat(&buffer,
"SELECT attname, "
" pg_catalog.format_type(atttypid, atttypmod)\n"
"FROM pg_catalog.pg_attribute\n"
"WHERE attnum > 0 AND attrelid = lower('");
if (obj->u.input.type == MAPRED_INPUT_TABLE)
bufcat(&buffer, obj->u.input.desc);
else
bufcat(&buffer, obj->name);
bufcat(&buffer,
"')::regclass\n"
"ORDER BY -attnum;\n\n");
if (global_debug_flag)
printf("%s", buffer->buffer);
result = PQexec(conn, buffer->buffer);
mapred_free(buffer);
if (PQresultStatus(result) == PGRES_TUPLES_OK &&
PQntuples(result) > 0)
{
mapred_plist_t *newitem;
int i;
/* Destroy any previous default values we setup */
mapred_destroy_plist(&obj->u.input.columns);
/*
* The columns were sorted reverse order above so
* the list can be generated back -> front
*/
for (i = 0; i < PQntuples(result); i++)
{
char *name = PQgetvalue(result, i, 0);
char *type = PQgetvalue(result, i, 1);
size_t name_len = strlen(name) + 1;
size_t type_len = strlen(type) + 1;
/* Add the column to the list */
newitem = mapred_malloc(sizeof(mapred_plist_t));
newitem->name = mapred_malloc(name_len);
strncpy(newitem->name, name, name_len);
newitem->type = mapred_malloc(type_len);
strncpy(newitem->type, type, type_len);
newitem->next = obj->u.input.columns;
obj->u.input.columns = newitem;
}
}
else
{
char *error = PQresultErrorField(result, PG_DIAG_SQLSTATE);
char *name;
if (obj->u.input.type == MAPRED_INPUT_TABLE)
name = obj->u.input.desc;
else
name = obj->name;
if (PQresultStatus(result) == PGRES_TUPLES_OK)
{
mapred_obj_error(obj, "Table '%s' contains no rows", name);
}
else if (!strcmp(error, OBJ_DOES_NOT_EXIST) ||
!strcmp(error, SCHEMA_DOES_NOT_EXIST) )
{
mapred_obj_error(obj, "Table '%s' not found", name);
}
else
{
mapred_obj_error(obj, "Table '%s' unknown error: %s", name, error);
}
XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
}
PQclear(result);
}
break;
case MAPRED_OUTPUT:
break;
case MAPRED_MAPPER:
case MAPRED_TRANSITION:
case MAPRED_COMBINER:
case MAPRED_FINALIZER:
XASSERT(obj->u.function.parameters);
XASSERT(obj->u.function.returns);
break;
case MAPRED_REDUCER:
{
mapred_object_t *transition = obj->u.reducer.transition.object;
XASSERT(transition);
XASSERT(transition->u.function.parameters);
obj->u.reducer.parameters =
transition->u.function.parameters->next;
/*
* Use the return result of:
* 1) The finalizer
* 2) The combiner, or
* 3) The transition
*
* in that order, if the return is not derivable then
* fall into the default value of a single text column
* named "value"
*/
if (obj->u.reducer.finalizer.name)
sub = obj->u.reducer.finalizer.object;
else if (obj->u.reducer.combiner.name)
sub = obj->u.reducer.combiner.object;
else
sub = obj->u.reducer.transition.object;
if (sub)
obj->u.reducer.returns = sub->u.function.returns;
if (!obj->u.reducer.returns)
{
/*
* If unable to determine the returns based on the reducer
* components (generally due to use of SQL functions) then
* use the default of a single text column named "value".
*/
obj->u.reducer.returns = mapred_malloc(sizeof(mapred_plist_t));
obj->u.reducer.returns->name = "value";
obj->u.reducer.returns->type = "text";
obj->u.reducer.returns->next = NULL;
}
break;
}
case MAPRED_TASK:
case MAPRED_EXECUTION:
{
mapred_plist_t *scan;
mapred_plist_t *last = NULL;
/*
* The input must either be an INPUT or a TASK
*/
sub = obj->u.task.input.object;
switch (sub->kind)
{
case MAPRED_INPUT:
obj->u.task.parameters = sub->u.input.columns;
break;
case MAPRED_TASK:
/* union the input tasks returns and grouping */
for (scan = sub->u.task.grouping;
scan;
scan = scan->next)
{
if (!last)
{
obj->u.task.parameters =
mapred_malloc(sizeof(mapred_plist_t));
last = obj->u.task.parameters;
}
else
{
last->next =
mapred_malloc(sizeof(mapred_plist_t));
last = last->next;
}
last->name = scan->name;
last->type = scan->type;
last->next = NULL;
}
for (scan = sub->u.task.returns;
scan;
scan = scan->next)
{
if (!last)
{
obj->u.task.parameters =
mapred_malloc(sizeof(mapred_plist_t));
last = obj->u.task.parameters;
}
else
{
last->next =
mapred_malloc(sizeof(mapred_plist_t));
last = last->next;
}
last->name = scan->name;
last->type = scan->type;
last->next = NULL;
}
break;
default:
/* Should have already been validated */
XASSERT(false);
}
if (obj->u.task.mapper.name)
{
sub = obj->u.task.mapper.object;
if (!sub)
{
/* FIXME: Lookup function in database */
/* for now... do nothing */
}
else
{
/* Allow any function type */
switch (sub->kind)
{
case MAPRED_MAPPER:
case MAPRED_TRANSITION:
case MAPRED_COMBINER:
case MAPRED_FINALIZER:
break;
default:
/* Should have already been validated */
XASSERT(false);
}
}
}
if (obj->u.task.reducer.name)
{
mapred_clist_t *keys;
mapred_plist_t *source;
/*
* The grouping columns for a task are the columns produced
* by the input/mapper that are not consumed by the reducer.
*
* A special exception is made for a column named "key" which
* is always a grouping column.
*
* FIXME: deal with non-yaml map functions
*
* FIXME: deal with KEY specifications
*/
if (obj->u.task.mapper.object)
source = obj->u.task.mapper.object->u.function.returns;
else
source = obj->u.task.parameters;
sub = obj->u.task.reducer.object;
if (!sub)
{
/*
* The output of a built in function is defined to be
* "value", with an input of "value", everything else
* is defined to be a grouping column.
*/
last = NULL;
for (scan = source; scan; scan = scan->next)
{
if (strcasecmp(scan->name, "value"))
{
if (!last)
{
obj->u.task.grouping =
mapred_malloc(sizeof(mapred_plist_t));
last = obj->u.task.grouping;
}
else
{
last->next =
mapred_malloc(sizeof(mapred_plist_t));
last = last->next;
}
last->name = scan->name;
last->type = scan->type;
last->next = NULL;
}
}
}
else
{
/* Validate Reducer */
XASSERT(sub->kind == MAPRED_REDUCER);
/*
* source is the set of input columns that the reducer has
* to work with.
*
* Loop the reducer "keys" clause to determine what keys are
* present.
*/
last = NULL;
for (keys = sub->u.reducer.keys; keys; keys = keys->next)
{
/*
* If there is a '*' in the keys then it catches all
* unreferenced columns.
*/
if (keys->value[0] == '*' && keys->value[1] == '\0')
{
/*
* Add all sources not found in either parameters,
* or explicitly mentioned in keys
*/
for (scan = source; scan; scan = scan->next)
{
mapred_plist_t *pscan;
mapred_clist_t *kscan;
for (pscan = sub->u.reducer.parameters;
pscan;
pscan = pscan->next)
{
if (!strcasecmp(scan->name, pscan->name))
break;
}
if (pscan)
continue; /* found in parameters */
for (kscan = sub->u.reducer.keys;
kscan;
kscan = kscan->next)
{
if (!strcasecmp(scan->name, kscan->value))
break;
}
if (kscan)
continue; /* found in keys */
/* we have an unmatched source, add to grouping */
if (!last)
{
obj->u.task.grouping =
mapred_malloc(sizeof(mapred_plist_t));
last = obj->u.task.grouping;
}
else
{
last->next =
mapred_malloc(sizeof(mapred_plist_t));
last = last->next;
}
last->name = scan->name;
last->type = scan->type;
last->next = NULL;
}
}
else
{
/* Look for the referenced key in the source list */
for (scan = source; scan; scan = scan->next)
if (!strcasecmp(keys->value, scan->name))
{
/* we have a match, add the key to grouping */
if (!last)
{
obj->u.task.grouping =
mapred_malloc(sizeof(mapred_plist_t));
last = obj->u.task.grouping;
}
else
{
last->next =
mapred_malloc(sizeof(mapred_plist_t));
last = last->next;
}
last->name = scan->name;
last->type = scan->type;
last->next = NULL;
break;
}
}
}
}
}
/*
* If there is a reducer then the "returns" columns are the
* output of the reducer, and must be unioned with the grouping
* columns for final output.
*
* If there is no reducer then the returns columns are the
* returns columns of the mapper or the input
*/
if (obj->u.task.reducer.name)
{
/*
* If it is a built in function then we'll just fall into the
* default of a single text column named "value".
*/
sub = obj->u.task.reducer.object;
if (sub)
obj->u.task.returns = sub->u.reducer.returns;
}
else if (obj->u.task.mapper.name)
{
sub = obj->u.task.mapper.object;
if (sub)
obj->u.task.returns = sub->u.function.returns;
}
else
{
obj->u.task.returns = obj->u.task.parameters;
}
if (!obj->u.task.returns)
{
/*
* If unable to determine the returns based on the reducer
* components (generally due to use of SQL functions) then
* use the default of a single text column named "value".
*/
obj->u.task.returns = mapred_malloc(sizeof(mapred_plist_t));
obj->u.task.returns->name = "value";
obj->u.task.returns->type = "text";
obj->u.task.returns->next = NULL;
}
break;
}
default:
XASSERT(false);
}
}