iact3/termial_print.py (223 lines of code) (raw):
import asyncio
import logging
import json
import tabulate
import textwrap
from reprint import output
from iact3.logger import PrintMsg
from iact3.stack import Stacker
LOG = logging.getLogger(__name__)
class TerminalPrinter:
def __init__(self, minimalist=False):
self.minimalist = minimalist
if not minimalist:
self._buffer_type = "list"
self.buffer = self._add_buffer()
def _add_buffer(self):
with output(output_type=self._buffer_type) as output_buffer:
return output_buffer
async def report_test_progress(self, stacker: Stacker, poll_interval=10):
if self.minimalist:
await self.minimalist_progress(stacker, poll_interval)
return
_status_dict = stacker.status()
while self._is_test_in_progress(_status_dict):
for stack in stacker.stacks:
self._print_stack_tree(stack, buffer=self.buffer)
await asyncio.sleep(poll_interval)
self.buffer.clear()
_status_dict = stacker.status()
self._display_final_status(stacker)
async def minimalist_progress(self, stacker: Stacker, poll_interval):
_status_dict = stacker.status()
history: dict = {}
while self._is_test_in_progress(_status_dict):
_status_dict = stacker.status()
for stack in stacker.stacks:
self._print_tree_minimal(stack, history)
await asyncio.sleep(poll_interval)
@staticmethod
def _print_tree_minimal(stack, history):
if stack.id not in history:
history[stack.id] = ""
if history[stack.id] != stack.status:
history[stack.id] = stack.status
msg = f"{stack.test_name} {stack.region} {stack.status}"
if "FAILED" in stack.status:
LOG.error(msg)
for event in stack.error_events(refresh=True):
LOG.error(f" {event.logical_id} {event.status_reason}")
else:
LOG.info(msg)
@staticmethod
def _print_stack_tree(stack, buffer):
padding_1 = " "
buffer.append("{}{}stack {} {}".format(padding_1, "\u250f ", "\u24c2", stack.name))
buffer.append("{}{} region: {}".format(padding_1, "\u2523", stack.region))
buffer.append("{}{} id: {}".format(padding_1, "\u2523", stack.id or ''))
buffer.append(
"{}{}status: {}{}{}".format(
padding_1, "\u2517 ", PrintMsg.white, stack.status, PrintMsg.rst_color
)
)
@staticmethod
def _display_final_status(stacker):
for final_stack in stacker.stacks:
LOG.info("{}stack {} {}".format("\u250f ", "\u24c2", final_stack.name))
LOG.info("{} region: {}".format("\u2523", final_stack.region))
LOG.info("{} id: {}".format("\u2523", final_stack.id or ''))
LOG.info(
"{}status: {}{} {}".format(
"\u2517 ", PrintMsg.white, final_stack.status, PrintMsg.rst_color
)
)
@staticmethod
def _display_price(stacker):
def _format_association_price(price_dict: dict, result: list, association_product: str):
if price_dict:
for k, v in price_dict.items():
association_prefix = association_product
if isinstance(v, dict) and "Result" in v:
association_prefix = v["Type"][v["Type"].index("::")+2:] if "Type" in v else association_product
try:
association_price = {
"Type": f'{association_product}-{k}' if not "Type" in v else v["Type"],
"ChargeType": v["Result"]["OrderSupplement"]["ChargeType"],
"PeriodUnit": v["Result"]["OrderSupplement"]["PriceUnit"],
"Quantity": v["Result"]["OrderSupplement"]["Quantity"],
"Currency": v["Result"]["Order"]["Currency"],
"OriginalAmount": v["Result"]["Order"]["OriginalAmount"] if "OriginalAmount" in v["Result"]["Order"] else None,
"DiscountAmount": v["Result"]["Order"]["DiscountAmount"] if "DiscountAmount" in v["Result"]["Order"] else None,
"TradeAmount": v["Result"]["Order"]["TradeAmount"],
}
result.append(association_price)
except Exception:
pass
if isinstance(v, dict):
_format_association_price(v, result, association_prefix)
for stack in stacker.stacks:
test_name = f' test_name: {stack.test_name} '
line_width_default = 140
if stack.template_price:
price_detail = []
for k,v in stack.template_price.items():
try:
resource_price = {
"Resource": k,
"Region": stack.region,
"Type": v["Type"],
"ChargeType": v["Result"]["OrderSupplement"]['ChargeType'],
"PeriodUnit": v["Result"]["OrderSupplement"]['PriceUnit'],
"Quantity": v["Result"]["OrderSupplement"]['Quantity'],
"Currency": v["Result"]["Order"]["Currency"],
"OriginalAmount": v["Result"]["Order"]["OriginalAmount"] if "OriginalAmount" in v["Result"]["Order"] else None,
"DiscountAmount": v["Result"]["Order"]["DiscountAmount"] if "DiscountAmount" in v["Result"]["Order"] else None,
"TradeAmount": v["Result"]["Order"]["TradeAmount"]
}
price_detail.append(resource_price)
except Exception:
resource_price = {
"Resource": k,
"Region": stack.region,
"Type": v["Type"],
"ChargeType": None,
"PeriodUnit": None,
"Quantity": None,
"Currency": None,
"OriginalAmount": None,
"DiscountAmount": None,
"TradeAmount": None
}
price_detail.append(resource_price)
pass
_format_association_price(v["Result"],price_detail,v["Type"][v["Type"].index("::")+2:])
tab = tabulate.tabulate(price_detail, headers="keys")
tab_lines = tab.splitlines()
tab_width = len(tab_lines[1])
test_name = test_name.ljust(int(tab_width/2) + int(len(test_name)/2) + 1, "\u2501")
test_name = test_name.rjust(tab_width + 2, "\u2501")
LOG.info("{}{}{}{}{} ".format("\u250f", PrintMsg.blod,
test_name, "\u2513", PrintMsg.rst_color))
for i, line in enumerate(tab_lines):
LOG.info("{} {} {}".format("\u2523" if i != len(tab_lines)-1 else "\u2517",
line.ljust(tab_width," "),
"\u252B" if i != len(tab_lines)-1 else "\u251B"))
LOG.info("\n")
if not stack.template_price:
test_name = test_name.ljust(int(line_width_default/2)+int(len(test_name)/2)-1, "\u2501")
test_name = test_name.rjust(line_width_default-1 , "\u2501")
LOG.info("{}{}{}{}{} ".format("\u250f", PrintMsg.blod,
test_name, "\u2513", PrintMsg.rst_color))
LOG.info(
"{} status: {}{}{} ".format(
"\u2523", PrintMsg.text_red_background_write,
(stack.status + PrintMsg.rst_color).ljust(line_width_default-len(" status: ")+len(PrintMsg.rst_color)-1,' '),
"\u252B"
))
subsequent_indent = ' ' * 28
status_reason = textwrap.fill(stack.status_reason, width=line_width_default-16, break_long_words=False, replace_whitespace=True, subsequent_indent=subsequent_indent)
status_reason = PrintMsg.text_red_background_write + status_reason.replace('\n', f'{PrintMsg.rst_color}\n{PrintMsg.text_red_background_write}').replace(subsequent_indent,f'{PrintMsg.rst_color}{subsequent_indent}{PrintMsg.text_red_background_write}') + PrintMsg.rst_color
LOG.info("{} status reason: {} {}\n".format("\u2517", status_reason, PrintMsg.rst_color))
@staticmethod
def _display_validation(template_validation: dict):
result_json = {
"validate_result": template_validation["Code"] if "Code" in template_validation else 'LegalTemplate',
"result_reason": template_validation["Message"] if "Message" in template_validation else 'Check passed'
}
tab = tabulate.tabulate([result_json], headers="keys")
tab_lines = tab.splitlines()
for i, line in enumerate(tab_lines):
if i >= 2 and result_json['validate_result'] != 'LegalTemplate':
LOG.error(f'{PrintMsg.text_red_background_write}{line}{PrintMsg.rst_color}')
else:
LOG.info(line)
@staticmethod
def _display_preview_resources(stacker):
line_width_default = 90
for stack in stacker.stacks:
test_name = f' test_name: {stack.test_name} '
if stack.preview_result:
resources_details = []
for r in stack.preview_result:
resources_json = {
"LogicalResourceId": r["LogicalResourceId"],
"ResourceType": r["ResourceType"][r["ResourceType"].index("::")+2:],
}
properties_str = json.dumps(r["Properties"], sort_keys=True, indent=4, separators=(',', ': '), ensure_ascii=False)
resources_json["Properties"] = properties_str
resources_details.append(resources_json)
tab = tabulate.tabulate(resources_details, headers="keys")
tab_lines = tab.splitlines()
tab_width = len(tab_lines[1])
test_name = test_name.ljust(int(tab_width/2) + int(len(test_name)/2) + 1, "\u2501")
test_name = test_name.rjust(tab_width + 2, "\u2501")
LOG.info(f'{PrintMsg.left_top}{PrintMsg.blod}{test_name}{PrintMsg.right_top}{PrintMsg.rst_color}')
LOG.info(f'{PrintMsg.left} region: {stack.region.ljust(tab_width-len("region: ")," ")} {PrintMsg.right}')
for i, line in enumerate(tab_lines):
LOG.info(f'{PrintMsg.left if i != len(tab_lines)-1 else PrintMsg.left_bottom} {line.ljust(tab_width," ")} {PrintMsg.right if i != len(tab_lines)-1 else PrintMsg.right_bottom}')
else:
test_name = test_name.ljust(int(line_width_default/2)+int(len(test_name)/2)-1, PrintMsg.top)
test_name = test_name.rjust(line_width_default-1 , PrintMsg.top)
LOG.info(f'{PrintMsg.left_top}{PrintMsg.blod}{test_name}{PrintMsg.right_top}{PrintMsg.rst_color}')
LOG.info(f'{PrintMsg.left} region: {stack.region.ljust(line_width_default-len(" region: ")-1," ")}{PrintMsg.right}')
LOG.info(
"{} status: {}{}{} ".format(
PrintMsg.left, PrintMsg.text_red_background_write,
(stack.status + PrintMsg.rst_color).ljust(line_width_default-len(" status: ")+len(PrintMsg.rst_color)-1,' '),
PrintMsg.right
))
subsequent_indent = ' ' * 28
status_reason = textwrap.fill(stack.status_reason, width=line_width_default-16, break_long_words=False, replace_whitespace=True, subsequent_indent=subsequent_indent)
status_reason = PrintMsg.text_red_background_write + status_reason.replace('\n', f'{PrintMsg.rst_color}\n{PrintMsg.text_red_background_write}').replace(subsequent_indent,f'{PrintMsg.rst_color}{subsequent_indent}{PrintMsg.text_red_background_write}') + PrintMsg.rst_color
LOG.info("{} status reason: {} {}\n".format(PrintMsg.left_bottom, status_reason, PrintMsg.rst_color))
@staticmethod
def _display_policies(policies: dict):
LOG.info(json.dumps(policies, sort_keys=True, indent=4, separators=(',', ': '), ensure_ascii=False))
@staticmethod
def _is_test_in_progress(status_dict, status_condition="IN_PROGRESS"):
if not status_dict:
return False
if status_dict.get(status_condition):
return True
else:
return False