decisionai_plugin/common/util/azuretable.py (38 lines of code) (raw):
from azure.core.credentials import AzureNamedKeyCredential
from azure.data.tables import TableServiceClient, UpdateMode
from azure.identity import DefaultAzureCredential
from .constant import AZURE_STORAGE_ACCOUNT_USE_MI
class AzureTable():
def __init__(self, account_name, account_key=None, account_domain="core.windows.net"):
cred = DefaultAzureCredential()
if not AZURE_STORAGE_ACCOUNT_USE_MI:
cred = AzureNamedKeyCredential(account_name, account_key)
self.table_service_client = TableServiceClient(endpoint="https://{}.table.{}".format(account_name, account_domain), credential=cred)
def create_table_if_not_exists(self, table_name):
return self.table_service_client.create_table_if_not_exists(table_name)
def exists_table(self, table_name):
list_tables = self.table_service_client.list_tables()
return table_name in list_tables
def insert_or_replace_entity(self, table_name, partition_key, row_key, **kwargs):
try:
entity = self.table_service_client.get_table_client(table_name).get_entity(partition_key, row_key)
except Exception:
# Insert a new entity
entity = {'PartitionKey': partition_key, 'RowKey': row_key}
for (k,v) in kwargs.items():
entity[k] = v
return self.table_service_client.get_table_client(table_name).upsert_entity(entity, UpdateMode.REPLACE)
def insert_or_replace_entity2(self, table_name, entity):
return self.table_service_client.get_table_client(table_name).upsert_entity(entity, UpdateMode.REPLACE)
def insert_entity(self, table_name, entity):
return self.table_service_client.get_table_client(table_name).create_entity(entity)
def update_entity(self, table_name, entity):
return self.table_service_client.get_table_client(table_name).update_entity(entity)
def get_entity(self, table_name, partition_key, row_key):
return self.table_service_client.get_table_client(table_name).get_entity(partition_key, row_key)
def delete_entity(self, table_name, partition_key, row_key):
return self.table_service_client.get_table_client(table_name).delete_entity(partition_key, row_key)
def delete_table(self, table_name):
return self.table_service_client.delete_table(table_name)
def get_entities(self, table_name, partition_key):
filter = "PartitionKey eq '{0}'".format(partition_key)
return self.table_service_client.get_table_client(table_name).query_entities(filter)