resdb_orm/orm.py (53 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import requests
import json
import secrets
import yaml
class ResDBORM:
def __init__(self, config_path='config.yaml'):
with open(config_path, 'r') as config_file:
self.config = yaml.safe_load(config_file)
self.db_root_url = self.config['database']['db_root_url']
def generate_token(self, length=64):
"""Generate a secure random hexadecimal token."""
return secrets.token_hex(length // 2)
def create(self, data):
"""Create a new record in the DB."""
token = self.generate_token()
payload = {"id":token, "data":data}
headers = {'Content-Type': 'application/json'}
response = requests.post(f'{self.db_root_url}/v1/transactions/commit',
data=json.dumps(payload), headers=headers)
# Check if response is successful and handle empty response content
if response.status_code == 201:
if response.content:
decoded_content = response.content.decode('utf-8')
id_value = decoded_content.split(': ')[1].strip()
return id_value
else:
return {"status": "create unsuccessful, no content in response"}
def read_all(self):
"""Read all records from the DB."""
response = requests.get(f'{self.db_root_url}/v1/transactions')
return response.json()
def read(self, key):
"""Read a specific record by key from the DB."""
response = requests.get(f'{self.db_root_url}/v1/transactions/{key}')
return response.json()
def delete(self, key):
"""Delete a specific record by key in the DB."""
payload = {"id": key}
headers = {'Content-Type': 'application/json'}
response = requests.post(f'{self.db_root_url}/v1/transactions/commit',
data=json.dumps(payload), headers=headers)
# Check if response is successful and handle empty response content
if response.status_code == 201:
if response.content:
return {"status": "delete successful"}
else:
return {"status": "delete unsuccessful, no content in response"}
def update(self, key, new_data):
"""Update a specific record by key in the DB."""
# Delete the existing record first
delete_response = self.delete(key)
# Handle the response accordingly
if "status" in delete_response and "no content in response" in delete_response["status"]:
print("Warning: Delete operation returned no content.")
# Update by creating a new entry with the same key
payload = {"id": key, "data": new_data}
headers = {'Content-Type': 'application/json'}
response = requests.post(f'{self.db_root_url}/v1/transactions/commit',
data=json.dumps(payload), headers=headers)
# Check if response is successful and handle empty response content
if response.status_code == 201:
if response.content:
return {"status": "update successful"}
else:
return {"status": "update unsuccessful, no content in response"}