bool route_client_connection()

in src/route_connection.c [30:108]


bool route_client_connection(PgSocket *client, PktHdr *pkt) {
	SBuf *sbuf = &client->sbuf;
	char *pkt_start;
	char *query_str;
	char *dbname;
	PgDatabase *db;
	PgPool *pool;

	/* extract query string from packet */
	/* first byte is the packet type (which we already know)
	 * next 4 bytes is the packet length
	 * For packet type 'Q', the query string is next
	 * 	'Q' | int32 len | str query
	 * For packet type 'P', the query string is after the stmt string
	 * 	'P' | int32 len | str stmt | str query | int16 numparams | int32 paramoid
	 * (Ref: https://www.pgcon.org/2014/schedule/attachments/330_postgres-for-the-wire.pdf)
	 */

	pkt_start = (char *) &sbuf->io->buf[sbuf->io->parse_pos];
	/* printHex(pkt_start, pkt->len); */

	if (pkt->type == 'Q') {
		query_str = (char *) pkt_start + 5;
	} else if (pkt->type == 'P') {
		char *stmt_str = pkt_start + 5;
		query_str = stmt_str + strlen(stmt_str) + 1;
	} else {
		fatal("Invalid packet type - expected Q or P, got %c", pkt->type);
	}

	slog_debug(client, "route_client_connection: Username => %s", client->login_user->name);
	slog_debug(client, "route_client_connection: Query => %s", query_str);

	if (strcmp(cf_routing_rules_py_module_file, "not_enabled") == 0) {
		slog_debug(client,
				"Query routing not enabled in config (routing_rules_py_module_file)");
		return true;
	}

	dbname = pycall(client, client->login_user->name, query_str, cf_routing_rules_py_module_file,
			"routing_rules");
	if (dbname == NULL) {
		slog_debug(client, "routing_rules returned 'None' - existing connection preserved");
		return false;
	}

	db = find_database(dbname);
	if (db == NULL) {
		slog_error(client,
				"nonexistant database key <%s> returned by routing_rules",
				dbname);
		slog_error(client, "check ini and/or routing rules function");
		free(dbname);
		return false;
	}
	pool = get_pool(db, client->login_user);
	if (client->pool != pool) {
		if (client->link != NULL) {
			/* release existing server connection back to pool */
			slog_debug(client, "releasing existing server connection");
			release_server(client->link);
			client->link = NULL;
		}
		/* assign client to new pool */
		slog_debug(client,
				"assigning client to connection pool for database <%s>",
				dbname);

                /* Move the client over to the other pool for correct pool stats - needed for pool size maint */
                statlist_remove(&client->pool->active_client_list, &client->head);
                statlist_append(&pool->active_client_list, &client->head);

		client->pool = pool;
	} else {
		slog_debug(client, "already connected to pool <%s>", dbname);
	}
	free(dbname);
	return true;
}