ebcli/core/io.py (244 lines of code) (raw):
# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import re
import warnings
import getpass
import sys
import logging
import colorama
import pydoc
from botocore.compat import six
from six import print_
from six.moves import input
from ebcli.core import ebglobals
from ebcli.objects.exceptions import ValidationError
from ebcli.resources.strings import prompts, strings
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
def term_is_colorable():
    """Report whether standard output is attached to a terminal.

    :return: the result of ``sys.stdout.isatty()``
    """
    stdout = sys.stdout
    return stdout.isatty()
def bold(string):
    """Return ``string`` as text, rendered bright when stdout is a terminal.

    :param string: value to render; converted with ``_convert_to_string``
    :return: the converted text wrapped in ``colorama.Style.BRIGHT`` /
        ``colorama.Style.NORMAL`` on a terminal, otherwise the converted
        text unchanged
    """
    text = _convert_to_string(string)
    if not term_is_colorable():
        return text

    colorama.init()
    return colorama.Style.BRIGHT + text + colorama.Style.NORMAL
def _remap_color(color):
if color.upper() == 'ORANGE':
return 'YELLOW'
if color.upper() in {'GREY', 'GRAY'}:
return 'WHITE'
return color
def color(color, string):
    """Return ``string`` as text in a foreground colour when on a terminal.

    :param color: colour name, case-insensitive; aliases are resolved by
        ``_remap_color`` and the upper-cased result is looked up on
        ``colorama.Fore``
    :param string: value to render; converted with ``_convert_to_string``
    :return: the coloured text followed by ``colorama.Fore.RESET`` on a
        terminal, otherwise the converted text unchanged
    """
    text = _convert_to_string(string)
    if not term_is_colorable():
        return text

    colorama.init()
    fore_name = _remap_color(color).upper()
    fore_code = getattr(colorama.Fore, fore_name)
    return fore_code + text + colorama.Fore.RESET
def on_color(color, string):
    """Return ``string`` as text on a background colour when on a terminal.

    :param color: colour name, case-insensitive; aliases are resolved by
        ``_remap_color`` and the upper-cased result is looked up on
        ``colorama.Back``
    :param string: value to render; converted with ``_convert_to_string``
    :return: the highlighted text followed by ``colorama.Back.RESET`` on a
        terminal, otherwise the converted text unchanged
    """
    text = _convert_to_string(string)
    if not term_is_colorable():
        return text

    colorama.init()
    back_name = _remap_color(color).upper()
    back_code = getattr(colorama.Back, back_name)
    return back_code + text + colorama.Back.RESET
def echo_and_justify(justify, *args):
    """Print ``args`` as left-justified columns, each ``justify`` wide.

    Every argument is converted to text first; trailing whitespace on the
    assembled line is stripped before it is printed.
    """
    columns = [text.ljust(justify) for text in _convert_to_strings(args)]
    line = ''.join(columns)
    print_(line.rstrip())
def echo(*args, **kwargs):
    """Print ``args`` after converting each of them to text.

    Keyword arguments are forwarded to ``print_``; ``sep`` is set to a
    single space when the caller does not provide it.
    """
    kwargs.setdefault('sep', ' ')
    texts = _convert_to_strings(args)
    print_(*texts, **kwargs)
def _convert_to_strings(list_of_things):
    """Lazily yield the text form of every element of ``list_of_things``."""
    for item in list_of_things:
        text = _convert_to_string(item)
        yield text
