in gpcontrib/gpmapreduce/src/main.c [82:436]
int main (int argc, char *argv[])
{
volatile int errcode = 0;
int c;
char *procname = argv[0];
int forceprompt = false;
int needpass = false;
char *filename = NULL;
char *username = NULL;
char *database = NULL;
char *hostname = NULL;
char *port = NULL;
mapred_olist_t *documents;
mapred_olist_t *doc_item;
FILE *file;
/* The long_options structure */
static struct option long_options[] = {
{"help", no_argument, 0, '?'},
{"version", no_argument, 0, 'V'},
{"verbose", no_argument, 0, 'v'},
{"password", no_argument, 0, 'W'},
{"explain", no_argument, 0, 'x'},
{"explain-analyze", no_argument, 0, 'X'},
{"username", required_argument, 0, 'U'},
{"host", required_argument, 0, 'h'},
{"port", required_argument, 0, 'p'},
{"file", required_argument, 0, 'f'},
{"key", required_argument, 0, 'k'},
{"print", no_argument, 0, 'P'},
{"debug", no_argument, 0, 'D'},
{0, 0, 0, 0}
};
static char* short_options = "VvWxXU:h:p:f:k:?PD";
while (1)
{
int option_index = 0;
c = getopt_long(argc, argv, short_options, long_options,
&option_index);
if (c == -1)
break; /* done processing options */
switch (c)
{
case '?': /* --help */
/* Actual help option given */
if (strcmp(argv[optind - 1], "-?") == 0 ||
strcmp(argv[optind - 1], "--help") == 0)
{
usage(procname, true);
exit(0);
}
/* unknown option reported by getopt */
fprintf(stderr, "Try \"%s --help\" for usage information.\n",
procname);
exit(1);
case 'V': /* --version */
showVersion(procname);
exit(0);
case 'v': /* --verbose */
global_verbose_flag = true;
break;
case 'x': /* --explain */
global_explain_flag |= global_explain;
break;
case 'X': /* --explain-analyze */
global_explain_flag |= global_explain | global_analyze;
break;
case 'P': /* --print */
global_print_flag = 1;
break;
case 'D': /* --debug */
global_debug_flag = 1;
break;
case 'W': /* --password */
forceprompt = true;
break;
case 'U': /* --username */
username = optarg;
break;
case 'h': /* --host */
hostname = optarg;
break;
case 'p': /* --port */
port = optarg;
break;
case 'f': /* --file */
filename = optarg;
break;
case 'k': /* --key */
{
mapred_plist_t *newitem;
char *name = optarg;
char *value = NULL;
char *eq = strchr(name, '=');
/*
* either --key value : sets parameter named "key"
* or --key name=value : sets parameter named "name"
*/
if (eq)
{
eq[0] = '\0';
value = eq+1;
/* make sure parameter is a valid name */
if (strspn(name, wordchars) != strlen(name))
{
fprintf(stderr, "bad parameter --key %s\n", name);
exit(1);
}
}
else
{
value = name;
name = "key";
}
/* Add the parameter to the global parameter list */
newitem = malloc(sizeof(mapred_plist_t));
newitem->name = name;
newitem->type = value;
newitem->next = global_plist;
global_plist = newitem;
}
break;
default: /* not feasible */
fprintf(stderr, "Error processing options\n");
exit(1);
}
}
/* open the file */
if (!filename)
{
usage(procname, false);
exit(1);
}
file = fopen(filename, "rb");
if (!file)
{
fprintf(stderr, "Error: Could not open file '%s'\n", filename);
exit(1);
}
/*
* Handle additional arguments as would psql:
* - First argument is database
* - Second argument is username, if not specified via -U
* - All other arguments generate warnings
*/
if (optind < argc && !database)
database = argv[optind++];
if (optind < argc && !username)
username = argv[optind++];
while (optind < argc)
{
fprintf(stderr, "%s: warning: extra command-line argument \"%s\" ignored\n",
procname, argv[optind++]);
}
if (global_verbose_flag)
{
mapred_plist_t *param = global_plist;
while (param)
{
fprintf(stderr, "- Parameter: %s=%s\n",
param->name, param->type);
param = param->next;
}
fprintf(stderr, "- Parsing '%s':\n", filename);
}
documents = NULL;
XTRY
{
documents = mapred_parse_file(file);
}
XCATCH(ASSERTION_FAILURE)
{
fprintf(stderr, "Assertion failure at %s:%d\n",
xframe.file, xframe.lineno);
exit(1);
}
XCATCH_ANY
{
if (global_verbose_flag)
fprintf(stderr, " - ");
if (xframe.exception)
fprintf(stderr, "Error: %s\n", (char *) xframe.exception);
else
fprintf(stderr, "Unknown Error (%d) at %s:%d\n",
xframe.errcode, xframe.file, xframe.lineno);
exit(1);
}
XTRY_END;
/* Do something interesting with documents */
for (doc_item = documents; doc_item; doc_item = doc_item->next)
{
PGconn *conn = NULL;
char pwdbuf[100];
char portbuf[11]; /* max int size should be 10 digits */
char *user, *db, *host, *pwd, *pqport, *options, *tty;
XTRY
{
mapred_document_t *doc = &doc_item->object->u.document;
if (global_verbose_flag)
{
fprintf(stderr, "- Executing Document %d:\n", doc->id);
}
if (port)
{
pqport = port;
}
else if (doc->port > 0)
{
snprintf(portbuf, sizeof(portbuf), "%d", doc->port);
pqport = portbuf;
}
else
{
pqport = NULL;
}
if (database)
db = database;
else
db = doc->database;
if (username)
user = username;
else
user = doc->user;
if (hostname)
host = hostname;
else
host = doc->host;
options = NULL;
tty = NULL;
pwd = NULL;
if (forceprompt)
{
read_password(pwdbuf, sizeof(pwdbuf));
pwd = pwdbuf;
}
do {
conn = PQsetdbLogin(host, pqport, options, tty, db, user, pwd);
needpass = false;
if (PQstatus(conn) == CONNECTION_BAD &&
!strcmp(PQerrorMessage(conn), PQnoPasswordSupplied))
{
PQfinish(conn);
read_password(pwdbuf, sizeof(pwdbuf));
pwd = pwdbuf;
needpass = true;
}
} while (needpass);
if (PQstatus(conn) == CONNECTION_BAD)
{
XRAISE(CONNECTION_ERROR, PQerrorMessage(conn));
}
else
{
if (global_verbose_flag)
{
fprintf(stderr, " - Connected Established:\n");
fprintf(stderr, " HOST: %s\n",
PQhost(conn) ? PQhost(conn) : "localhost");
fprintf(stderr, " PORT: %s\n", PQport(conn));
fprintf(stderr, " USER: %s/%s\n", PQuser(conn), PQdb(conn));
}
check_version(conn);
/* Prepare to receive interrupts */
cancelConn = PQgetCancel(conn);
if (signal(SIGINT, sigint_handler) == SIG_IGN)
signal(SIGINT, SIG_IGN);
if (signal(SIGHUP, sigint_handler) == SIG_IGN)
signal(SIGHUP, SIG_IGN);
if (signal(SIGTERM, sigint_handler) == SIG_IGN)
signal(SIGTERM, SIG_IGN);
mapred_run_document(conn, doc);
}
}
XCATCH(ASSERTION_FAILURE)
{
fprintf(stderr, "Assertion failure at %s:%d\n",
xframe.file, xframe.lineno);
errcode = 1;
}
XCATCH(USER_INTERUPT)
{
if (global_verbose_flag)
fprintf(stderr, " - ");
fprintf(stderr, "Job Cancelled: User Interrupt");
exit(2); /* exit immediately */
}
XCATCH_ANY
{
if (global_verbose_flag)
fprintf(stderr, " - ");
if (xframe.exception)
fprintf(stderr, "Error: %s\n", (char *) xframe.exception);
else
fprintf(stderr, "Unknown Error (%d) at %s:%d\n",
xframe.errcode, xframe.file, xframe.lineno);
errcode = 1;
}
XFINALLY
{
/* Ignore signals until we exit */
signal(SIGINT, SIG_IGN);
signal(SIGHUP, SIG_IGN);
signal(SIGTERM, SIG_IGN);
PQfreeCancel(cancelConn);
cancelConn = NULL;
PQfinish(conn);
}
XTRY_END;
}
/* Cleanup */
mapred_destroy_olist(&documents);
fclose(file);
return errcode;
}