in gpcontrib/gpmapreduce/src/mapred.c [2505:3806]
/*
 * mapred_create_object
 *
 * Generate, and execute over `conn`, the SQL that creates the backend object
 * for one parsed mapreduce object:
 *   - INPUT (FILE/GPFDIST): CREATE EXTERNAL TABLE
 *   - INPUT (EXEC):         CREATE EXTERNAL WEB TABLE
 *   - INPUT (QUERY):        CREATE TEMPORARY VIEW
 *   - INPUT (TABLE), OUTPUT: nothing to create, marked created directly
 *   - MAP/TRANSITION/CONSOLIDATE/FINALIZE functions: CREATE FUNCTION
 *     (skipped for functions already found in the catalog, obj->internal)
 *   - REDUCE:               CREATE [ORDERED] AGGREGATE
 *   - TASK/EXECUTION:       CREATE TEMPORARY VIEW over the composed query
 *   - ADT:                  CREATE TYPE
 *
 * Returns obj->created:
 *   true  - the object exists (already, or created by this call);
 *   false - creation was deferred because a dependency has not been created
 *           yet, or a QUERY input failed with an "object does not exist"
 *           error (which may only be a dependency-order issue). In the latter
 *           case the error text is appended to doc->errors, when that buffer
 *           exists, so it can be reported if a later pass also fails.
 *
 * Raises MAPRED_PARSE_ERROR / MAPRED_PARSE_INTERNAL for malformed object
 * specifications and MAPRED_SQL_ERROR for any other SQL failure.
 *
 * The generated SQL runs inside the savepoint "mapreduce_save" so that a
 * failure does not abort the caller's transaction.
 *
 * Note: for FILE/GPFDIST inputs the location strings in obj->u.input.files
 * are modified in place while the LOCATION clause is built.
 */
