actions/devel/psql/command/psql.py (44 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # import psycopg import common.util as ut from common.command_data import CommandData from psycopg.rows import dict_row import json class Psql(): """ Implementation of a Postgres Command executor. It will require a user_data dictionary linked to a specific user. """ def __init__(self, user_data): self._user_data = user_data self._postgres_url= ut.get_env_value(user_data,"POSTGRES_URL") self.validate() def validate(self): """ Validate that the provided user_data contains the appropriate metadata for being able to submit a postgres command. """ if not self._postgres_url: raise Exception("user does not have valid POSTGRES environment set") def _query(self, input:CommandData): """ Queries for matching query records and returns datas in a key, value format. """ query = input.command() with psycopg.connect(self._postgres_url) as conn: # Open a cursor to perform database operations with conn.cursor(row_factory=dict_row) as cur: cur.execute(query) result = cur.fetchall() input.result(json.dumps(result)) input.status(200) return input def _script(self, input:CommandData): script = input.command() with psycopg.connect(self._postgres_url) as conn: # Open a cursor to perform database operations with conn.cursor() as cur: cur.execute(script) conn.commit() input.result(cur.statusmessage) input.status(200) return input def _is_a_query(self, input:CommandData): return 'select' in input.command().lower() def execute(self, input:CommandData): print(f"**** Psql command to execute {input.command()}") try: if self._is_a_query(input): return self._query(input) else: return self._script(input) except Exception as e: input.result(f"could not execute psql command {e}") input.status(400) return input