in src/route_connection.c [30:108]
bool route_client_connection(PgSocket *client, PktHdr *pkt) {
SBuf *sbuf = &client->sbuf;
char *pkt_start;
char *query_str;
char *dbname;
PgDatabase *db;
PgPool *pool;
/* extract query string from packet */
/* first byte is the packet type (which we already know)
* next 4 bytes is the packet length
* For packet type 'Q', the query string is next
* 'Q' | int32 len | str query
* For packet type 'P', the query string is after the stmt string
* 'P' | int32 len | str stmt | str query | int16 numparams | int32 paramoid
* (Ref: https://www.pgcon.org/2014/schedule/attachments/330_postgres-for-the-wire.pdf)
*/
pkt_start = (char *) &sbuf->io->buf[sbuf->io->parse_pos];
/* printHex(pkt_start, pkt->len); */
if (pkt->type == 'Q') {
query_str = (char *) pkt_start + 5;
} else if (pkt->type == 'P') {
char *stmt_str = pkt_start + 5;
query_str = stmt_str + strlen(stmt_str) + 1;
} else {
fatal("Invalid packet type - expected Q or P, got %c", pkt->type);
}
slog_debug(client, "route_client_connection: Username => %s", client->login_user->name);
slog_debug(client, "route_client_connection: Query => %s", query_str);
if (strcmp(cf_routing_rules_py_module_file, "not_enabled") == 0) {
slog_debug(client,
"Query routing not enabled in config (routing_rules_py_module_file)");
return true;
}
dbname = pycall(client, client->login_user->name, query_str, cf_routing_rules_py_module_file,
"routing_rules");
if (dbname == NULL) {
slog_debug(client, "routing_rules returned 'None' - existing connection preserved");
return false;
}
db = find_database(dbname);
if (db == NULL) {
slog_error(client,
"nonexistant database key <%s> returned by routing_rules",
dbname);
slog_error(client, "check ini and/or routing rules function");
free(dbname);
return false;
}
pool = get_pool(db, client->login_user);
if (client->pool != pool) {
if (client->link != NULL) {
/* release existing server connection back to pool */
slog_debug(client, "releasing existing server connection");
release_server(client->link);
client->link = NULL;
}
/* assign client to new pool */
slog_debug(client,
"assigning client to connection pool for database <%s>",
dbname);
/* Move the client over to the other pool for correct pool stats - needed for pool size maint */
statlist_remove(&client->pool->active_client_list, &client->head);
statlist_append(&pool->active_client_list, &client->head);
client->pool = pool;
} else {
slog_debug(client, "already connected to pool <%s>", dbname);
}
free(dbname);
return true;
}