boolean mapred_create_object(PGconn *conn, mapred_document_t *doc,
							 mapred_object_t *obj)
{
	mapred_clist_t *clist = NULL;
	mapred_plist_t *plist = NULL;
	mapred_plist_t *plist2 = NULL;
	const char *ckind = NULL;
	buffer_t *buffer = NULL;
	buffer_t *qbuffer = NULL;
	PGresult *result = NULL;

	/* If the object was created in a prior pass, then do nothing */
	if (obj->created)
		return true;

	/* Otherwise attempt to create the object */
	XTRY
	{
		/* allocates 1024 bytes, extending by 1024 bytes if we run out */
		buffer = makebuffer(1024, 1024);

		switch (obj->kind)
		{
			case MAPRED_INPUT:
				XASSERT(obj->name);
				switch (obj->u.input.type)
				{
					case MAPRED_INPUT_TABLE:
						/* Nothing to actually create */
						obj->created = true;
						break;

					case MAPRED_INPUT_FILE:
					case MAPRED_INPUT_GPFDIST:
						XASSERT(obj->u.input.files);

						/* Allocate and produce buffer */
						bufcat(&buffer, "CREATE EXTERNAL TABLE ");
						bufcat(&buffer, doc->prefix);
						bufcat(&buffer, obj->name);
						bufcat(&buffer, "(");
						for (plist = obj->u.input.columns;
							 plist;
							 plist = plist->next)
						{
							bufcat(&buffer, plist->name);
							bufcat(&buffer, " ");
							bufcat(&buffer, plist->type);
							if (plist->next)
								bufcat(&buffer, ", ");
						}
						bufcat(&buffer, ")\n");
						bufcat(&buffer, " LOCATION(");
						for (clist = obj->u.input.files;
							 clist;
							 clist = clist->next)
						{
							char *domain_port, *path, *p = NULL;

							if (obj->u.input.type == MAPRED_INPUT_GPFDIST)
								bufcat(&buffer, "'gpfdist://");
							else
								bufcat(&buffer, "'file://");

							/*
							 * The general syntax of a URL is scheme://domain:port/path?query_string#fragment_id
							 * clist->value should contain just domain:port/path?query_string#fragment_id
							 *
							 * All validation happens before clist->value is
							 * modified so diagnostics show the full location.
							 */
							p = strchr(clist->value, '/');
							if (p == NULL)
							{
								mapred_obj_error(obj, "Failed to find '/' indicating start of path (%s)",
												 clist->value);
								XRAISE(MAPRED_PARSE_ERROR,
									   "Invalid INPUT source specification");
							}
							if (p == clist->value)
							{
								mapred_obj_error(obj, "Missing domain and port before '/' indicating start of path (%s)",
												 clist->value);
								XRAISE(MAPRED_PARSE_ERROR,
									   "Invalid INPUT source specification");
							}
							if (p[1] == '\0')
							{
								mapred_obj_error(obj, "Missing path after '/' (%s)",
												 clist->value);
								XRAISE(MAPRED_PARSE_ERROR,
									   "Invalid INPUT source specification");
							}

							/*
							 * A trailing ':' before the '/' is allowed
							 * (e.g. host:/filepath) and is stripped below, so
							 * a location of just ":/..." has no host at all.
							 */
							if (p == clist->value + 1 && clist->value[0] == ':')
							{
								mapred_obj_error(obj, "Missing host before '/' (%s)",
												 clist->value);
								XRAISE(MAPRED_PARSE_ERROR,
									   "Invalid INPUT source specification");
							}

							domain_port = clist->value;
							path = p + 1;

							/*
							 * Overwrite the / separating the domain:port from the path
							 * with a nul and move back one byte to check for a trailing ':'.
							 * We put the / back in when copying into the destination buffer.
							 */
							*p-- = '\0';

							/*
							 * We allow a trailing ':' (e.g. host:/filepath)
							 * but we must not copy it into the external table url.
							 */
							if (*p == ':')
								*p = '\0';

							bufcat(&buffer, domain_port);
							bufcat(&buffer, "/");
							bufcat(&buffer, path);
							if (clist->next)
								bufcat(&buffer, "',\n ");
						}
						bufcat(&buffer, "')\n");
						if (obj->u.input.format == MAPRED_FORMAT_CSV)
							bufcat(&buffer, " FORMAT 'CSV'");
						else
							bufcat(&buffer, " FORMAT 'TEXT'");
						if (obj->u.input.delimiter ||
							obj->u.input.escape ||
							obj->u.input.quote ||
							obj->u.input.null)
						{
							bufcat(&buffer, " ( ");
							if (obj->u.input.delimiter)
							{
								bufcat(&buffer, "DELIMITER '");
								bufcat(&buffer, obj->u.input.delimiter);
								bufcat(&buffer, "' ");
							}
							if (obj->u.input.escape)
							{
								bufcat(&buffer, "ESCAPE '");
								bufcat(&buffer, obj->u.input.escape);
								bufcat(&buffer, "' ");
							}
							if (obj->u.input.quote)
							{
								bufcat(&buffer, "QUOTE '");
								bufcat(&buffer, obj->u.input.quote);
								bufcat(&buffer, "' ");
							}
							if (obj->u.input.null)
							{
								bufcat(&buffer, "NULL '");
								bufcat(&buffer, obj->u.input.null);
								bufcat(&buffer, "' ");
							}
							bufcat(&buffer, ")");
						}
						if (obj->u.input.error_limit > 0)
						{
							/* a positive int needs at most 10 digits + NUL */
							char intbuf[11];

							snprintf(intbuf, sizeof(intbuf), "%d",
									 obj->u.input.error_limit);
							bufcat(&buffer, "\n SEGMENT REJECT LIMIT ");
							bufcat(&buffer, intbuf);
						}
						bufcat(&buffer, ";\n\n");
						break;

					case MAPRED_INPUT_EXEC:
						XASSERT(obj->u.input.desc);
						bufcat(&buffer, "CREATE EXTERNAL WEB TABLE ");
						bufcat(&buffer, doc->prefix);
						bufcat(&buffer, obj->name);
						bufcat(&buffer, "(");
						for (plist = obj->u.input.columns;
							 plist;
							 plist = plist->next)
						{
							bufcat(&buffer, plist->name);
							bufcat(&buffer, " ");
							bufcat(&buffer, plist->type);
							if (plist->next)
								bufcat(&buffer, ", ");
						}
						bufcat(&buffer, ")\n");
						bufcat(&buffer, "EXECUTE '");
						bufcat(&buffer, obj->u.input.desc);
						bufcat(&buffer, "'\n");
						if (obj->u.input.format == MAPRED_FORMAT_CSV)
							bufcat(&buffer, " FORMAT 'CSV'");
						else
							bufcat(&buffer, " FORMAT 'TEXT'");
						if (obj->u.input.delimiter ||
							obj->u.input.quote ||
							obj->u.input.null)
						{
							bufcat(&buffer, " ( ");
							if (obj->u.input.delimiter)
							{
								bufcat(&buffer, "DELIMITER '");
								bufcat(&buffer, obj->u.input.delimiter);
								bufcat(&buffer, "' ");
							}
							if (obj->u.input.quote)
							{
								bufcat(&buffer, "QUOTE '");
								bufcat(&buffer, obj->u.input.quote);
								bufcat(&buffer, "' ");
							}
							if (obj->u.input.null)
							{
								bufcat(&buffer, "NULL '");
								bufcat(&buffer, obj->u.input.null);
								bufcat(&buffer, "' ");
							}
							bufcat(&buffer, ")");
						}
						if (obj->u.input.error_limit > 0)
						{
							/* a positive int needs at most 10 digits + NUL */
							char intbuf[11];

							snprintf(intbuf, sizeof(intbuf), "%d",
									 obj->u.input.error_limit);
							bufcat(&buffer, "\n SEGMENT REJECT LIMIT ");
							bufcat(&buffer, intbuf);
						}
						bufcat(&buffer, ";\n\n");
						break;

					case MAPRED_INPUT_QUERY:
						XASSERT(obj->u.input.desc);

						/*
						 * CREATE TEMPORARY VIEW <name> AS
						 * <desc>;
						 */
						bufcat(&buffer, "CREATE TEMPORARY VIEW ");
						bufcat(&buffer, obj->name);
						bufcat(&buffer, " AS\n");
						bufcat(&buffer, obj->u.input.desc);
						bufcat(&buffer, ";\n\n");
						break;

					case MAPRED_INPUT_NONE:
					default:
						XASSERT(false);
				}
				if (global_print_flag || global_debug_flag)
					printf("-- INPUT %s\n", obj->name);
				break;

			case MAPRED_OUTPUT:
				/*
				 * Outputs have no backend objects created directly.
				 * For output tables we may issue a create table as
				 * select, but that occurs at run-time.
				 */
				obj->created = true;
				mapred_setup_columns(conn, obj);
				break;

			/*
			 * The function types have different defaults and generate
			 * slightly different error messages, but basically do the
			 * same thing.
			 */
			case MAPRED_MAPPER:
			case MAPRED_TRANSITION:
			case MAPRED_COMBINER:
			case MAPRED_FINALIZER:
				ckind = mapred_kind_name[obj->kind];
				XASSERT(obj->name);

				/*
				 * 'kind' specific initialization accomplished above, now handle
				 * the generic function creation.
				 */
				if (global_print_flag || global_debug_flag)
					printf("-- %s %s\n", ckind, obj->name);

				/*
				 * Nothing to do if we already looked up the function in the
				 * catalog.
				 */
				if (obj->internal)
				{
					obj->created = true;
					break;
				}

				/* Non-internal functions should have these defined */
				XASSERT(obj->u.function.body);
				XASSERT(obj->u.function.language);

				mapred_setup_columns(conn, obj);
				XASSERT(obj->u.function.parameters);
				XASSERT(obj->u.function.rtype.name);
				XASSERT(NULL == obj->u.function.internal_returns);

				/*
				 * fill in the buffer:
				 *
				 * CREATE FUNCTION <name>(<parameters>)
				 * RETURNS [SETOF] <rtype> LANGUAGE <lang> AS
				 * $$
				 * <body>
				 * $$ [STRICT] [IMMUTABLE];
				 *
				 */
				bufcat(&buffer, "CREATE FUNCTION ");
				bufcat(&buffer, doc->prefix);
				bufcat(&buffer, obj->name);
				bufcat(&buffer, "(");

				/* Handle parameter list */
				for (plist = obj->u.function.parameters;
					 plist;
					 plist = plist->next)
				{
					bufcat(&buffer, plist->name);
					bufcat(&buffer, " ");
					bufcat(&buffer, plist->type);
					if (plist->next)
						bufcat(&buffer, ", ");
				}

				/* Handle Return clause */
				bufcat(&buffer, ")\nRETURNS ");
				if (obj->u.function.mode == MAPRED_MODE_MULTI)
					bufcat(&buffer, "SETOF ");
				bufcat(&buffer, obj->u.function.rtype.name);

				/*
				 * Handle LANGUAGE clause: "C", "SQL" and names already
				 * starting with "pl" are used as-is; any other language
				 * name gets a "pl" prefix.
				 */
				if (!strcasecmp("C", obj->u.function.language) ||
					!strcasecmp("SQL", obj->u.function.language) ||
					!strncasecmp("PL", obj->u.function.language, 2))
				{
					bufcat(&buffer, " LANGUAGE ");
					bufcat(&buffer, obj->u.function.language);
				}
				else
				{
					bufcat(&buffer, " LANGUAGE pl");
					bufcat(&buffer, obj->u.function.language);
				}

				/* python only has an untrusted form */
				if (!strcasecmp("python", obj->u.function.language))
					bufcat(&buffer, "u");

				bufcat(&buffer, " AS ");

				/*
				 * Handle procedural language specific formatting for the
				 * function definition.
				 *
				 * C language functions are defined using the two parameter
				 * form: AS "library", "function".
				 *
				 * Perl functions append the yaml file line number via a
				 * #line declaration.
				 *
				 * Python functions try to append the yaml file line number
				 * by inserting a bunch of newlines. (only works for runtime
				 * errors, not compiletime errors).
				 */
				if (!strcasecmp("C", obj->u.function.language))
				{
					bufcat(&buffer, "$$");
					bufcat(&buffer, obj->u.function.library);
					bufcat(&buffer, "$$, $$");
					bufcat(&buffer, obj->u.function.body);
					bufcat(&buffer, "$$");
				}
				else if (!strncasecmp("plperl", obj->u.function.language, 6) ||
						 !strncasecmp("perl", obj->u.function.language, 4))
				{
					char lineno[10];

					snprintf(lineno, sizeof(lineno), "%d", obj->u.function.lineno);
					bufcat(&buffer, "$$\n#line ");
					bufcat(&buffer, lineno);
					bufcat(&buffer, "\n");
					bufcat(&buffer, obj->u.function.body);
					/* position > 0 here: "$$" was appended above */
					if (buffer->buffer[buffer->position-1] != '\n')
						bufcat(&buffer, "\n");
					bufcat(&buffer, "$$");
				}
				else if (!strncasecmp("plpython", obj->u.function.language, 8) ||
						 !strncasecmp("python", obj->u.function.language, 6))
				{
					/*
					 * Python very stubborn about not letting you manually
					 * adjust line number. So instead we take the stupid route
					 * and just insert N newlines.
					 */
					int i;

					bufcat(&buffer, "$$\n");
					for (i = 1; i < obj->u.function.lineno-2; i++)
						bufcat(&buffer, "\n");
					bufcat(&buffer, obj->u.function.body);
					/* position > 0 here: "$$\n" was appended above */
					if (buffer->buffer[buffer->position-1] != '\n')
						bufcat(&buffer, "\n");
					bufcat(&buffer, "$$");
				}
				else
				{
					/* Some generic other language, take our best guess */
					bufcat(&buffer, "$$");
					bufcat(&buffer, obj->u.function.body);
					bufcat(&buffer, "$$");
				}

				/* Handle options */
				if (obj->u.function.flags & mapred_function_strict)
					bufcat(&buffer, " STRICT");
				if (obj->u.function.flags & mapred_function_immutable)
					bufcat(&buffer, " IMMUTABLE");

				/* All done */
				bufcat(&buffer, ";\n\n");
				break;

			case MAPRED_REDUCER:
			{
				mapred_object_t *transition = obj->u.reducer.transition.object;
				mapred_object_t *combiner = obj->u.reducer.combiner.object;
				mapred_object_t *finalizer = obj->u.reducer.finalizer.object;
				char *state;

				XASSERT(obj->name);
				XASSERT(transition);
				XASSERT(transition->name);

				/*
				 * If the reducer depends on an object that hasn't been created
				 * then return false, it will be resolved during a second pass
				 */
				if ((transition && !transition->created) ||
					(combiner && !combiner->created) ||
					(finalizer && !finalizer->created))
				{
					/*
					 * NOTE(review): deferral messages require BOTH flags,
					 * unlike the other progress messages which need either;
					 * confirm this is intended.
					 */
					if (global_print_flag && global_debug_flag)
						printf("-- deferring REDUCE %s\n", obj->name);
					break;
				}
				if (global_print_flag || global_debug_flag)
					printf("-- REDUCE %s\n", obj->name);

				/* Now, set things up to create the thing */
				mapred_setup_columns(conn, obj);

				plist = transition->u.function.parameters;
				XASSERT(plist);        /* state */
				XASSERT(plist->next);  /* parameters */

				if (obj->u.reducer.ordering)
					bufcat(&buffer, "CREATE ORDERED AGGREGATE ");
				else
					bufcat(&buffer, "CREATE AGGREGATE ");
				bufcat(&buffer, doc->prefix);
				bufcat(&buffer, obj->name);
				bufcat(&buffer, " (");

				/*
				 * Get the state type, and write out the aggregate parameters
				 * based on the parameter list of the transition function.
				 */
				plist = transition->u.function.parameters;
				state = plist->type;
				for (plist = plist->next; plist; plist = plist->next)
				{
					bufcat(&buffer, plist->type);
					if (plist->next)
						bufcat(&buffer, ", ");
				}
				bufcat(&buffer, ") (\n");
				bufcat(&buffer, " stype = ");
				bufcat(&buffer, state);
				if (obj->u.reducer.initialize)
				{
					bufcat(&buffer, ",\n initcond = '");
					bufcat(&buffer, obj->u.reducer.initialize);
					bufcat(&buffer, "'");
				}
				bufcat(&buffer, ",\n sfunc = ");
				if (!transition->internal)
					bufcat(&buffer, doc->prefix);
				bufcat(&buffer, transition->name);
				if (combiner)
				{
					bufcat(&buffer, ",\n combinefunc = ");
					if (!combiner->internal)
						bufcat(&buffer, doc->prefix);
					bufcat(&buffer, combiner->name);
				}

				/*
				 * To handle set returning finalizers the finalizer is pushed
				 * into the task definition rather than being placed in the
				 * uda where it belongs, so no finalfunc is emitted here.
				 */
				bufcat(&buffer, "\n);\n\n");
				break;
			}

			case MAPRED_TASK:
			case MAPRED_EXECUTION:
			{
				mapred_object_t *input = obj->u.task.input.object;
				mapred_object_t *mapper = obj->u.task.mapper.object;
				mapred_object_t *reducer = obj->u.task.reducer.object;
				mapred_plist_t *columns = NULL;
				mapred_plist_t *ingrouping = NULL;
				mapred_plist_t *newitem = NULL;
				mapred_plist_t *grouping = NULL;
				mapred_plist_t *last = NULL;
				mapred_plist_t *scan = NULL;
				buffer_t *swap;

				if (!obj->u.task.execute)
				{
					XASSERT(obj->name);
				}
				XASSERT(obj->u.task.input.name);

				/*
				 * qbuffer holds the query built by the previous pipeline
				 * stage; each stage wraps it in `buffer` and then swaps.
				 */
				if (!qbuffer)
					qbuffer = makebuffer(1024, 1024);
				else
					bufreset(qbuffer);

				/*
				 * If the task depends on an object that hasn't been created then
				 * return false, it will be resolved during a second pass
				 */
				if ((input && !input->created) ||
					(mapper && !mapper->created) ||
					(reducer && !reducer->created))
				{
					if (global_print_flag && global_debug_flag)
					{
						if (obj->u.task.execute)
							printf("-- deferring EXECUTION\n");
						else
							printf("-- deferring TASK %s\n", obj->name);
					}
					break;
				}

				if (global_print_flag || global_debug_flag)
				{
					if (obj->u.task.execute)
						printf("-- EXECUTION\n");
					else
						printf("-- TASK %s\n", obj->name);
				}

				/*
				 * 1) Handle the INPUT, two cases:
				 *    1a) There is no MAP/REDUCE:  "SELECT * FROM <input>"
				 *    1b) There is a MAP and/or REDUCE:  "<input>"
				 */
				mapred_setup_columns(conn, obj);
				if (!obj->u.task.mapper.name && !obj->u.task.reducer.name)
				{
					/* Allocate the buffer for the input. */
					if (input->u.input.type == MAPRED_INPUT_TABLE)
					{
						bufcat(&qbuffer, "SELECT * FROM ");
						bufcat(&qbuffer, input->u.input.desc);
					}
					else
					{
						bufcat(&qbuffer, "SELECT * FROM ");
						bufcat(&qbuffer, input->name);
					}
				}
				else
				{
					/* Input is just the name or description of the input */
					if (input->u.input.type == MAPRED_INPUT_TABLE)
						bufcat(&qbuffer, input->u.input.desc);
					else
						bufcat(&qbuffer, input->name);
				}

				/*
				 * How we get the columns depends a bit on the input.
				 * Is the input actually an "MAPRED_INPUT" object, or is it
				 * a "MAPRED_TASK" object?
				 */
				switch (input->kind)
				{
					case MAPRED_INPUT:
						columns = input->u.input.columns;
						break;

					case MAPRED_TASK:
						columns = input->u.task.returns;
						ingrouping = input->u.task.grouping;
						if (!columns)
						{
							mapred_obj_error(obj, "Unable to determine return "
											 "columns for TASK '%s'",
											 obj->u.task.input.name);
							XRAISE(MAPRED_PARSE_INTERNAL, NULL);
						}
						break;

					default:
						mapred_obj_error(obj, "SOURCE '%s' is not an INPUT or "
										 "TASK object",
										 obj->u.task.input.name);
						XRAISE(MAPRED_PARSE_ERROR, "Object creation Error");
						break;
				}
				XASSERT(columns);

				/*
				 * 2) Handle the MAPPER, two cases
				 *  2a) The Mapper returns an generated ADT that needs extraction
				 *      "SELECT key(m), ...
				 *       FROM (SELECT <map(...) as m FROM <input>) mapsubq
				 *  2b) The Mapper returns a single column:
				 *      "SELECT <map>(...) FROM <input>"
				 */
				XASSERT(mapper || !obj->u.task.mapper.name);
				if (mapper)
				{
					plist = mapper->u.function.returns;
					plist2 = mapper->u.function.internal_returns;
					XASSERT(plist);

					if (plist->next)
					{  /* 2a */
						bufcat(&buffer, "SELECT ");
						for (; plist; plist = plist->next)
						{
							/*
							 * Catalog (internal) mappers expose their
							 * composite fields under internal_returns names;
							 * this mirrors the finalizer handling below.
							 */
							if (mapper->internal)
							{
								XASSERT(plist2 != NULL);
								bufcat(&buffer, plist2->name);
								plist2 = plist2->next;
							}
							else
							{
								bufcat(&buffer, plist->name);
							}
							bufcat(&buffer, "(m)");
							bufcat(&buffer, " as ");
							bufcat(&buffer, plist->name);
							if (plist->next)
								bufcat(&buffer, ", ");
						}
						bufcat(&buffer, "\nFROM (");
					}

					/* shared code */
					bufcat(&buffer, "SELECT ");
					if (!mapper->internal)
						bufcat(&buffer, doc->prefix);
					bufcat(&buffer, mapper->name);
					bufcat(&buffer, "(");
					plist = mapper->u.function.parameters;
					for (; plist; plist = plist->next)
					{
						/* Check if this parameter is one of the input columns */
						for (scan = columns; scan; scan = scan->next)
							if (!strcasecmp(plist->name, scan->name))
							{
								bufcat(&buffer, plist->name);
								break;
							}

						/* Task inputs also need to scan the grouping columns */
						if (!scan)
							for (scan = ingrouping; scan; scan = scan->next)
								if (!strcasecmp(plist->name, scan->name))
								{
									bufcat(&buffer, plist->name);
									break;
								}

						/* Check if this parameter is in global_plist */
						if (!scan)
							for (scan = global_plist; scan; scan = scan->next)
								if (!strcasecmp(plist->name, scan->name))
								{
									/*
									 * (HACK)
									 * Note that global_plist overloads the
									 * plist structure using the "type" field
									 * to store "value".
									 * (HACK)
									 */
									bufcat(&buffer, "'");
									bufcat(&buffer, scan->type);
									bufcat(&buffer, "'::");
									bufcat(&buffer, plist->type);
									break;
								}

						/*
						 * If we couldn't find it issue a warning and
						 * set to NULL
						 */
						if (!scan)
						{
							if (global_verbose_flag)
								fprintf(stderr, " ");
							fprintf(stderr,
									"WARNING: unset parameter - "
									"%s(%s => NULL)\n",
									mapper->name, plist->name);
							bufcat(&buffer, "NULL::");
							bufcat(&buffer, plist->type);
						}

						/* Add a comma if there is another parameter */
						if (plist->next)
							bufcat(&buffer, ", ");
					}
					bufcat(&buffer, ") as ");

					/* break into cases again */
					plist = mapper->u.function.returns;
					if (plist->next)
					{  /* 2a */
						bufcat(&buffer, "m FROM ");
						bufcat(&buffer, qbuffer->buffer);
						bufcat(&buffer, ") mapxq\n");

						/*
						 * Need to work this through, it seems that m is true
						 * whenever a column is null, which is not the desired
						 * behavior.
						 *
						 * Look more closely at "grep" code for why we want it,
						 * and "oreilly" code for why we don't.
						 *
						 * For the moment the compromise is that we do it only
						 * for SINGLE mode functions, since MULTI mode can
						 * control it's own filtering without this.
						 */
						if (mapper->u.function.mode != MAPRED_MODE_MULTI)
							bufcat(&buffer, "WHERE m is not null");
					}
					else
					{
						bufcat(&buffer, plist->name);
						bufcat(&buffer, " FROM ");
						bufcat(&buffer, qbuffer->buffer);
					}

					/*
					 * Swap the buffer into the qbuffer for input as the next
					 * stage of the query pipeline.
					 */
					swap = qbuffer;
					qbuffer = buffer;
					buffer = swap;
					bufreset(buffer);

					/* Columns are now the output of the mapper */
					columns = mapper->u.function.returns;
					ingrouping = NULL;
				}

				/*
				 * 3) Handle the Reducer, several sub-cases:
				 */
				if (obj->u.task.reducer.name)
				{
					/*
					 * Step 1: Determine grouping columns
					 *    Find which columns are returned from the previous
					 *    stage that are NOT parameters to the reducer.
					 */
					grouping = last = NULL;
					if (!reducer)
					{
						/*
						 * We have a reducer, but it isn't listed in the YAML.
						 * How to work out parameter handling still needs to
						 * be worked out.  For now we just assume that this
						 * sort of function always takes a single "value" column
						 * and returns a "value" column.
						 *
						 * The grouping list built here is owned by this
						 * function and released after the view is generated;
						 * its name/type strings are borrowed from `columns`.
						 */
						for (plist = columns; plist; plist = plist->next)
						{
							if (strcasecmp(plist->name, "value"))
							{
								if (grouping)
								{
									last->next =
										mapred_malloc(sizeof(mapred_plist_t));
									last = last->next;
								}
								else
								{
									grouping =
										mapred_malloc(sizeof(mapred_plist_t));
									last = grouping;
								}
								last->name = plist->name;
								last->type = plist->type;
								last->next = NULL;
							}
						}
					}
					else
					{   /* The reducer exists in the YAML */
						/* We precalculated the grouping columns */
						grouping = obj->u.task.grouping;
					}

					/* Fill in the buffer */
					bufcat(&buffer, "SELECT ");
					for (plist = grouping; plist; plist = plist->next)
					{
						bufcat(&buffer, plist->name);
						bufcat(&buffer, ", ");
					}

					/* Call the aggregation function */
					if (reducer && !reducer->internal)
						bufcat(&buffer, doc->prefix);
					bufcat(&buffer, obj->u.task.reducer.name);
					bufcat(&buffer, "(");
					if (reducer)
					{
						plist = reducer->u.reducer.parameters;
						for (; plist; plist = plist->next)
						{
							/* Check if parameter is one of the input columns */
							for (scan = columns; scan; scan = scan->next)
								if (!strcasecmp(plist->name, scan->name))
								{
									bufcat(&buffer, plist->name);
									break;
								}

							/* Task inputs need to scan the grouping columns */
							if (!scan)
								for (scan = ingrouping; scan; scan = scan->next)
									if (!strcasecmp(plist->name, scan->name))
									{
										bufcat(&buffer, plist->name);
										break;
									}

							/* Check if this parameter is in global_plist */
							if (!scan)
							{
								for (scan = global_plist;
									 scan;
									 scan = scan->next)
								{
									if (!strcasecmp(plist->name, scan->name))
									{
										/*
										 * (HACK)
										 * Note that global_plist overloads the
										 * plist structure using the "type"
										 * field to store "value".
										 * (HACK)
										 */
										bufcat(&buffer, "'");
										bufcat(&buffer, scan->type);
										bufcat(&buffer, "'::");
										bufcat(&buffer, plist->type);
										break;
									}
								}
							}

							/*
							 * If we couldn't find it issue a warning
							 * and set to NULL
							 */
							if (!scan)
							{
								if (global_verbose_flag)
									fprintf(stderr, " ");
								fprintf(stderr,
										"WARNING: unset parameter - "
										"%s(%s => NULL)\n",
										reducer->name, plist->name);
								bufcat(&buffer, "NULL::");
								bufcat(&buffer, plist->type);
							}

							if (plist->next)
								bufcat(&buffer, ", ");
						}

						/* Handle ORDERING, if specified */
						clist = reducer->u.reducer.ordering;
						if (clist)
							bufcat(&buffer, " ORDER BY ");
						for (; clist; clist = clist->next)
						{
							bufcat(&buffer, clist->value);
							if (clist->next)
								bufcat(&buffer, ", ");
						}
					}
					else
					{
						/*
						 * non-yaml reducer always takes "value" as the
						 * input column.  There is no parameter list here
						 * (plist is NULL at this point), so every lookup
						 * is keyed on the literal name "value".
						 */

						/* Check if "value" is one of the input columns */
						for (scan = columns; scan; scan = scan->next)
							if (!strcasecmp(scan->name, "value"))
							{
								bufcat(&buffer, "value");
								break;
							}

						/* Task inputs also need to scan the grouping columns */
						if (!scan)
							for (scan = ingrouping; scan; scan = scan->next)
								if (!strcasecmp(scan->name, "value"))
								{
									bufcat(&buffer, "value");
									break;
								}

						/* Check if this parameter is in global_plist */
						if (!scan)
							for (scan = global_plist; scan; scan = scan->next)
								if (!strcasecmp(scan->name, "value"))
								{
									/*
									 * (HACK)
									 * Note that global_plist overloads the
									 * plist structure using the "type" field
									 * to store "value".
									 * (HACK)
									 *
									 * A non-yaml reducer has no declared
									 * parameter type, so the literal is left
									 * uncast.
									 */
									bufcat(&buffer, "'");
									bufcat(&buffer, scan->type);
									bufcat(&buffer, "'");
									break;
								}

						if (!scan)
						{
							if (global_verbose_flag)
								fprintf(stderr, " ");
							fprintf(stderr,
									"WARNING: unset parameter - "
									"%s(value => NULL)\n",
									obj->u.task.reducer.name);
							bufcat(&buffer, "NULL");
						}
					}
					bufcat(&buffer, ") as ");
					if (reducer)
					{
						plist = reducer->u.reducer.returns;
						XASSERT(plist);  /* Need to have a return! */

						/*
						 * If the reducer has a finalizer we push it outside of
						 * the context of the UDA so that we can properly handle
						 * set returning/column returning functions.
						 */
						if (reducer->u.reducer.finalizer.name)
							bufcat(&buffer, "r");
						else
							bufcat(&buffer, plist->name);
					}
					else
					{
						/*
						 * non-yaml reducer always return a single column
						 * named "value"
						 */
						bufcat(&buffer, "value");
					}
					bufcat(&buffer, "\nFROM ");
					if (mapper)
					{
						bufcat(&buffer, "(");
						bufcat(&buffer, qbuffer->buffer);
						bufcat(&buffer, ") mapsubq");
					}
					else
					{
						bufcat(&buffer, qbuffer->buffer);
					}
					if (grouping)
					{
						bufcat(&buffer, "\nGROUP BY ");
						for (plist = grouping; plist; plist = plist->next)
						{
							bufcat(&buffer, plist->name);
							if (plist->next)
								bufcat(&buffer, ", ");
						}
					}

					/*
					 * Swap the buffer into the qbuffer for input as the next
					 * stage of the query pipeline.
					 */
					swap = qbuffer;
					qbuffer = buffer;
					buffer = swap;
					bufreset(buffer);

					/*
					 * If the reducer had a finalizer we push it into another
					 * nested subquery since user defined aggregates aren't
					 * allowed to return sets.
					 *
					 * NOTE: this code mostly duplicates the MAP code above
					 */
					if (reducer && reducer->u.reducer.finalizer.name)
					{
						mapred_object_t *finalizer;

						finalizer = reducer->u.reducer.finalizer.object;
						XASSERT(finalizer);  /* FIXME */
						XASSERT(finalizer->u.function.returns);

						/*
						 * If the finalizer returns multiple columns then we
						 * need an extra layer of wrapping to extract them.
						 */
						plist = finalizer->u.function.returns;
						if (plist->next)
						{
							bufcat(&buffer, "SELECT ");

							/* the grouping columns */
							for (plist = grouping;
								 plist;
								 plist = plist->next)
							{
								bufcat(&buffer, plist->name);
								bufcat(&buffer, ", ");
							}

							plist2 = finalizer->u.function.internal_returns;
							for (plist = finalizer->u.function.returns;
								 plist;
								 plist = plist->next)
							{
								if (finalizer->internal)
								{
									XASSERT(plist2 != NULL);
									bufcat(&buffer, plist2->name);
									plist2 = plist2->next;
								}
								else
								{
									bufcat(&buffer, plist->name);
								}
								bufcat(&buffer, "(r)");
								bufcat(&buffer, " as ");
								bufcat(&buffer, plist->name);
								if (plist->next)
									bufcat(&buffer, ", ");
							}
							bufcat(&buffer, "\nFROM (");
						}

						/*
						 * Call the function on the returned state from
						 * the reducer.
						 */
						bufcat(&buffer, "SELECT ");

						/* grouping columns */
						for (plist = grouping;
							 plist;
							 plist = plist->next)
						{
							bufcat(&buffer, plist->name);
							bufcat(&buffer, ", ");
						}
						if (!finalizer->internal)
							bufcat(&buffer, doc->prefix);
						bufcat(&buffer, finalizer->name);
						bufcat(&buffer, "(r) as ");

						/* break into cases again */
						plist = finalizer->u.function.returns;
						if (plist->next)
							bufcat(&buffer, "r");
						else
							bufcat(&buffer, plist->name);
						bufcat(&buffer, " FROM (");
						bufcat(&buffer, qbuffer->buffer);
						bufcat(&buffer, ") redxq\n");

						/*
						 * If we have that extra layer of wrapping
						 * then close it off
						 */
						if (finalizer->u.function.returns->next)
							bufcat(&buffer, ") redsubq\n");

						/*
						 * Swap the buffer into the qbuffer for input as the next
						 * stage of the query pipeline.
						 */
						swap = qbuffer;
						qbuffer = buffer;
						buffer = swap;
						bufreset(buffer);
					}
				}

				/*
				 * 4) Handle the final transform into the view definition:
				 *     "CREATE TEMPORARY VIEW <name> AS <query>;"
				 */
				bufcat(&buffer, "CREATE TEMPORARY VIEW ");
				bufcat(&buffer, obj->name);
				bufcat(&buffer, " AS\n");
				bufcat(&buffer, qbuffer->buffer);
				bufcat(&buffer, ";\n\n");

				/*
				 * For a reducer not described in the YAML the grouping list
				 * was allocated above and must be released in full; in every
				 * other case `grouping` points at an existing list that is
				 * not ours to free.
				 */
				if (obj->u.task.reducer.name && !reducer)
				{
					while (grouping)
					{
						newitem = grouping;
						grouping = grouping->next;
						mapred_free(newitem);
					}
					last = NULL;
				}
				break;
			}

			case MAPRED_ADT:
				XASSERT(obj->name);
				mapred_setup_columns(conn, obj);

				/*
				 * ADT's have generated names that already include the
				 * document prefix
				 */
				bufcat(&buffer, "CREATE TYPE ");
				bufcat(&buffer, obj->name);
				bufcat(&buffer, " as (");
				for (plist = obj->u.adt.returns; plist; plist = plist->next)
				{
					bufcat(&buffer, plist->name);
					bufcat(&buffer, " ");
					bufcat(&buffer, plist->type);
					if (plist->next)
						bufcat(&buffer, ", ");
				}
				bufcat(&buffer, ");\n\n");
				break;

			default:
				XASSERT(false);
		}

		/* An empty buffer means nothing to run (no-op kind or deferred). */
		if (buffer->position > 0)
		{
			/*
			 * In print-only mode we do everything but run the queries
			 * ie, we still create and destroy objects.
			 */
			if (global_print_flag || global_debug_flag)
				printf("%s", buffer->buffer);

			/*
			 * Try to create the object, but failure should not terminate
			 * the transaction, so wrap it in a savepoint.  Every PGresult
			 * returned by libpq is released with PQclear.
			 */
			PQclear(PQexec(conn, "SAVEPOINT mapreduce_save"));
			result = PQexec(conn, buffer->buffer);
			if (PQresultStatus(result) == PGRES_COMMAND_OK)
			{
				obj->created = true;
				PQclear(result);
				result = NULL;
				PQclear(PQexec(conn, "RELEASE SAVEPOINT mapreduce_save"));
			}
			else
			{
				/*
				 * Both fields may be NULL, e.g. when result itself is NULL
				 * or the error was raised client-side without a SQLSTATE.
				 */
				char *error = PQresultErrorField(result, PG_DIAG_SQLSTATE);
				char *primary = PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY);

				if (primary == NULL)
					primary = "";

				/* rollback to savepoint */
				PQclear(PQexec(conn, "ROLLBACK TO SAVEPOINT mapreduce_save"));
				PQclear(PQexec(conn, "RELEASE SAVEPOINT mapreduce_save"));

				/*
				 * If we have an "object does not exist" error from a SQL input
				 * then it may just be a dependency issue, so we don't error
				 * right away.
				 */
				if (obj->kind != MAPRED_INPUT ||
					obj->u.input.type != MAPRED_INPUT_QUERY ||
					error == NULL ||
					strcmp(error, OBJ_DOES_NOT_EXIST))
				{
					if (global_verbose_flag)
						fprintf(stderr, " - ");
					fprintf(stderr, "%s", PQresultErrorMessage(result));
					PQclear(result);
					result = NULL;
					XRAISE(MAPRED_SQL_ERROR, "Object creation Failure");
				}

				if (global_verbose_flag)
					fprintf(stderr, " Error: %s\n", primary);

				/*
				 * If it is an error that we think we can recover from then we don't
				 * log the error immediately, but write it to a buffer in the event
				 * that recovery wasn't successful.
				 */
				if (doc->errors)
				{
					if (global_verbose_flag)
						bufcat(&doc->errors, " - ");
					bufcat(&doc->errors, "Error: ");
					bufcat(&doc->errors, (char*) mapred_kind_name[obj->kind]);
					if (obj->name)
					{
						bufcat(&doc->errors, " '");
						bufcat(&doc->errors, obj->name);
						bufcat(&doc->errors, "'");
					}
					bufcat(&doc->errors, ": ");
					bufcat(&doc->errors, primary);
					if (obj->line > 0)
					{
						char numbuf[64];

						snprintf(numbuf, sizeof(numbuf), ", at line %d", obj->line);
						bufcat(&doc->errors, numbuf);
					}
					bufcat(&doc->errors, "\n");
				}

				/* error/primary point into result; done with them now */
				PQclear(result);
				result = NULL;
			}
		}

		/*
		 * INPUTS setup columns AFTER creation.
		 * All other objects handle it above prior to creation.
		 */
		if (obj->kind == MAPRED_INPUT && obj->created)
			mapred_setup_columns(conn, obj);
	}
	XFINALLY
	{
		if (buffer)
			mapred_free(buffer);
		if (qbuffer)
			mapred_free(qbuffer);
	}
	XTRY_END;

	return obj->created;
}