in holo-client-c/src/worker.c [271:399]
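// Populates *schema by querying Hologres/PostgreSQL catalogs in five steps:
// table oid, per-column metadata, primary keys, distribution keys, and the
// partition column. On any failure the partially built schema is destroyed
// and FAILURE_NOT_NEED_RETRY is returned.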
ActionStatus get_table_schema(ConnectionHolder* connHolder, HoloTableSchema* schema, HoloTableName tableName, char** errMsgAddr) {
HoloColumn* columns = NULL;
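// Look up the table's internal id (the 'table_id' property) in hologres.hg_table_properties.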
const char* findTableOidSql = "SELECT property_value FROM hologres.hg_table_properties WHERE table_namespace = $1 AND table_name = $2 AND property_key = 'table_id'";
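// Per-column metadata: name, ordinal position, nullability and default from
// information_schema.columns, joined with pg_catalog.pg_attribute to obtain the type oid.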
const char* findColumnsSql = "WITH c AS (SELECT column_name, ordinal_position, is_nullable, column_default FROM information_schema.columns WHERE table_schema = $1 AND table_name = $2), a AS (SELECT attname, atttypid from pg_catalog.pg_attribute WHERE attrelid = $3::regclass::oid) SELECT * FROM c LEFT JOIN a ON c.column_name = a.attname;";
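// Primary-key columns resolved to their ordinal positions via key_column_usage / table_constraints.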
const char* findPrimaryKeysSql = "SELECT c.column_name, cc.ordinal_position FROM information_schema.key_column_usage AS c LEFT JOIN information_schema.table_constraints AS t ON t.constraint_name = c.constraint_name AND c.table_schema = t.table_schema AND c.table_name = t.table_name LEFT JOIN information_schema.columns cc ON c.table_schema = cc.table_schema AND c.table_name = cc.table_name AND c.column_name = cc.column_name WHERE t.table_schema = $1 AND t.table_name = $2 AND t.constraint_type = 'PRIMARY KEY'";
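// Distribution keys: the comma-separated 'distribution_key' property is split and each key
// is mapped to its ordinal position via information_schema.columns.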
const char* findDistributionKeysSql = "WITH d AS (SELECT table_namespace, table_name, unnest(string_to_array(property_value, ',')) as column_name from hologres.hg_table_properties WHERE table_namespace = $1 AND table_name = $2 AND property_key = 'distribution_key') SELECT c.column_name, c.ordinal_position FROM d LEFT JOIN information_schema.columns c ON d.table_namespace = c.table_schema AND d.table_name=c.table_name AND d.column_name = c.column_name";
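// Partition key attribute number(s) from pg_partitioned_table.partattrs.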
const char* findPartitionColumnSql = "SELECT partattrs FROM pg_partitioned_table WHERE partrelid = $1::regclass::oid";
PGresult* res = NULL;
int nTuples, i, pos = 0;
char oid[12]; // holds the table id as text; large enough for any 32-bit value
// values are passed as statement parameters, so there is no need to quote_literal_cstr() them before use
const char* name[1] = {tableName.fullName};
const char* names[2] = {tableName.schemaName, tableName.tableName};
const char* names3[3] = {tableName.schemaName, tableName.tableName, tableName.fullName};
//get table oid
res = connection_holder_exec_params_with_retry(connHolder, findTableOidSql, 2, NULL, names, NULL, NULL, 0, errMsgAddr);
if (res == NULL || PQresultStatus(res) != PGRES_TUPLES_OK || PQntuples(res) == 0){
LOG_ERROR("Get table Oid of table %s failed.", tableName.fullName);
holo_client_destroy_tableschema(schema);
if (res != NULL) PQclear(res);
return FAILURE_NOT_NEED_RETRY;
} else {
schema->tableId = atoi(PQgetvalue(res, 0, 0));
}
if (res != NULL) PQclear(res);
//get column_name, data_type_oid, is_nullable, default_value of each column
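// note: the oid string built below is not bound to these statements; findColumnsSql passes
// the full table name and casts it to regclass server-side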
snprintf(oid, sizeof(oid), "%d", schema->tableId);
res = connection_holder_exec_params_with_retry(connHolder, findColumnsSql, 3, NULL, names3, NULL, NULL, 0, errMsgAddr);
if (res == NULL || PQresultStatus(res) != PGRES_TUPLES_OK || PQntuples(res) == 0){
LOG_ERROR("Get column info of table %s failed.", tableName.fullName);
holo_client_destroy_tableschema(schema);
if (res != NULL) PQclear(res);
return FAILURE_NOT_NEED_RETRY;
} else {
nTuples = PQntuples(res);
schema->nColumns = nTuples;
columns = holo_client_new_columns(nTuples);
for (i = 0; i < nTuples; i++) {
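// ordinal_position is 1-based; convert to a 0-based index and ignore anything out of range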
pos = atoi(PQgetvalue(res, i, 1)) - 1;
if (pos >= 0 && pos < nTuples) {
columns[pos].name = deep_copy_string(PQgetvalue(res, i, 0));
columns[pos].quoted = quote_identifier(columns[pos].name);
columns[pos].type = atoi(PQgetvalue(res, i, 5));
columns[pos].nullable = (strcmp(PQgetvalue(res, i, 2), "YES") == 0);
columns[pos].isPrimaryKey = false;
if (PQgetisnull(res, i, 3)) {
columns[pos].defaultValue = NULL;
} else {
columns[pos].defaultValue = deep_copy_string(PQgetvalue(res, i, 3));
}
}
}
schema->columns = columns;
}
if (res != NULL) PQclear(res);
//find primary keys
res = connection_holder_exec_params_with_retry(connHolder, findPrimaryKeysSql, 2, NULL, names, NULL, NULL, 0, errMsgAddr);
if (res == NULL || PQresultStatus(res) != PGRES_TUPLES_OK){
LOG_ERROR("Get primary keys info of table %s failed.", tableName.fullName);
holo_client_destroy_tableschema(schema);
if (res != NULL) PQclear(res);
return FAILURE_NOT_NEED_RETRY;
} else {
nTuples = PQntuples(res);
schema->nPrimaryKeys = nTuples;
FREE(schema->primaryKeys);
if (nTuples > 0){
schema->primaryKeys = MALLOC(nTuples, int);
}
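// flag each primary-key column and record its 0-based column index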
for (i = 0; i < nTuples; i++) {
pos = atoi(PQgetvalue(res, i, 1)) - 1;
if (pos >= 0 && pos < schema->nColumns) {
columns[pos].isPrimaryKey = true;
schema->primaryKeys[i] = pos;
}
}
}
if (res != NULL) PQclear(res);
//find distribution keys
res = connection_holder_exec_params_with_retry(connHolder, findDistributionKeysSql, 2, NULL, names, NULL, NULL, 0, errMsgAddr);
if (res == NULL || PQresultStatus(res) != PGRES_TUPLES_OK){
LOG_ERROR("Get distribution keys info of table %s failed.", tableName.fullName);
holo_client_destroy_tableschema(schema);
if (res != NULL) PQclear(res);
return FAILURE_NOT_NEED_RETRY;
} else {
nTuples = PQntuples(res);
schema->nDistributionKeys = nTuples;
FREE(schema->distributionKeys);
if (nTuples > 0) {
schema->distributionKeys = MALLOC(nTuples, int);
}
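// record the 0-based column index of each distribution key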
for (i = 0; i < nTuples; i++) {
pos = atoi(PQgetvalue(res, i, 1)) - 1;
if (pos >= 0 && pos < schema->nColumns) {
schema->distributionKeys[i] = pos;
}
}
}
if (res != NULL) PQclear(res);
//find partition column
res = connection_holder_exec_params_with_retry(connHolder, findPartitionColumnSql, 1, NULL, name, NULL, NULL, 0, errMsgAddr);
if (res == NULL || PQresultStatus(res) != PGRES_TUPLES_OK){
LOG_ERROR("Get partition column of table %s failed.", tableName.fullName);
holo_client_destroy_tableschema(schema);
if (res != NULL) PQclear(res);
return FAILURE_NOT_NEED_RETRY;
} else if (PQntuples(res) == 0) {
schema->partitionColumn = -1;
} else {
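// partattrs stores the 1-based attribute number(s) of the partition key; take the first and convert to 0-based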
schema->partitionColumn = atoi(PQgetvalue(res, 0, 0)) - 1;
}
if (res != NULL) PQclear(res);
//deep copy table name
schema->tableName->fullName = deep_copy_string(tableName.fullName);
schema->tableName->schemaName = deep_copy_string(tableName.schemaName);
schema->tableName->tableName = deep_copy_string(tableName.tableName);
return SUCCESS;
}