def _convert_to_string(data):
    """Return ``data`` as the running interpreter's native ``str`` type.

    * Byte strings are decoded as UTF-8 on Python 3 and returned unchanged
      on Python 2.
    * Text strings are returned unchanged on Python 3 and encoded as UTF-8
      on Python 2.
    * Any other value is passed through ``str()``.

    Characters that cannot be decoded/encoded are substituted (``'replace'``
    error handler) instead of raising, so printing never fails because of
    the encoding of the data being shown.

    :param data: value to convert
    :return: the converted string, or None (after logging an error) when
        the value is not a scalar and exposes no ``__str__``
    """
    is_py3 = sys.version_info[0] >= 3

    if isinstance(data, six.binary_type):
        if not is_py3:
            return data
        try:
            return data.decode('utf8')
        except UnicodeError:
            # Mirror the Python 2 encode fallback below: degrade gracefully
            # on bytes that are not valid UTF-8 rather than crash the caller.
            return data.decode('utf8', 'replace')

    if isinstance(data, six.text_type):
        if is_py3:
            return data
        try:
            return data.encode('utf8')
        except UnicodeError:
            # Only encoding problems are expected here; a bare ``except``
            # would also swallow KeyboardInterrupt and SystemExit.
            return data.encode('utf8', 'replace')

    scalar_types = six.string_types + six.integer_types
    if isinstance(data, scalar_types) or hasattr(data, '__str__'):
        return str(data)

    LOG.error('echo called with an unsupported data type')
    LOG.debug('data class = ' + data.__class__.__name__)
    return None
def log_alert(message):
    """Print ``message`` prefixed with 'Alert:'."""
    prefix = 'Alert:'
    echo(prefix, message)
def log_info(message):
    """Send ``message`` to the application logger at info level.

    If reaching or calling the logger raises AttributeError, the message is
    printed instead with an 'INFO: ' prefix.
    """
    try:
        app_log = ebglobals.app.log
        app_log.info(message)
    except AttributeError:
        fallback = 'INFO: {}'.format(message)
        echo(fallback)
def log_warning(message):
    """Send ``message`` to the application logger as a warning.

    If reaching or calling the logger raises AttributeError, the message is
    printed instead in bold red with a 'WARN: ' prefix.
    """
    try:
        app_log = ebglobals.app.log
        app_log.warn(message)
    except AttributeError:
        fallback = 'WARN: {}'.format(message)
        echo(bold(color('red', fallback)))
def log_error(message):
    """Report an error through the application logger or the console.

    When ``ebglobals.app.pargs.debug`` is truthy the message goes to the
    application logger at error level.  Otherwise, or whenever an
    AttributeError is raised along the way, it is printed in bold red with
    an 'ERROR: ' prefix.
    """
    def _print_error():
        # The text is built lazily so the logger path never formats it.
        echo(bold(color('red', 'ERROR: {}'.format(message))))

    try:
        debug_enabled = ebglobals.app.pargs.debug
        if not debug_enabled:
            _print_error()
        else:
            ebglobals.app.log.error(message)
    except AttributeError:
        _print_error()
def get_input(output, default=None):
    """Prompt the user with ``output`` and return the response.

    :param output: prompt text; converted to a string before it is shown
    :param default: value returned when the stripped response is empty
    :return: the stripped response, or ``default`` when it is empty
    """
    # readline is never referenced afterwards; the import is presumably
    # there for its effect on interactive input (line editing), and its
    # absence is tolerated.
    try:
        import readline
    except ImportError:
        pass

    prompt_text = _convert_to_string(output)
    answer = _get_input(prompt_text)
    if answer:
        return answer
    return default
def _get_input(output):
    """Show ``output`` followed by ': ' and return the stripped reply."""
    raw = input(output + ': ')
    return raw.strip()
def echo_with_pager(output):
    """Display ``output`` through ``pydoc.pager``."""
    show = pydoc.pager
    show(output)
def prompt(output, default=None):
    """Ask for input, showing ``output`` wrapped in parentheses.

    :param output: hint text placed between '(' and ')'
    :param default: value returned when the user enters nothing
    :return: whatever ``get_input`` returns for the wrapped prompt
    """
    wrapped = '(' + output + ')'
    return get_input(wrapped, default)
def prompt_for_unique_name(default, unique_list):
    """Keep prompting until the user supplies a name not in ``unique_list``.

    :param default: name offered (and returned) when the user enters
        nothing; asserted not to be in ``unique_list``
    :param unique_list: names that are already taken
    :return: the first response that is not in ``unique_list``
    """
    # Warnings raised while checking the default are suppressed.
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        assert default not in unique_list, 'Default name is not unique'

    hint = 'default is "' + default + '"'
    while True:
        candidate = prompt(hint, default=default)
        if candidate not in unique_list:
            return candidate
        echo('Sorry that name already exists, try another.')
