in gpcontrib/gpmapreduce/src/mapred.c [2197:2502]
/*
 * mapred_run_queries
 *
 * Walk every object in 'doc' and, for each MAPRED_EXECUTION object, build and
 * run the statement that produces its output:
 *
 *   [EXPLAIN [ANALYZE]]
 *   [CREATE TABLE <out> AS | INSERT INTO <out> (]
 *   SELECT * FROM <name>
 *   [)] | [DISTRIBUTED BY (<grouping>)] | [ORDER BY <grouping>, <returns>];
 *
 * conn - libpq connection every statement is sent over.
 * doc  - document whose 'objects' list is scanned for executions.
 *
 * For table output the catalog is first queried to see whether the table
 * exists; depending on the output mode it is dropped (REPLACE), inserted into
 * (APPEND), or reported as an error.  For file output the file is opened
 * according to the output mode and tuples are printed to it; otherwise tuples
 * are printed to stdout.
 *
 * Failures are signalled with XRAISE (MAPRED_PARSE_ERROR for object creation
 * problems, MAPRED_SQL_ERROR when the generated statement fails).  The
 * XFINALLY section releases the pending PGresult, any open output file and
 * the statement buffer.
 *
 * NOTE(review): 'result' and 'outfile' are modified inside the XTRY section
 * and read in XFINALLY; if XTRY is setjmp-based, confirm whether they need to
 * be volatile.
 */
