bigquery_etl/format_sql/formatter.py (249 lines of code) (raw):
"""Format SQL."""
import re
from dataclasses import replace
from .tokenizer import (
AliasSeparator,
BlockEndKeyword,
BlockKeyword,
BlockStartKeyword,
BuiltInFunctionIdentifier,
CaseSubclause,
ClosingBracket,
Comment,
ExpressionSeparator,
FieldAccessOperator,
Identifier,
JinjaBlockEnd,
JinjaBlockStart,
JinjaBlockStatement,
JinjaComment,
JinjaExpression,
JinjaStatement,
LineComment,
Literal,
NewlineKeyword,
OpeningBracket,
Operator,
ReservedKeyword,
SpaceBeforeBracketKeyword,
StatementSeparator,
TopLevelKeyword,
Whitespace,
tokenize,
)
def simple_format(tokens, indent=" "):
"""Format tokens in a single pass."""
first_token = True
require_newline_before_next_token = False
allow_space_before_next_bracket = False
allow_space_before_next_token = False
prev_was_block_end = False
prev_was_jinja = False
prev_was_statement_separator = False
prev_was_unary_operator = False
next_operator_is_unary = True
indent_types = []
can_format = True
for token in tokens:
# skip original whitespace tokens, unless formatting is disabled
if isinstance(token, Whitespace):
if not can_format:
yield token
continue
# update state for current token
if isinstance(token, Comment):
# enable to disable formatting
if can_format and token.format_off:
can_format = False
elif not can_format and token.format_on:
can_format = True
elif isinstance(token, ClosingBracket):
# decrease indent to match last OpeningBracket
while indent_types and indent_types.pop() is not OpeningBracket:
pass
elif isinstance(token, TopLevelKeyword):
# decrease indent from previous TopLevelKeyword
if indent_types and indent_types[-1] is TopLevelKeyword:
indent_types.pop()
elif isinstance(token, BlockEndKeyword):
# decrease indent to match last BlockKeyword
while indent_types and indent_types.pop() is not BlockKeyword:
pass
prev_was_statement_separator = False
elif isinstance(token, JinjaBlockEnd):
# decrease indent to match last JinjaBlockStart
while indent_types and indent_types.pop() is not JinjaBlockStart:
pass
elif isinstance(token, CaseSubclause):
if token.value.upper() in ("WHEN", "ELSE"):
# Have WHEN and ELSE clauses indented one level more than CASE.
while indent_types and indent_types[-1] is CaseSubclause:
indent_types.pop()
elif isinstance(
token, (AliasSeparator, ExpressionSeparator, FieldAccessOperator)
):
if prev_was_block_end or prev_was_jinja:
require_newline_before_next_token = False
# yield whitespace
if not can_format or isinstance(token, StatementSeparator) or first_token:
# except between statements
# no new whitespace when formatting is disabled
# no space before statement separator
# no space before first token
pass
elif isinstance(token, Comment):
# blank line before comments if they start on their own line
# and come after a statement separator, and before #fail and #warn
if token.value.startswith("\n") and prev_was_statement_separator:
yield Whitespace("\n")
elif re.fullmatch(r"\s*#(fail|warn)", token.value):
yield Whitespace("\n")
elif (
require_newline_before_next_token
or isinstance(
token,
(NewlineKeyword, ClosingBracket, BlockKeyword, JinjaBlockStatement),
)
or prev_was_statement_separator
):
if prev_was_statement_separator:
yield Whitespace("\n")
yield Whitespace("\n" + indent * len(indent_types))
elif (
allow_space_before_next_token
and (
allow_space_before_next_bracket or not isinstance(token, OpeningBracket)
)
and not isinstance(token, (FieldAccessOperator, ExpressionSeparator))
and not (
prev_was_unary_operator and isinstance(token, (Literal, Identifier))
)
):
yield Whitespace(" ")
if can_format:
# uppercase keywords and replace contained whitespace with single spaces
if isinstance(token, ReservedKeyword):
token = replace(token, value=re.sub(r"\s+", " ", token.value.upper()))
# uppercase built-in function names
elif isinstance(token, BuiltInFunctionIdentifier):
token = replace(token, value=token.value.upper())
yield token
# update state for next token
require_newline_before_next_token = isinstance(
token,
(
Comment,
BlockKeyword,
TopLevelKeyword,
OpeningBracket,
ExpressionSeparator,
StatementSeparator,
JinjaStatement,
),
)
allow_space_before_next_token = not isinstance(token, FieldAccessOperator)
prev_was_block_end = isinstance(token, BlockEndKeyword)
prev_was_statement_separator = isinstance(token, StatementSeparator)
prev_was_unary_operator = next_operator_is_unary and isinstance(token, Operator)
prev_was_jinja = isinstance(
token, (JinjaExpression, JinjaComment, JinjaStatement)
)
if not isinstance(token, Comment):
# format next operator as unary if there is no preceding argument
next_operator_is_unary = not isinstance(
token, (Literal, Identifier, ClosingBracket)
)
allow_space_before_next_bracket = isinstance(
token, (SpaceBeforeBracketKeyword, Operator)
)
if isinstance(token, TopLevelKeyword) and token.value == "WITH":
# don't indent CTE's and don't put the first one on a new line
require_newline_before_next_token = False
elif isinstance(token, BlockStartKeyword):
# increase indent
indent_types.append(BlockKeyword)
elif isinstance(token, JinjaBlockStart):
# increase indent
indent_types.append(JinjaBlockStart)
elif isinstance(token, (TopLevelKeyword, OpeningBracket, CaseSubclause)):
# increase indent
indent_types.append(type(token))
elif isinstance(token, StatementSeparator):
# decrease for previous top level keyword
if indent_types and indent_types[-1] is TopLevelKeyword:
indent_types.pop()
first_token = False
class Line:
"""Container for a line of tokens."""
def __init__(self, indent_token=None, can_format=True):
"""Initialize."""
self.indent_token = indent_token
self.can_format = can_format and not isinstance(indent_token, Comment)
if indent_token is None:
self.indent_level = 0
else:
self.indent_level = len(indent_token.value)
if indent_token.value.startswith("\n"):
self.indent_level -= 1
self.inline_tokens = []
self.inline_length = 0
def add(self, token):
"""Add a token to this line."""
self.inline_length += len(token.value)
self.inline_tokens.append(token)
@property
def tokens(self):
"""Get a list of all the tokens in this line."""
if self.indent_token is None:
return self.inline_tokens
else:
return [self.indent_token] + self.inline_tokens
@property
def can_start_inline_block(self):
"""Determine if this line starts a bracket block that may be inlined.
This line starts a bracket block if it ends with an OpeningBracket.
This line can't be inlined if formatting is disabled.
Blocks preceded by an alias, such as common table expressions and
window expressions, should not be inlined.
"""
return (
self.can_format
and self.ends_with_opening_bracket
and not (
len(self.inline_tokens) > 2
and isinstance(self.inline_tokens[-3], AliasSeparator)
)
)
@property
def ends_with_opening_bracket(self):
"""Determine if this line ends with an OpeningBracket."""
return self.inline_tokens and isinstance(self.inline_tokens[-1], OpeningBracket)
@property
def starts_with_closing_bracket(self):
"""Determine if this line starts with a ClosingBracket."""
return self.inline_tokens and isinstance(self.inline_tokens[0], ClosingBracket)
@property
def ends_with_line_comment(self):
"""Determine if this line ends with a line comment."""
return self.inline_tokens and isinstance(self.inline_tokens[-1], LineComment)
def inline_block_format(tokens, max_line_length=100):
"""Extend simple_format to inline each bracket block if possible.
A bracket block is a series of tokens from an opening bracket to the
matching closing bracket. To inline a block means to put it on a single
line, instead of multiple lines.
Inline a block if the result would be shorter than max_line_length.
Do not inline if block if contains a comment.
For example, this formatter may convert:
IF(
condition,
value_if_true,
value_if_false,
)
to
IF(condition, value_if_true, value_if_false)
Implementation requires simple_format to put opening brackets at the end of
the line and closing brackets at the beginning of the line, unless there is
a comment between them.
"""
# format tokens using simple_format, then group into lines
lines = [Line()]
can_format = True
for token in simple_format(tokens):
if token.value.startswith("\n"):
lines.append(Line(token, can_format))
else:
lines[-1].add(token)
if isinstance(token, Comment):
if can_format and token.format_off:
# disable formatting for current and following lines
lines[-1].can_format = False
can_format = False
elif not can_format and token.format_on:
# enable formatting for following lines
can_format = True
# combine all lines in each bracket block that fits in max_line_length
skip_lines = 0
for index, line in enumerate(lines):
if skip_lines > 0:
skip_lines -= 1
continue
yield from line.tokens
if line.can_start_inline_block:
indent_level = line.indent_level
line_length = indent_level + line.inline_length
pending_lines = 0
pending = []
open_brackets = 1
index += 1 # start on the next line
previous_line = line
for line in lines[index:]:
if not line.can_format:
break
# Line comments can't be moved into the middle of a line.
if previous_line.ends_with_line_comment:
break
if (
not previous_line.ends_with_opening_bracket
and not line.starts_with_closing_bracket
):
pending.append(Whitespace(" "))
line_length += 1
pending_lines += 1
pending.extend(line.inline_tokens)
line_length += line.inline_length
if line_length > max_line_length:
break
if line.starts_with_closing_bracket:
open_brackets -= 1
if open_brackets == 0:
# flush pending and handle next block if present
yield from pending
skip_lines += pending_lines
if line.can_start_inline_block:
pending_lines = 0
pending = []
else:
break
if line.ends_with_opening_bracket:
open_brackets += 1
previous_line = line
def reformat(query, format_=inline_block_format, trailing_newline=False):
"""Reformat query and return as a string."""
tokens = format_(tokenize(query))
return "".join(token.value for token in tokens) + ("\n" if trailing_newline else "")