def prompt_for_environment_name(default_name='myEnv',
                                prompt_text='Enter Environment Name'):
    """Prompt for an environment name until a valid one is entered.

    A valid name is 4 to 40 characters long, contains only letters,
    numbers and hyphens, and neither starts nor ends with a hyphen
    (the response is lower-cased before it is checked).

    :param default_name: name returned when the user enters nothing; if it
        does not already satisfy the constraint it is adjusted first
        (prefixed, underscores turned into hyphens, other characters
        dropped, truncated, and suffixed as needed)
    :param prompt_text: line printed before each prompt
    :return: the entered name as typed, or the (adjusted) default
    """
    constraint_pattern = '^[a-z0-9][a-z0-9-]{2,38}[a-z0-9]$'
    name_rule = re.compile(constraint_pattern)

    if name_rule.match(default_name) is None:
        # Must begin with a letter or digit.
        if re.match('^[a-zA-Z0-9].*', default_name) is None:
            default_name = 'eb-' + default_name
        hyphenated = default_name.replace('_', '-')
        default_name = re.sub('[^a-z0-9A-Z-]', '', hyphenated)
        if len(default_name) > 40:
            default_name = default_name[:39]
        # Must end with a letter or digit.
        if re.match('.*[a-zA-Z0-9]$', default_name) is None:
            default_name += '0'

    while True:
        echo(prompt_text)
        env_name = prompt('default is ' + default_name)

        if not env_name:
            return default_name
        if name_rule.match(env_name.lower()):
            return env_name
        echo('Environment name must be 4 to 40 characters in length. It '
             'can only contain letters, numbers, and hyphens. It can not '
             'start or end with a hyphen')
def get_pass(output):
    """Read a password twice and return it once both entries agree.

    :param output: prompt shown (followed by ': ') for the first entry
    :return: the password, after a matching confirmation
    """
    while True:
        first = getpass.getpass(output + ': ')
        second = getpass.getpass('Retype password to confirm: ')
        if first == second:
            return first
        echo()
        log_error('Passwords do not match')
def validate_action(output, expected_input):
    """Require the user to type ``expected_input`` in response to ``output``.

    :param output: prompt text
    :param expected_input: the exact response that must be given
    :raises ValidationError: when the response differs
    """
    typed = get_input(output)
    if typed != expected_input:
        mismatch_message = prompts['terminate.nomatch']
        raise ValidationError(mismatch_message)
def prompt_for_cname(default=None):
    """Prompt for a DNS CNAME prefix until a valid one is entered.

    A valid prefix is 4 to 63 characters long, contains only letters,
    numbers and hyphens, and neither starts nor ends with a hyphen
    (the response is lower-cased before it is checked).

    :param default: value returned when the user enters nothing
    :return: the entered prefix as typed, or ``default``
    """
    cname_rule = re.compile('^[a-z0-9][a-z0-9-]{2,61}[a-z0-9]$')

    while True:
        echo('Enter DNS CNAME prefix')
        if default:
            hint = 'default is ' + default
        else:
            hint = 'defaults to an auto-generated value'
        cname = prompt(hint)

        if not cname:
            return default
        if cname_rule.match(cname.lower()):
            return cname
        echo('CNAME must be 4 to 63 characters in length. It can'
             ' only contain letters, numbers, and hyphens. It can not '
             'start or end with a hyphen')
def update_upload_progress(progress):
    """
    Displays or updates a console progress bar

    :param progress: Accepts a float between 0 and 1.
                     Any int will be converted to a float.
                     A value under 0 represents a 'halt'.
                     A value at 1 or bigger represents 100%
                     Any other value, including NaN, is reported as an
                     error and drawn as 0%.
    """
    import math

    bar_length = 50  # Modify this to change the length of the progress bar
    status = ""
    if isinstance(progress, int):
        progress = float(progress)
    # NaN is a float, but it fails both range checks below and would make
    # round() raise ValueError, so it is handled like any unusable value.
    if not isinstance(progress, float) or math.isnan(progress):
        progress = 0
        status = "error: progress var must be float\r\n"
    if progress < 0:
        progress = 0
        status = "Halt...\r\n"
    if progress >= 1:
        progress = 1
        status = "Done...\r\n"

    filled = int(round(bar_length * progress))
    percent = int(round(progress * 100))
    # The leading carriage return redraws the bar over the previous one.
    text = "\rUploading: [{0}] {1}% {2}".format(
        "#" * filled + "-" * (bar_length - filled), percent, status)
    sys.stdout.write(text)
    sys.stdout.flush()