void mapred_run_queries(PGconn *conn, mapred_document_t *doc)
{
	mapred_olist_t  *olist;
	mapred_plist_t  *columns;
	mapred_object_t *output;
	PGresult        *result = NULL;
	FILE            *outfile = stdout;
	buffer_t        *buffer = NULL;

	/*
	 * The EXPLAIN prefix does not depend on the object being run, so pick it
	 * once.  Logical AND: ANALYZE only applies when both flags are set.
	 */
	const char      *explain_prefix =
		(global_explain_flag && global_analyze) ? "EXPLAIN ANALYZE " :
		global_explain_flag ? "EXPLAIN " : NULL;

	XTRY
	{
		/* allocates 512 bytes, extending by 512 bytes if we run out. */
		buffer = makebuffer(512, 512);

		/* Loop through all objects */
		for (olist = doc->objects; olist; olist = olist->next)
		{
			if (olist->object->kind == MAPRED_EXECUTION)
			{
				boolean exists = false;

				XASSERT(olist->object->name);

				/* Reset the buffer from any previous executions */
				bufreset(buffer);

				output = olist->object->u.task.output.object;

				/*
				 * [CREATE TABLE <name> AS ]
				 *   SELECT * FROM <name>
				 *   ORDER BY <column-list>
				 */
				if (output && output->u.output.type == MAPRED_OUTPUT_TABLE)
				{
					/*
					 * does the table already exist?
					 *
					 * NOTE(review): the table name is spliced into the SQL
					 * text without escaping, here and in the statements
					 * below.  This check (and the DROP that may follow) is
					 * also issued when global_print_flag is set.
					 */
					bufcat(&buffer,
						   "SELECT n.nspname \n"
						   "FROM   pg_catalog.pg_class c JOIN \n"
						   "       pg_catalog.pg_namespace n on \n"
						   "       (c.relnamespace = n.oid) \n"
						   "WHERE  n.nspname = ANY(current_schemas(true)) \n"
						   "  AND  c.relname = lower('");
					bufcat(&buffer, output->u.output.desc);
					bufcat(&buffer, "')");
					result = PQexec(conn, buffer->buffer);
					if (PQresultStatus(result) == PGRES_TUPLES_OK &&
						PQntuples(result) > 0)
						exists = true;

					/* Done with the catalog lookup; don't leak its result */
					PQclear(result);
					result = NULL;
					bufreset(buffer);

					if (exists && output->u.output.mode == MAPRED_OUTPUT_MODE_REPLACE)
					{
						bufcat(&buffer, "DROP TABLE ");
						bufcat(&buffer, output->u.output.desc);

						/*
						 * Run the DROP under a savepoint so a failure can be
						 * rolled back.  The results of the savepoint commands
						 * are not inspected, but they still have to be freed;
						 * PQclear accepts a NULL result.
						 */
						PQclear(PQexec(conn, "SAVEPOINT mapreduce_save"));
						result = PQexec(conn, buffer->buffer);
						if (PQresultStatus(result) == PGRES_COMMAND_OK)
						{
							PQclear(PQexec(conn, "RELEASE SAVEPOINT mapreduce_save"));
						}
						else
						{
							const char *primary;

							/* rollback to savepoint */
							PQclear(PQexec(conn, "ROLLBACK TO SAVEPOINT mapreduce_save"));
							PQclear(PQexec(conn, "RELEASE SAVEPOINT mapreduce_save"));

							/*
							 * The error field may be absent (e.g. no result
							 * at all); never hand NULL to %s.
							 */
							primary = PQresultErrorField(result,
														 PG_DIAG_MESSAGE_PRIMARY);
							if (!primary)
								primary = "unknown error";

							if (global_verbose_flag)
								fprintf(stderr, "    - ");
							fprintf(stderr, "Error: %s\n", primary);
							mapred_obj_error(output, "Table '%s' can't be replaced",
											 output->u.output.desc);

							/* 'result' is released by XFINALLY */
							XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
						}

						/* DROP succeeded: free its result, table is now gone */
						PQclear(result);
						result = NULL;
						bufreset(buffer);
						exists = false;
					}

					/* Handle Explain for OUTPUT TABLE */
					if (explain_prefix)
						bufcat(&buffer, explain_prefix);

					if (!exists)
					{
						bufcat(&buffer, "CREATE TABLE ");
						bufcat(&buffer, output->u.output.desc);
						bufcat(&buffer, " AS ");
					}
					else if (output->u.output.mode == MAPRED_OUTPUT_MODE_APPEND)
					{
						/* the parenthesis is closed after the SELECT below */
						bufcat(&buffer, "INSERT INTO ");
						bufcat(&buffer, output->u.output.desc);
						bufcat(&buffer, " (");
					}
					else
					{
						/* exists, mode is neither replace or append => error */
						mapred_obj_error(output, "Table '%s' already exists",
										 output->u.output.desc);
						XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
					}
				}
				/* Handle Explain for non-table output */
				else if (explain_prefix)
				{
					bufcat(&buffer, explain_prefix);
				}

				bufcat(&buffer, "SELECT * FROM ");
				bufcat(&buffer, olist->object->name);

				/*
				 * add the DISTRIBUTED BY clause for output tables
				 * OR, the ORDER BY clause for other output formats
				 */
				if (output && output->u.output.type == MAPRED_OUTPUT_TABLE)
				{
					/*
					 * If there are no key columns then leave off the
					 * distributed by clause and let the server choose.
					 */
					if (exists)
					{
						/* closes the "INSERT INTO <table> (" opened above */
						bufcat(&buffer, ")");
					}
					else if (olist->object->u.task.grouping)
					{
						bufcat(&buffer, " DISTRIBUTED BY (");
						columns = olist->object->u.task.grouping;
						while (columns)
						{
							bufcat(&buffer, columns->name);
							if (columns->next)
								bufcat(&buffer, ", ");
							columns = columns->next;
						}
						bufcat(&buffer, ")");
					}
					else
					{
						/*
						 * don't have any hints for what the distribution keys
						 * should be, so we do nothing and let the database
						 * decide
						 */
					}
				}
				else
				{
					if (olist->object->u.task.returns ||
						olist->object->u.task.grouping)
					{
						/* order by the grouping columns, then the returns */
						bufcat(&buffer, " ORDER BY ");
						columns = olist->object->u.task.grouping;
						while (columns)
						{
							bufcat(&buffer, columns->name);
							if (columns->next || olist->object->u.task.returns)
								bufcat(&buffer, ", ");
							columns = columns->next;
						}
						columns = olist->object->u.task.returns;
						while (columns)
						{
							bufcat(&buffer, columns->name);
							if (columns->next)
								bufcat(&buffer, ", ");
							columns = columns->next;
						}
					}
				}
				bufcat(&buffer, ";\n");

				/* Tell the user what job we are running */
				if (global_verbose_flag)
					fprintf(stderr, "  - RUN: ");
				if (global_print_flag || global_debug_flag)
					fprintf(stderr, "%s", buffer->buffer);
				else
					fprintf(stderr, "%s\n", olist->object->name);

				/* But we only execute it if we are not in "print-only" mode */
				if (!global_print_flag)
				{
					/* If we have an output file, open it for write now */
					if (output && output->u.output.type == MAPRED_OUTPUT_FILE)
					{
						switch (output->u.output.mode)
						{
							case MAPRED_OUTPUT_MODE_NONE:
								/* check if the file exists */
								if (access(output->u.output.desc, F_OK) == 0)
								{
									mapred_obj_error(output, "file '%s' already exists",
													 output->u.output.desc);
									XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
								}
								/* Fallthrough */

							case MAPRED_OUTPUT_MODE_REPLACE:
								outfile = fopen(output->u.output.desc, "wb");
								break;

							case MAPRED_OUTPUT_MODE_APPEND:
								outfile = fopen(output->u.output.desc, "ab");
								break;

							default:
								XASSERT(false);
						}

						if (!outfile)
						{
							mapred_obj_error(output, "could not open file '%s' for write",
											 output->u.output.desc);
							XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
						}
					}
					else
					{
						outfile = stdout;
					}

					/*
					 * Enable notices for user queries since they may contain
					 * debugging info.
					 */
					PQsetNoticeReceiver(conn, print_notice_handler, NULL);
					result = PQexec(conn, buffer->buffer);
					PQsetNoticeReceiver(conn, ignore_notice_handler, NULL);

					switch (PQresultStatus(result))
					{
						/* Output is STDOUT or FILE */
						case PGRES_TUPLES_OK:
						{
							PQprintOpt options;

							memset(&options, 0, sizeof(options));

							/*
							 * Formatting:
							 *   STDOUT = fancy formatting
							 *   FILE   = plain formatting
							 */
							if (outfile == stdout)
							{
								options.header = true;
								options.align = true;
								options.fieldSep = "|";
							}
							else if (output->u.output.delimiter)
							{
								options.fieldSep = output->u.output.delimiter;
							}
							else
							{
								/* "\t" is our default delimiter */
								options.fieldSep = "\t";
							}
							PQprint(outfile, result, &options);
							break;
						}

						/* OUTPUT is a table */
						case PGRES_COMMAND_OK:
							fprintf(stderr, "DONE\n");
							break;

						/* An error of some kind */
						default:
							XRAISE(MAPRED_SQL_ERROR, "Execution Failure");
					}

					PQclear(result);
					result = NULL;

					if (NULL != outfile && outfile != stdout)
					{
						fclose(outfile);
						outfile = stdout;
					}
				}
			}
		}
	}
	XFINALLY
	{
		/* Release whatever was still pending when we left the XTRY section */
		if (result)
			PQclear(result);

		if (NULL != outfile && outfile != stdout)
		{
			fclose(outfile);
			outfile = stdout;
		}

		if (buffer)
			mapred_free(buffer);
	}
	XTRY_END;
}