ActionStatus get_table_schema()

in holo-client-c/src/worker.c [271:399]


/**
 * Populate `schema` with the server-side metadata of `tableName`.
 *
 * Runs five parameterized catalog queries through
 * connection_holder_exec_params_with_retry() and stores, in this order: the
 * table id, the column descriptions, the primary-key column indexes, the
 * distribution-key column indexes and the partition column index. Finally the
 * three strings of `tableName` are deep-copied into schema->tableName.
 *
 * Column indexes stored in the schema are zero-based (ordinal_position - 1).
 * Key rows whose ordinal position is missing or outside [0, nColumns) are
 * dropped, so nPrimaryKeys / nDistributionKeys only count entries that were
 * actually written into the corresponding array.
 *
 * @param connHolder  connection used to run the queries.
 * @param schema      schema object to fill. On every failure path it is handed
 *                    to holo_client_destroy_tableschema() before returning.
 *                    NOTE(review): callers presumably must not touch it after
 *                    a failure - confirm against that function.
 * @param tableName   schema name, table name and full name of the table.
 * @param errMsgAddr  forwarded unchanged to the query helper.
 * @return SUCCESS when every step succeeded, FAILURE_NOT_NEED_RETRY otherwise.
 */
ActionStatus get_table_schema(ConnectionHolder* connHolder, HoloTableSchema* schema, HoloTableName tableName, char** errMsgAddr) {
    HoloColumn* columns = NULL;
    const char* findTableOidSql = "SELECT property_value FROM hologres.hg_table_properties WHERE table_namespace = $1 AND table_name = $2 AND property_key = 'table_id'";
    const char* findColumnsSql = "WITH c AS (SELECT column_name, ordinal_position, is_nullable, column_default FROM information_schema.columns WHERE table_schema = $1 AND table_name = $2), a AS (SELECT attname, atttypid from pg_catalog.pg_attribute WHERE attrelid = $3::regclass::oid) SELECT * FROM c LEFT JOIN a ON c.column_name = a.attname;";
    const char* findPrimaryKeysSql = "SELECT c.column_name, cc.ordinal_position FROM information_schema.key_column_usage AS c LEFT JOIN information_schema.table_constraints AS t ON t.constraint_name = c.constraint_name AND c.table_schema = t.table_schema AND c.table_name = t.table_name LEFT JOIN information_schema.columns cc ON c.table_schema = cc.table_schema AND c.table_name = cc.table_name AND c.column_name = cc.column_name WHERE t.table_schema = $1 AND t.table_name = $2 AND t.constraint_type = 'PRIMARY KEY'";
    const char* findDistributionKeysSql = "WITH d AS (SELECT table_namespace, table_name, unnest(string_to_array(property_value, ',')) as column_name from hologres.hg_table_properties WHERE table_namespace = $1 AND table_name = $2 AND property_key = 'distribution_key') SELECT c.column_name, c.ordinal_position FROM d LEFT JOIN information_schema.columns c ON d.table_namespace = c.table_schema AND d.table_name=c.table_name AND d.column_name = c.column_name";
    const char* findPartitionColumnSql = "SELECT partattrs FROM pg_partitioned_table WHERE partrelid = $1::regclass::oid";
    PGresult* res = NULL;
    int nTuples, nValid, i, pos = 0;
    // every query is parameterized, so there's no need to quote_literal_cstr() the names before use
    const char* name[1] = {tableName.fullName};
    const char* names[2] = {tableName.schemaName, tableName.tableName};
    const char* names3[3] = {tableName.schemaName, tableName.tableName, tableName.fullName};

    //get table oid
    res = connection_holder_exec_params_with_retry(connHolder, findTableOidSql, 2, NULL, names, NULL, NULL, 0, errMsgAddr);
    if (res == NULL || PQresultStatus(res) != PGRES_TUPLES_OK || PQntuples(res) == 0){
        LOG_ERROR("Get table Oid of table %s failed.", tableName.fullName);
        goto fail;
    }
    schema->tableId = atoi(PQgetvalue(res, 0, 0));
    PQclear(res);

    //get column_name, data_type_oid, is_nullable, default_value of each column
    //result layout: 0 column_name, 1 ordinal_position, 2 is_nullable, 3 column_default, 4 attname, 5 atttypid
    res = connection_holder_exec_params_with_retry(connHolder, findColumnsSql, 3, NULL, names3, NULL, NULL, 0, errMsgAddr);
    if (res == NULL || PQresultStatus(res) != PGRES_TUPLES_OK  || PQntuples(res) == 0){
        LOG_ERROR("Get column info of table %s failed.", tableName.fullName);
        goto fail;
    }
    nTuples = PQntuples(res);
    columns = holo_client_new_columns(nTuples);
    if (columns == NULL) {
        LOG_ERROR("Allocate columns of table %s failed.", tableName.fullName);
        goto fail;
    }
    schema->nColumns = nTuples;
    schema->columns = columns;
    for (i = 0; i < nTuples; i++) {
        // rows are not ordered; each one is placed by its ordinal position
        pos = atoi(PQgetvalue(res, i, 1)) - 1;
        if (pos < 0 || pos >= nTuples) {
            continue;
        }
        columns[pos].name = deep_copy_string(PQgetvalue(res, i, 0));
        columns[pos].quoted = quote_identifier(columns[pos].name);
        columns[pos].type = atoi(PQgetvalue(res, i, 5));
        columns[pos].nullable = (strcmp(PQgetvalue(res, i, 2), "YES") == 0);
        columns[pos].isPrimaryKey = false;
        if (PQgetisnull(res, i, 3)) {
            columns[pos].defaultValue = NULL;
        } else {
            columns[pos].defaultValue = deep_copy_string(PQgetvalue(res, i, 3));
        }
    }
    PQclear(res);

    //find primary keys
    res = connection_holder_exec_params_with_retry(connHolder, findPrimaryKeysSql, 2, NULL, names, NULL, NULL, 0, errMsgAddr);
    if (res == NULL || PQresultStatus(res) != PGRES_TUPLES_OK){
        LOG_ERROR("Get primary keys info of table %s failed.", tableName.fullName);
        goto fail;
    }
    nTuples = PQntuples(res);
    FREE(schema->primaryKeys);
    // FREE may already reset the pointer; do it explicitly so an empty key set never leaves it dangling
    schema->primaryKeys = NULL;
    schema->nPrimaryKeys = 0;
    if (nTuples > 0){
        schema->primaryKeys = MALLOC(nTuples, int);
        if (schema->primaryKeys == NULL) {
            LOG_ERROR("Allocate primary keys of table %s failed.", tableName.fullName);
            goto fail;
        }
    }
    nValid = 0;
    for (i = 0; i < nTuples; i++) {
        pos = atoi(PQgetvalue(res, i, 1)) - 1;
        if (pos >= 0 && pos < schema->nColumns) {
            columns[pos].isPrimaryKey = true;
            // compact valid entries so no slot below the count is left uninitialized
            schema->primaryKeys[nValid++] = pos;
        }
    }
    schema->nPrimaryKeys = nValid;
    PQclear(res);

    //find distribution keys
    res = connection_holder_exec_params_with_retry(connHolder, findDistributionKeysSql, 2, NULL, names, NULL, NULL, 0, errMsgAddr);
    if (res == NULL || PQresultStatus(res) != PGRES_TUPLES_OK){
        LOG_ERROR("Get distribution keys info of table %s failed.", tableName.fullName);
        goto fail;
    }
    nTuples = PQntuples(res);
    schema->nDistributionKeys = 0;
    // NOTE(review): unlike primaryKeys, a previous distributionKeys buffer is not released here -
    // confirm the schema always arrives with distributionKeys == NULL
    if (nTuples > 0) {
        schema->distributionKeys = MALLOC(nTuples, int);
        if (schema->distributionKeys == NULL) {
            LOG_ERROR("Allocate distribution keys of table %s failed.", tableName.fullName);
            goto fail;
        }
    }
    nValid = 0;
    for (i = 0; i < nTuples; i++) {
        pos = atoi(PQgetvalue(res, i, 1)) - 1;
        if (pos >= 0 && pos < schema->nColumns) {
            schema->distributionKeys[nValid++] = pos;
        }
    }
    schema->nDistributionKeys = nValid;
    PQclear(res);

    //find partition column
    res = connection_holder_exec_params_with_retry(connHolder, findPartitionColumnSql, 1, NULL, name, NULL, NULL, 0, errMsgAddr);
    if (res == NULL || PQresultStatus(res) != PGRES_TUPLES_OK){
        LOG_ERROR("Get partition column of table %s failed.", tableName.fullName);
        goto fail;
    }
    if (PQntuples(res) == 0) {
        // no row in pg_partitioned_table for this relation
        schema->partitionColumn = -1;
    } else {
        // atoi stops at the first non-digit, so only the first number in partattrs is used
        schema->partitionColumn = atoi(PQgetvalue(res, 0, 0)) - 1;
    }
    PQclear(res);

    //deep copy table name
    schema->tableName->fullName = deep_copy_string(tableName.fullName);
    schema->tableName->schemaName = deep_copy_string(tableName.schemaName);
    schema->tableName->tableName = deep_copy_string(tableName.tableName);

    return SUCCESS;

fail:
    // shared failure exit: release the partially filled schema and the pending result
    holo_client_destroy_tableschema(schema);
    PQclear(res);  // documented as a no-op when res is NULL
    return FAILURE_NOT_NEED_RETRY;
}