def get_boolean_response(text=None, default=True):
    """Ask a yes/no question and return the answer as a bool.

    :param text: optional question shown before the '(Y/n)' / '(y/N)' hint
    :param default: answer assumed when the user enters nothing
    :return: True for 'y'/'yes', False for 'n'/'no' (case-insensitive)
    :raises TypeError: when ``default`` is not a bool
    """
    if not isinstance(default, bool):
        raise TypeError(
            "get_boolean_response() named argument 'default' must "
            "be a bool, not {}".format(type(default).__name__)
        )

    if default:
        default_string, prompt_value_options = 'y', '(Y/n)'
    else:
        default_string, prompt_value_options = 'n', '(y/N)'

    prompt_message = (text + ' ' if text else '') + prompt_value_options

    response = get_input(prompt_message, default=default_string).lower()
    # Re-ask with just the option hint until the answer is recognisable.
    while response not in ('y', 'n', 'yes', 'no'):
        echo(strings['prompt.invalid'], strings['prompt.yes-or-no'])
        response = get_input(
            prompt_value_options, default=default_string).lower()

    return response in ('y', 'yes')
def get_event_streamer():
    """Return a new streamer suited to where standard output is going.

    :return: an ``EventStreamer`` when stdout is a terminal, otherwise a
        ``PipeStreamer``
    """
    streamer_cls = EventStreamer if sys.stdout.isatty() else PipeStreamer
    return streamer_cls()
class EventStreamer(object):
    """Prints events one per line while keeping a static prompt below them.

    Each event overwrites the prompt line (via a carriage return) and the
    prompt is then printed again without a trailing newline.
    """

    def __init__(self):
        # Prompt shown after an event when quitting is safe / not safe.
        self.prompt = strings['events.streamprompt']
        self.unsafe_prompt = strings['events.unsafestreamprompt']
        # Number of events streamed so far.
        self.eventcount = 0

    def _prompt_width(self):
        """Return the width needed to overwrite whichever prompt is shown."""
        # Either prompt may currently be on screen, so pad to the longer
        # one; padding to ``self.prompt`` alone leaves the tail of a longer
        # unsafe prompt visible after it is overwritten.
        return max(len(self.prompt), len(self.unsafe_prompt))

    def stream_event(self, message, safe_to_quit=True):
        """
        Streams an event so a prompt is displayed at the bottom of the stream

        :param safe_to_quit: this determines which static event prompt to show to the user
        :param message: message to be streamed
        """
        length = self._prompt_width()
        # '\r' returns to the start of the prompt line; the padded message
        # then overwrites the previously displayed prompt.
        echo('\r', message.ljust(length), sep='')
        if safe_to_quit:
            echo(self.prompt, end='')
        else:
            echo(self.unsafe_prompt, end='')
        sys.stdout.flush()
        self.eventcount += 1

    def end_stream(self):
        """
        Removes the displayed prompt from the screen
        """
        if self.eventcount < 1:
            # Nothing was streamed, so no prompt was ever drawn.
            return

        length = self._prompt_width() + 3  # Cover up "^C" character as well
        print_('\r'.ljust(length))
class PipeStreamer(EventStreamer):
    """Streamer used when output goes to a pipe.

    No actual "streaming" is wanted in that case, so events are written
    with standard printing and there is no prompt to maintain.
    """

    def stream_event(self, message, safe_to_quit=True):
        """Print ``message``; ``safe_to_quit`` is accepted but not used."""
        echo(message)

    def end_stream(self):
        """Do nothing: this streamer never draws a prompt."""
        return None