in ossdbtoolsservice/language/completion/packages/sqlcompletion.py [0:0]
def suggest_based_on_last_token(token, stmt):
if isinstance(token, string_types):
token_v = token.lower()
elif isinstance(token, Comparison):
# If 'token' is a Comparison type such as
# 'select * FROM abc a JOIN def d ON a.id = d.'. Then calling
# token.value on the comparison type will only return the lhs of the
# comparison. In this case a.id. So we need to do token.tokens to get
# both sides of the comparison and pick the last token out of that
# list.
token_v = token.tokens[-1].value.lower()
elif isinstance(token, Where):
# sqlparse groups all tokens from the where clause into a single token
# list. This means that token.value may be something like
# 'where foo > 5 and '. We need to look "inside" token.tokens to handle
# suggestions in complicated where clauses correctly
prev_keyword = stmt.reduce_to_prev_keyword()
return suggest_based_on_last_token(prev_keyword, stmt)
elif isinstance(token, Identifier):
# If the previous token is an identifier, we can suggest datatypes if
# we're in a parenthesized column/field list, e.g.:
# CREATE TABLE foo (Identifier <CURSOR>
# CREATE FUNCTION foo (Identifier <CURSOR>
# If we're not in a parenthesized list, the most likely scenario is the
# user is about to specify an alias, e.g.:
# SELECT Identifier <CURSOR>
# SELECT foo FROM Identifier <CURSOR>
prev_keyword, _ = find_prev_keyword(stmt.text_before_cursor)
if prev_keyword and prev_keyword.value == '(':
# Suggest datatypes
return suggest_based_on_last_token('type', stmt)
else:
return tuple([Keyword()])
else:
token_v = token.value.lower()
if not token:
# {{ PGToolsService EDIT }}
return tuple([Keyword()])
elif token_v.endswith('('):
p = sqlparse.parse(stmt.text_before_cursor)[0]
if p.tokens and isinstance(p.tokens[-1], Where):
# Four possibilities:
# 1 - Parenthesized clause like "WHERE foo AND ("
# Suggest columns/functions
# 2 - Function call like "WHERE foo("
# Suggest columns/functions
# 3 - Subquery expression like "WHERE EXISTS ("
# Suggest keywords, in order to do a subquery
# 4 - Subquery OR array comparison like "WHERE foo = ANY("
# Suggest columns/functions AND keywords. (If we wanted to be
# really fancy, we could suggest only array-typed columns)
column_suggestions = suggest_based_on_last_token('where', stmt)
# Check for a subquery expression (cases 3 & 4)
where = p.tokens[-1]
prev_tok = where.token_prev(len(where.tokens) - 1)[1]
if isinstance(prev_tok, Comparison):
# e.g. "SELECT foo FROM bar WHERE foo = ANY("
prev_tok = prev_tok.tokens[-1]
prev_tok = prev_tok.value.lower()
if prev_tok == 'exists':
return tuple([Keyword()])
else:
return column_suggestions
# Get the token before the parens
prev_tok = p.token_prev(len(p.tokens) - 1)[1]
if (prev_tok and prev_tok.value and prev_tok.value.lower().split(' ')[-1] == 'using'):
# tbl1 INNER JOIN tbl2 USING (col1, col2)
tables = stmt.get_tables('before')
# suggest columns that are present in more than one table
return (Column(table_refs=tables,
require_last_table=True,
local_tables=stmt.local_tables),)
elif p.token_first().value.lower() == 'select':
# If the lparen is preceeded by a space chances are we're about to
# do a sub-select.
if last_word(stmt.text_before_cursor,
'all_punctuations').startswith('('):
return tuple([Keyword()])
prev_prev_tok = prev_tok and p.token_prev(p.token_index(prev_tok))[1]
if prev_prev_tok and prev_prev_tok.normalized == 'INTO':
return (
Column(table_refs=stmt.get_tables('insert'), context='insert'),
)
# We're probably in a function argument list
return (Column(table_refs=extract_tables(stmt.full_text),
local_tables=stmt.local_tables, qualifiable=True),)
elif token_v == 'set':
return (Column(table_refs=stmt.get_tables(),
local_tables=stmt.local_tables),)
elif token_v in ('select', 'where', 'having', 'by', 'distinct'):
# Check for a table alias or schema qualification
parent = (stmt.identifier and stmt.identifier.get_parent_name()) or []
tables = stmt.get_tables()
if parent:
tables = tuple(t for t in tables if identifies(parent, t))
return (Column(table_refs=tables, local_tables=stmt.local_tables),
Table(schema=parent),
View(schema=parent),
Function(schema=parent),)
else:
return (Column(table_refs=tables, local_tables=stmt.local_tables,
qualifiable=True),
Function(schema=None),
Keyword(token_v.upper()),)
elif token_v == 'as':
# Don't suggest anything for aliases
return ()
elif (token_v.endswith('join') and token.is_keyword) or (token_v in ('copy', 'from', 'update', 'into', 'describe', 'truncate')):
schema = stmt.get_identifier_schema()
tables = extract_tables(stmt.text_before_cursor)
is_join = token_v.endswith('join') and token.is_keyword
# Suggest tables from either the currently-selected schema or the
# public schema if no schema has been specified
suggest = []
if not schema:
# Suggest schemas
suggest.insert(0, Schema())
if token_v == 'from' or is_join:
suggest.append(FromClauseItem(schema=schema,
table_refs=tables,
local_tables=stmt.local_tables))
elif token_v == 'truncate':
suggest.append(Table(schema))
else:
suggest.extend((Table(schema), View(schema)))
if is_join and _allow_join(stmt.parsed):
tables = stmt.get_tables('before')
suggest.append(Join(table_refs=tables, schema=schema))
return tuple(suggest)
elif token_v == 'function':
schema = stmt.get_identifier_schema()
# stmt.get_previous_token will fail for e.g. `SELECT 1 FROM functions WHERE function:`
try:
prev = stmt.get_previous_token(token).value.lower()
if prev in ('drop', 'alter', 'create', 'create or replace'):
return (Function(schema=schema, usage='signature'),)
except ValueError:
pass
return tuple()
elif token_v in ('table', 'view'):
# E.g. 'ALTER TABLE <tablname>'
rel_type = {'table': Table, 'view': View, 'function': Function}[token_v]
schema = stmt.get_identifier_schema()
if schema:
return (rel_type(schema=schema),)
else:
return (Schema(), rel_type(schema=schema))
elif token_v == 'column':
# E.g. 'ALTER TABLE foo ALTER COLUMN bar
return (Column(table_refs=stmt.get_tables()),)
elif token_v == 'on':
tables = stmt.get_tables('before')
parent = (stmt.identifier and stmt.identifier.get_parent_name()) or None
if parent:
# "ON parent.<suggestion>"
# parent can be either a schema name or table alias
filteredtables = tuple(t for t in tables if identifies(parent, t))
sugs = [Column(table_refs=filteredtables,
local_tables=stmt.local_tables),
Table(schema=parent),
View(schema=parent),
Function(schema=parent)]
if filteredtables and _allow_join_condition(stmt.parsed):
sugs.append(JoinCondition(table_refs=tables,
parent=filteredtables[-1]))
return tuple(sugs)
else:
# ON <suggestion>
# Use table alias if there is one, otherwise the table name
aliases = tuple(t.ref for t in tables)
if _allow_join_condition(stmt.parsed):
return (Alias(aliases=aliases), JoinCondition(
table_refs=tables, parent=None))
else:
return (Alias(aliases=aliases),)
elif token_v in ('c', 'use', 'database', 'template'):
# "\c <db", "use <db>", "DROP DATABASE <db>",
# "CREATE DATABASE <newdb> WITH TEMPLATE <db>"
return (Database(),)
elif token_v == 'schema':
# DROP SCHEMA schema_name, SET SCHEMA schema name
prev_keyword = stmt.reduce_to_prev_keyword(n_skip=2)
quoted = prev_keyword and prev_keyword.value.lower() == 'set'
return (Schema(quoted),)
elif token_v.endswith(',') or token_v in ('=', 'and', 'or'):
prev_keyword = stmt.reduce_to_prev_keyword()
if prev_keyword:
return suggest_based_on_last_token(prev_keyword, stmt)
else:
return ()
elif token_v in ('type', '::'):
# ALTER TABLE foo SET DATA TYPE bar
# SELECT foo::bar
# Note that tables are a form of composite type in postgresql, so
# they're suggested here as well
schema = stmt.get_identifier_schema()
suggestions = [Datatype(schema=schema),
Table(schema=schema)]
if not schema:
suggestions.append(Schema())
return tuple(suggestions)
elif token_v in {'alter', 'create', 'drop'}:
return tuple([Keyword(token_v.upper())])
elif token.is_keyword:
# token is a keyword we haven't implemented any special handling for
# go backwards in the query until we find one we do recognize
prev_keyword = stmt.reduce_to_prev_keyword(n_skip=1)
if prev_keyword:
return suggest_based_on_last_token(prev_keyword, stmt)
else:
return tuple([Keyword(token_v.upper())])
else:
return tuple([Keyword()])