in shell/impala_shell.py [0:0]
def impala_shell_main():
  """
  Entry point of impala-shell: load and validate the options, then either run the
  requested queries non-interactively or start the interactive command loop.

  There are two types of options: shell options and query options. Both can be set on
  the command line, which overrides the default options. Specifically, if there exists a
  global config file (default path: /etc/impalarc) then options are loaded from that
  file. If there exists a user config file (~/.impalarc), then options are loaded from
  that file and override any options already loaded from the global impalarc. The default
  shell options come from impala_shell_config_defaults.py. Query options have no defaults
  within the impala-shell, but they do have defaults on the server. Query options can
  also be changed in impala-shell with the 'set' command.

  Rebinds the module-level 'options' and 'parser' globals and updates
  'impala_shell_defaults' in place with the shell options read from the config files.

  Raises FatalShellException when an explicitly requested config file is missing, when a
  config file cannot be loaded, when the options are invalid or inconsistent, or when
  non-interactive query execution reports failure.
  """
  global options, parser

  # First pass: parse with the built-in defaults. The result is only used to find the
  # config files and the verbosity; the options are parsed again once the config files
  # have been folded into the defaults.
  parser = get_option_parser(impala_shell_defaults)
  options, args = parser.parse_args()

  # by default, use the impalarc in the user's home directory
  # and superimpose it on the global impalarc config
  default_global_config = impala_shell_defaults['global_config_default_path']
  global_config = os.path.expanduser(
      os.environ.get(ImpalaShell.GLOBAL_CONFIG_FILE, default_global_config))
  if os.path.isfile(global_config):
    # Always output the source of the global config if verbose
    if options.verbose:
      print("Loading in options from global config file: %s \n" % global_config,
            file=sys.stderr)
  elif global_config != default_global_config:
    # A missing global config is an error only if it is not the default path.
    _shell_main_fatal('%s not found.\n' % global_config)

  # Override the default user config by a custom config if necessary
  user_config = impala_shell_defaults.get("config_file")
  input_config = os.path.expanduser(options.config_file)
  if input_config != user_config:
    if not os.path.isfile(input_config):
      _shell_main_fatal('%s not found.\n' % input_config)
    if options.verbose:
      print("Loading in options from config file: %s \n" % input_config,
            file=sys.stderr)
    # command line overrides loading ~/.impalarc
    user_config = input_config

  # load shell and query options from the list of config files
  # in ascending order of precedence
  loaded_shell_options = {}
  query_options = {}
  try:
    for config_file in (global_config, user_config):
      s_options, q_options = get_config_from_file(config_file, parser.option_list)
      loaded_shell_options.update(s_options)
      query_options.update(q_options)
    impala_shell_defaults.update(loaded_shell_options)
  except Exception as e:
    print(e, file=sys.stderr)
    raise FatalShellException()

  # Second pass: the command line is parsed again, now on top of the updated defaults.
  parser = get_option_parser(impala_shell_defaults)
  options, args = parser.parse_args()

  # Arguments that could not be parsed are stored in args. Print an error and exit.
  if args:
    print('Error, could not parse arguments "%s"' % ' '.join(args), file=sys.stderr)
    parser.print_help()
    raise FatalShellException()

  if options.version:
    print(VERSION_STRING)
    return

  if options.write_delimited:
    # The delimiter is decoded with 'unicode_escape' so that escape sequences count as
    # the single character they denote.
    raw_delimiter = options.output_delimiter
    if isinstance(raw_delimiter, str):
      raw_delimiter = bytearray(raw_delimiter, 'utf-8')
    delim = raw_delimiter.decode('unicode_escape')
    if len(delim) != 1:
      _shell_main_fatal("Illegal delimiter %s, the delimiter "
                        "must be a 1-character string." % delim)

  enabled_auth_methods = [
      enabled for enabled in (options.use_kerberos, options.use_ldap, options.use_jwt,
                              options.use_oauth) if enabled]
  if len(enabled_auth_methods) > 1:
    # NOTE(review): OAUTH (-a) is counted above but not listed in this message; the text
    # is kept unchanged because external tooling may match on it.
    _shell_main_fatal("Please specify at most one authentication mechanism "
                      "(-k, -l, or -j)")

  if options.use_ldap:
    if not options.ssl and not options.creds_ok_in_clear:
      _shell_main_fatal("LDAP credentials may not be sent over insecure "
                        "connections. Enable SSL or set --auth_creds_ok_in_clear")
  elif options.ldap_password_cmd:
    _shell_main_fatal("Option --ldap_password_cmd requires using LDAP authentication "
                      "mechanism (-l)")

  _shell_main_check_token_auth(options, options.use_jwt, options.jwt_cmd,
                               "JWT", "--jwt_cmd", "-j")
  _shell_main_check_token_auth(options, options.use_oauth, options.oauth_cmd,
                               "OAUTH", "--oauth_cmd", "-a")

  if options.hs2_fp_format:
    try:
      _validate_hs2_fp_format_specification(options.hs2_fp_format)
    except FatalShellException:
      print("Invalid floating point format specification: %s" %
            options.hs2_fp_format, file=sys.stderr)
      raise
    except UnicodeDecodeError as e:
      print("Unicode character in format specification is not supported",
            file=sys.stderr)
      raise FatalShellException(e)

  if options.verbose:
    if options.use_kerberos:
      auth_msg = "with Kerberos authentication"
    elif options.use_ldap:
      auth_msg = "with LDAP-based authentication"
    elif options.use_jwt:
      auth_msg = "with JWT-based authentication"
    elif options.use_oauth:
      auth_msg = "with OAUTH-based authentication"
    else:
      auth_msg = "with no authentication"
    py_version_msg = "using Python {0}.{1}.{2}".format(
        sys.version_info.major, sys.version_info.minor, sys.version_info.micro)
    print("{0} {1} {2}".format("Starting Impala Shell", auth_msg, py_version_msg),
          file=sys.stderr)

  if options.use_kerberos:
    if options.verbose:
      print("Using service name '%s'" % options.kerberos_service_name, file=sys.stderr)
    # Check if the user has a ticket in the credentials cache. Only the klist invocation
    # is guarded, so an OSError here really means that klist could not be run.
    try:
      klist_status = call(['klist', '-s'])
    except OSError:
      _shell_main_fatal('klist not found on the system, install kerberos clients')
    if klist_status != 0:
      _shell_main_fatal("-k requires a valid kerberos ticket but no valid kerberos "
                        "ticket found.")

  # Secrets are obtained by running the user-supplied commands; each attribute is None
  # when its mechanism is not in use or no command was given.
  options.ldap_password = None
  if options.use_ldap and options.ldap_password_cmd:
    options.ldap_password = read_password_cmd(options.ldap_password_cmd, "LDAP password")

  options.jwt = None
  if options.use_jwt and options.jwt_cmd:
    options.jwt = read_password_cmd(options.jwt_cmd, "JWT", True)

  options.oauth = None
  if options.use_oauth and options.oauth_cmd:
    options.oauth = read_password_cmd(options.oauth_cmd, "OAUTH", True)

  if options.ssl and options.verbose:
    if options.ca_cert is None:
      print("SSL is enabled. Impala server certificates will NOT be verified "
            "(set --ca_cert to change)", file=sys.stderr)
    else:
      print("SSL is enabled", file=sys.stderr)

  if options.verbose:
    # The import is attempted only to warn when the accelerated module is unavailable.
    try:
      import thrift.protocol.fastbinary
    except Exception as e:
      print("WARNING: Failed to load Thrift's fastbinary module. Thrift's "
            "BinaryProtocol will not be accelerated, which can reduce performance. "
            "Error was '{0}'".format(e), file=sys.stderr)

  if options.output_file:
    try:
      # Make sure the given file can be opened for writing. This will also clear the file
      # if successful. The handle is closed again right away.
      with open(options.output_file, 'wb'):
        pass
    except IOError as e:
      _shell_main_fatal('Error opening output file for writing: %s' % e)

  timeout = options.http_socket_timeout_s
  if timeout is not None and timeout != 'None':
    # A value that is not a number is rejected with the same message as a negative one
    # instead of escaping as an unhandled conversion error.
    try:
      timeout_is_invalid = float(timeout) < 0
    except (TypeError, ValueError):
      timeout_is_invalid = True
    if timeout_is_invalid:
      _shell_main_fatal("http_socket_timeout_s must be a nonnegative floating point "
                        "number expressing seconds, or None")

  if options.connect_max_tries < 1:
    _shell_main_fatal("connect_max_tries must be greater than or equal to 1")

  options.variables = parse_variables(options.keyval)

  # Override query_options from config file with those specified on the command line.
  query_options.update(
      [(k.upper(), v) for k, v in parse_variables(options.query_options).items()])

  # Non-interactive mode
  if options.query or options.query_file:
    # Impala shell will disable live_progress if non-interactive mode is detected.
    if options.live_progress:
      if options.verbose:
        print("Warning: live_progress only applies to interactive shell sessions, "
              "and is being skipped for now.", file=sys.stderr)
      options.live_progress = False
    if options.live_summary:
      _shell_main_fatal("Error: live_summary is available for interactive mode only.")

    if execute_queries_non_interactive_mode(options, query_options):
      return
    raise FatalShellException()

  intro = get_intro(options)

  with ImpalaShell(options, query_options) as shell:
    while shell.is_alive:
      try:
        try:
          shell.cmdloop(intro)
        except KeyboardInterrupt:
          print('^C', file=sys.stderr)
        # A last measure against any exceptions thrown by an rpc
        # not caught in the shell
        except socket.error as e:
          # if the socket was interrupted, reconnect the connection with the client
          if e.errno == errno.EINTR:
            print(shell.CANCELLATION_MESSAGE)
            shell._reconnect_cancellation()
          else:
            print("%s %s: %s" %
                  (shell.SOCKET_ERROR_MESSAGE, e.errno, e), file=sys.stderr)
            shell.imp_client.connected = False
            shell.prompt = shell.DISCONNECTED_PROMPT
        except DisconnectedException as e:
          # the client has lost the connection
          print(e, file=sys.stderr)
          shell.imp_client.connected = False
          shell.prompt = shell.DISCONNECTED_PROMPT
        except QueryStateException as e:
          # an exception occurred while executing the query
          shell.imp_client.close_query(shell.last_query_handle)
          print(e, file=sys.stderr)
        except RPCException as e:
          # could not complete the rpc successfully
          print(e, file=sys.stderr)
        except IOError as e:
          # Interrupted system calls (e.g. because of cancellation) should be ignored.
          if e.errno != errno.EINTR:
            raise
      finally:
        # The intro is passed to the command loop only once; later iterations of the
        # loop re-enter it with an empty intro.
        intro = ''


