plugins/mkdocs-atlas-formatting-plugin/mkdocs_atlas_formatting_plugin/block.py (172 lines of code) (raw):
import re
from typing import Dict, List, Optional, Tuple, Union
from .logconfig import setup_logging
from .atlaswebserver import AtlasWebServer, NoopWebserver
logger = setup_logging(__name__)
class Block:
ASL_BASE_URI = 'https://netflix.github.io/atlas-docs/asl/ref'
ATLAS_EXAMPLE = 'atlas-example'
ATLAS_GRAPH = 'atlas-graph'
ATLAS_SIGNATURE = 'atlas-signature'
ATLAS_STACKLANG = 'atlas-stacklang'
ATLAS_URI = 'atlas-uri'
valid_types = [
ATLAS_EXAMPLE,
ATLAS_GRAPH,
ATLAS_SIGNATURE,
ATLAS_STACKLANG,
ATLAS_URI
]
example_pattern = re.compile(r'([^:]+): (.+)')
options_pattern = re.compile(r'.+@@@ ([a-z\\-]+)(?:$| { (.+) })')
query_pattern = re.compile(r'.*q=([^&]+)')
number_pattern = re.compile(r'^[-+]?\d+(.\d)?$')
color_pattern = re.compile(r'^1(,1)+$')
def __init__(self, webserver: Optional[NoopWebserver] = None) -> None:
self.webserver: AtlasWebServer = AtlasWebServer() if not webserver else webserver
self.type: Optional[str] = None
self.options: Optional[Dict[str, str]] = None
self.is_started: bool = False
self.input_lines: Optional[Union[List[str], List[Tuple[str, str]]]] = None
self.output_lines: Optional[List[str]] = None
def start(self, page_title: str, line: str) -> None:
m = self.options_pattern.match(line)
if not m:
return
block_type = m.group(1)
block_options = m.group(2)
if block_type in self.valid_types:
self.type = block_type
self.parse_options(block_options)
self.is_started = True
else:
logger.warning(f'invalid block type `{block_type}` on page `{page_title}`')
def parse_options(self, options: Optional[str]) -> None:
if options:
res = {}
for option in options.split(' '):
if '=' not in option or len(option.split('=')) > 2:
continue
k, v = option.split('=')
# allow for & and = characters in option values
v = v.replace('&', '&').replace('%3D', '=').replace('%3d', '=')
res[k] = v
if len(res) > 0:
self.options = res
def complete(self) -> None:
self.type = None
self.options = None
self.is_started = False
self.input_lines = None
self.output_lines = None
def add_line(self, line: str) -> None:
if not self.is_started:
return
if self.type == self.ATLAS_EXAMPLE:
m = self.example_pattern.match(line)
if m:
line = m.groups()
if self.input_lines is None:
self.input_lines = [line]
else:
self.input_lines.append(line)
def mk_image_tag(self, uri: str) -> str:
"""
Given an Atlas URI, fetch the image from a running Atlas Standalone server and format
the output as an image tag, suitable for embedding in an HTML page.
Input:
/api/v1/graph?w=200&h=125&no_legend=1&s=e-3h&e=2012-01-01T07:00&tz=UTC&l=0&q=nf.app,alerttest,:eq,name,ssCpuUser,:eq,:and,:sum,80,:gt
Output:
<img src="data:image/png;base64,...encoded image..." width="286" height="153">
"""
data_uri, width, height = self.webserver.get_image(f'{uri}&features=unstable')
return f'<img src="{data_uri}" width="{width}" height="{height}"/>'
def mk_asl_link(self, op: str) -> str:
stripped_op = op.replace(":", "")
return f'<a href="{self.ASL_BASE_URI}/{stripped_op}/">{op}</a>'
def fmt_atlas_expr(self, uri: str, offset: int = 0) -> str:
"""
Given an Atlas URI, extract the query and convert it into a pre-formatted block.
There should be line breaks after each operator and each operator should link to the
Atlas Stack Language Reference. The output is intended to be wrapped in pre tags by
the caller.
Input:
/api/v1/graph?w=200&h=150&no_legend=1&s=e-3h&e=2012-01-01T07:00&tz=UTC&l=0&q=nf.app,alerttest,:eq,name,ssCpuUser,:eq,:and,:sum,80,:gt,5,:rolling-count
Output:
nf.app,alerttest,<a href="https://netflix.github.io/atlas-docs/asl/ref/eq/">:eq</a>,
name,ssCpuUser,<a href="https://netflix.github.io/atlas-docs/asl/ref/eq/">:eq</a>,
<a href="https://netflix.github.io/atlas-docs/asl/ref/and/">:and</a>,
<a href="https://netflix.github.io/atlas-docs/asl/ref/sum/">:sum</a>,
80,<a href="https://netflix.github.io/atlas-docs/asl/ref/gt/">:gt</a>,
5,<a href="https://netflix.github.io/atlas-docs/asl/ref/rolling-count/">:rolling-count</a>
"""
m = self.query_pattern.match(uri)
if not m:
return 'ERROR: query not found'
line = ''
pad = ' ' * offset if offset > 0 else ''
output_lines = []
# Short circuit for examples with a constant for the query or the color queries
if self.number_pattern.match(m.group(1)) or self.color_pattern.match(m.group(1)):
return f'{pad}{line}{m.group(1)}\n'
for item in m.group(1).split(','):
if item.startswith(':'):
output_lines.append(f'{pad}{line}{self.mk_asl_link(item)},')
line = ''
else:
line += f'{item},'
if line:
output_lines.append(f'{pad}{line}')
output_lines[-1] = output_lines[-1][:-1] # strip trailing comma
return '\n'.join(output_lines) + '\n'
@staticmethod
def mk_stack_table(label: str, data: List[str]) -> str:
rows = ''.join([f'<tr><td>{item}</td></tr>' for item in data])
return f'<strong>{label} Stack:</strong><table><tbody>{rows}</tbody></table>'
@staticmethod
def mk_table_row(data: List[str]) -> str:
output = ''.join([f'<td>{item}</td>' for item in data])
return f'<tr>{output}</tr>'
@staticmethod
def hilite(input_line: str, expr: str) -> str:
return input_line.replace(expr, f'<span class="atlas-hilite">{expr}</span>')
def build_output(self) -> None:
method_name = 'build_' + self.type.replace('-', '_')
try:
getattr(self, method_name)()
except AttributeError:
logger.error(f'{method_name} method is missing')
def build_atlas_example(self) -> None:
titles, graphs, exprs = [], [], []
for title, uri in self.input_lines:
titles.append(title)
graphs.append(self.mk_image_tag(uri))
expr = self.fmt_atlas_expr(uri)
if self.options and 'hilite' in self.options:
expr = self.hilite(expr, self.options['hilite'])
exprs.append(f'<pre>{expr}</pre>')
self.output_lines = ['<table><tbody>']
self.output_lines.append(self.mk_table_row(titles))
self.output_lines.append(self.mk_table_row(graphs))
self.output_lines.append(self.mk_table_row(exprs))
self.output_lines.append('</tbody></table>')
def build_atlas_graph(self) -> None:
uri = self.input_lines[0]
self.output_lines = [f'<p>{self.mk_image_tag(uri)}</p>']
if self.options['show-expr'] == 'true':
self.output_lines.append(f'<pre>{self.fmt_atlas_expr(uri)}</pre>')
def build_atlas_signature(self) -> None:
input_stack = []
output_stack = []
is_input = True
for line in self.input_lines:
if line.startswith('-->'):
is_input = False
elif is_input:
input_stack.append(line)
else:
output_stack.append(line)
# Make stack sizes the same by filling with spaces if needed
n = max(len(input_stack), len(output_stack))
if len(input_stack) < n:
input_stack.extend([' '] * (n - len(input_stack)))
if len(output_stack) < n:
output_stack.extend([' '] * (n - len(output_stack)))
self.output_lines = []
self.output_lines.append('<table><tbody><tr>')
self.output_lines.append(f'<td>{self.mk_stack_table("Input", input_stack)}</td>')
self.output_lines.append(f'<td style="vertical-align: middle;">⇨</td>')
self.output_lines.append(f'<td>{self.mk_stack_table("Output", output_stack)}</td>')
self.output_lines.append('</tr></tbody></table>')
def build_atlas_stacklang(self):
output = self.fmt_atlas_expr(self.input_lines[0])
if self.options and 'hilite' in self.options:
output = self.hilite(output, self.options['hilite'])
self.output_lines = [f'<pre>{output}</pre>']
def build_atlas_uri(self):
base_uri, params = self.input_lines[0].split('?')
output = f'{base_uri}?\n'
for idx, p in enumerate(params.split('&')):
pad = ' ' if idx == 0 else ' &'
if p.startswith('q='):
output += f'{pad}q=\n'
output += self.fmt_atlas_expr(p, 4)
else:
output += f'{pad}{p}\n'
if self.options and 'hilite' in self.options:
output = self.hilite(output, self.options['hilite'])
self.output_lines = [f'<pre>{output}</pre>']