void mapred_run_queries()

in gpcontrib/gpmapreduce/src/mapred.c [2197:2502]


/*
 * mapred_run_queries
 *
 * Walks every object in doc->objects and, for each MAPRED_EXECUTION object,
 * builds one SQL statement in a scratch buffer and (unless print-only mode
 * is active) executes it on `conn`.  The generated statement has the shape
 *
 *     [EXPLAIN [ANALYZE]]
 *     [CREATE TABLE <out> AS | INSERT INTO <out> (]
 *         SELECT * FROM <name>
 *     [ DISTRIBUTED BY (<grouping>) | ) | ORDER BY <grouping>, <returns> ];
 *
 * Parameters:
 *   conn - libpq connection every statement is sent over.
 *   doc  - document whose object list is scanned for executions.
 *
 * Output handling:
 *   - table output:  result status PGRES_COMMAND_OK, "DONE" goes to stderr.
 *   - file output:   rows are written with PQprint() to the opened file.
 *   - anything else: rows are written with PQprint() to stdout.
 *
 * Errors are signalled with XRAISE:
 *   - MAPRED_PARSE_ERROR when the output table cannot be replaced or already
 *     exists, or when the output file already exists or cannot be opened.
 *   - MAPRED_SQL_ERROR when the generated statement fails.
 * The XFINALLY section releases the pending result, any open output file and
 * the scratch buffer.
 *
 * NOTE(review): the output table name (output->u.output.desc) is pasted into
 * the generated SQL without quoting or escaping.
 *
 * NOTE(review): the catalog lookup and the DROP TABLE for REPLACE mode are
 * sent to the server before global_print_flag is consulted, so they run even
 * in print-only mode.
 */