def _shell_main_fatal(message):
  """Print 'message' to stderr and abort by raising FatalShellException."""
  print(message, file=sys.stderr)
  raise FatalShellException()


def _shell_main_check_token_auth(options, enabled, token_cmd, mechanism, cmd_option,
                                 flag):
  """
  Validate the options of one token-based authentication mechanism (JWT or OAUTH).

  'enabled' is the value of the mechanism's enabling option and 'token_cmd' the value of
  its command option. 'mechanism', 'cmd_option' and 'flag' are the names used in the
  error messages. Raises FatalShellException on the first violated requirement.
  """
  if enabled:
    if options.protocol.lower() != 'hs2-http':
      _shell_main_fatal("Invalid protocol '{0}'. {1} authentication requires using the "
                        "'hs2-http' protocol".format(options.protocol, mechanism))
    if options.strict_hs2_protocol:
      _shell_main_fatal(
          "{0} authentication is not supported when using strict hs2.".format(mechanism))
    if not options.ssl and not options.creds_ok_in_clear:
      _shell_main_fatal("{0}s may not be sent over insecure connections. Enable SSL or "
                        "set --auth_creds_ok_in_clear".format(mechanism))
  elif token_cmd:
    # A command was given for a mechanism that is not enabled.
    _shell_main_fatal("Option {0} requires using {1} authentication mechanism "
                      "({2})".format(cmd_option, mechanism, flag))