in workshop/raw files/rcf-byom.py [0:0]
def run_sql(sql_text):
client = boto3.client("redshift-data")
res = client.execute_statement(Database=REDSHIFT_ENDPOINT.split('/')[1], DbUser=REDSHIFT_USER, Sql=sql_text,
ClusterIdentifier=REDSHIFT_ENDPOINT.split('.')[0])
query_id = res["Id"]
done = False
while not done:
time.sleep(1)
status_description = client.describe_statement(Id=query_id)
status = status_description["Status"]
if status == "FAILED":
raise Exception('SQL query failed:' + query_id + ": " + status_description["Error"])
elif status == "FINISHED":
if status_description['ResultRows']>0:
results = client.get_statement_result(Id=query_id)
metadata=dict()
column_labels = []
#dtypes = []
for i in range(len(results["ColumnMetadata"])): column_labels.append(results["ColumnMetadata"][i]['label'])
for i in range(len(results["ColumnMetadata"])):
if (results["ColumnMetadata"][i]['typeName'])=='varchar':
typ='str'
elif ((results["ColumnMetadata"][i]['typeName'])=='int4' or (results["ColumnMetadata"][i]['typeName'])=='numeric') :
typ='float'
else:
typ = 'str'
metadata[results["ColumnMetadata"][i]['label']]=typ
#dtypes.append(typ)
records = []
for record in results.get('Records'):
records.append([list(rec.values())[0] for rec in record])
df = pd.DataFrame(np.array(records), columns=column_labels)
df = df.astype(metadata)
return df
else:
return query_id