void mapred_run_queries(PGconn *conn, mapred_document_t *doc)
{
	mapred_olist_t  *olist;
	mapred_plist_t  *columns;
	mapred_object_t *output;
	PGresult        *result  = NULL;
	FILE            *outfile = stdout;
	buffer_t        *buffer  = NULL;

	XTRY
	{
		/* allocates 512 bytes, extending by 512 bytes if we run out. */
		buffer = makebuffer(512, 512);

		/* Loop through all objects; only executions produce a query */
		for (olist = doc->objects; olist; olist = olist->next)
		{
			if (olist->object->kind == MAPRED_EXECUTION)
			{
				/* true when the output table is already present */
				boolean exists = false;

				XASSERT(olist->object->name);

				/* Reset the buffer from any previous executions */
				bufreset(buffer);

				output = olist->object->u.task.output.object;

				/*
				 *  [CREATE TABLE <name> AS ]
				 *    SELECT * FROM <name>
				 *    ORDER BY <column-list>
				 */
				if (output && output->u.output.type == MAPRED_OUTPUT_TABLE)
				{
					/* does the table already exist? */
					bufcat(&buffer,
						   "SELECT n.nspname \n"
						   "FROM   pg_catalog.pg_class c JOIN \n"
						   "       pg_catalog.pg_namespace n on \n"
						   "       (c.relnamespace = n.oid) \n"
						   "WHERE  n.nspname = ANY(current_schemas(true)) \n"
						   "  AND  c.relname = lower('");
					bufcat(&buffer, output->u.output.desc);
					bufcat(&buffer, "')");
					result = PQexec(conn, buffer->buffer);
					if (PQresultStatus(result) == PGRES_TUPLES_OK &&
						PQntuples(result) > 0)
						exists = true;

					/*
					 * Only the row count was needed.  Free the result now:
					 * `result` is reassigned below and the old value would
					 * otherwise be leaked.
					 */
					PQclear(result);
					result = NULL;
					bufreset(buffer);

					if (exists && output->u.output.mode == MAPRED_OUTPUT_MODE_REPLACE)
					{
						bufcat(&buffer, "DROP TABLE ");
						bufcat(&buffer, output->u.output.desc);

						/*
						 * Wrap the DROP in a savepoint so a failure can be
						 * rolled back.  The results of the savepoint
						 * commands are not inspected, but they still have to
						 * be freed (PQclear accepts NULL).
						 */
						PQclear(PQexec(conn, "SAVEPOINT mapreduce_save"));
						result = PQexec(conn, buffer->buffer);
						if (PQresultStatus(result) == PGRES_COMMAND_OK)
						{
							PQclear(PQexec(conn, "RELEASE SAVEPOINT mapreduce_save"));
						}
						else
						{
							const char *primary;

							/* rollback to savepoint */
							PQclear(PQexec(conn, "ROLLBACK TO SAVEPOINT mapreduce_save"));
							PQclear(PQexec(conn, "RELEASE SAVEPOINT mapreduce_save"));

							if (global_verbose_flag)
								fprintf(stderr, "   - ");

							/*
							 * PQresultErrorField may return NULL; never hand
							 * a NULL pointer to "%s".
							 */
							primary = PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY);
							fprintf(stderr, "Error: %s\n",
									primary ? primary : "unknown error");
							mapred_obj_error(output, "Table '%s' can't be replaced",
											 output->u.output.desc);

							/* `result` is released by the XFINALLY section */
							XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
						}

						/* DROP succeeded: free its result before reuse */
						PQclear(result);
						result = NULL;
						bufreset(buffer);

						/* the table is gone, so fall into the CREATE path */
						exists = false;
					}

					/*
					 * Handle Explain for OUTPUT TABLE.
					 * NOTE(review): the bitwise '&' looks like a bitmask test
					 * of global_analyze inside global_explain_flag - confirm
					 * against their definitions.
					 */
					if (global_explain_flag & global_analyze)
						bufcat(&buffer, "EXPLAIN ANALYZE ");
					else if (global_explain_flag)
						bufcat(&buffer, "EXPLAIN ");

					if (!exists)
					{
						bufcat(&buffer, "CREATE TABLE ");
						bufcat(&buffer, output->u.output.desc);
						bufcat(&buffer, " AS ");
					}
					else if (output->u.output.mode == MAPRED_OUTPUT_MODE_APPEND)
					{
						/* the "(" opened here is closed after the SELECT */
						bufcat(&buffer, "INSERT INTO ");
						bufcat(&buffer, output->u.output.desc);
						bufcat(&buffer, " (");
					}
					else
					{
						/* exists, mode is neither replace or append => error */
						mapred_obj_error(output, "Table '%s' already exists",
										 output->u.output.desc);
						XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
					}
				}
				/* Handle Explain for non-table output */
				else if (global_explain_flag & global_analyze)
				{
					bufcat(&buffer, "EXPLAIN ANALYZE ");
				}
				else if (global_explain_flag)
				{
					bufcat(&buffer, "EXPLAIN ");
				}

				bufcat(&buffer, "SELECT * FROM ");
				bufcat(&buffer, olist->object->name);

				/*
				 * add the DISTRIBUTED BY clause for output tables
				 * OR, the ORDER BY clause for other output formats
				 */
				if (output && output->u.output.type == MAPRED_OUTPUT_TABLE)
				{
					/* INSERT path: close the "(" opened before the SELECT */
					if (exists)
						bufcat(&buffer, ")");

					/* CREATE path: distribute on the grouping columns */
					else if (olist->object->u.task.grouping)
					{
						bufcat(&buffer, " DISTRIBUTED BY (");
						columns = olist->object->u.task.grouping;
						while (columns)
						{
							bufcat(&buffer, columns->name);
							if (columns->next)
								bufcat(&buffer, ", ");
							columns = columns->next;
						}
						bufcat(&buffer, ")");
					}
					else
					{
						/*
						 * don't have any hints for what the distribution keys
						 * should be, so we do nothing and let the database
						 * decide
						 */
					}
				}
				else
				{
					/* order by the grouping columns, then the return columns */
					if (olist->object->u.task.returns ||
						olist->object->u.task.grouping)
					{
						bufcat(&buffer, " ORDER BY ");
						columns = olist->object->u.task.grouping;
						while (columns)
						{
							bufcat(&buffer, columns->name);

							/* a separator is also needed before the returns list */
							if (columns->next || olist->object->u.task.returns)
								bufcat(&buffer, ", ");
							columns = columns->next;
						}
						columns = olist->object->u.task.returns;
						while (columns)
						{
							bufcat(&buffer, columns->name);
							if (columns->next)
								bufcat(&buffer, ", ");
							columns = columns->next;
						}
					}
				}
				bufcat(&buffer, ";\n");

				/* Tell the user what job we are running */
				if (global_verbose_flag)
					fprintf(stderr, "  - RUN: ");
				if (global_print_flag || global_debug_flag)
					fprintf(stderr, "%s", buffer->buffer);
				else
					fprintf(stderr, "%s\n", olist->object->name);

				/* But we only execute it if we are not in "print-only" mode */
				if (!global_print_flag)
				{
					/* If we have an output file, open it for write now */
					if (output && output->u.output.type == MAPRED_OUTPUT_FILE)
					{
						switch (output->u.output.mode)
						{
							case MAPRED_OUTPUT_MODE_NONE:
								/* check if the file exists */
								if (access(output->u.output.desc, F_OK) == 0)
								{
									mapred_obj_error(output, "file '%s' already exists",
													 output->u.output.desc);
									XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
								}
								/* Fallthrough */

							case MAPRED_OUTPUT_MODE_REPLACE:
								outfile = fopen(output->u.output.desc, "wb");
								break;

							case MAPRED_OUTPUT_MODE_APPEND:
								outfile = fopen(output->u.output.desc, "ab");
								break;

							default:
								XASSERT(false);
						}

						if (!outfile)
						{
							mapred_obj_error(output, "could not open file '%s' for write",
											 output->u.output.desc);
							XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure");
						}
					}
					else
					{
						outfile = stdout;
					}

					/*
					 * Enable notices for user queries since they may contain
					 * debugging info.
					 */
					PQsetNoticeReceiver(conn, print_notice_handler, NULL);
					result = PQexec(conn, buffer->buffer);
					PQsetNoticeReceiver(conn, ignore_notice_handler, NULL);
					switch (PQresultStatus(result))
					{
						/* Output is STDOUT or FILE */
						case  PGRES_TUPLES_OK:
						{
							PQprintOpt options;
							memset(&options, 0, sizeof(options));

							/*
							 * Formatting:
							 *   STDOUT = fancy formatting
							 *   FILE   = plain formatting
							 */
							if (outfile == stdout)
							{
								options.header = true;
								options.align  = true;
								options.fieldSep  = "|";
							}
							/*
							 * outfile is only redirected above when `output`
							 * is non-NULL, so the dereference here is safe.
							 */
							else if (output->u.output.delimiter)
							{
								options.fieldSep = output->u.output.delimiter;
							}
							else
							{
								/* "\t" is our default delimiter */
								options.fieldSep  = "\t";
							}

							PQprint(outfile, result, &options);
							break;
						}

						/* OUTPUT is a table */
						case PGRES_COMMAND_OK:
							fprintf(stderr, "DONE\n");
							break;

						/* An error of some kind; XFINALLY frees `result` */
						default:
							XRAISE(MAPRED_SQL_ERROR, "Execution Failure");
					}
					PQclear(result);
					result = NULL;

					/* close a per-execution file and fall back to stdout */
					if (NULL != outfile && outfile != stdout)
					{
						fclose(outfile);
						outfile = stdout;
					}
				}
			}
		}
	}
	XFINALLY
	{
		/* release whatever was still held when we left the XTRY section */
		if (result)
			PQclear(result);

		if (NULL != outfile && outfile != stdout)
		{
			fclose(outfile);
			outfile = stdout;
		}

		if (buffer)
			mapred_free(buffer);
	}
	XTRY_END;
}