in gslib/ui_controller.py [0:0]
def Call(self, status_message, stream, cur_time=None):
  """Coordinates UI manager and calls appropriate function to handle message.

  The first message that needs a manager decides which one is created: a
  MetadataMessage creates a MetadataManager, anything else a DataManager.
  SeekAheadMessage and ProducerThreadMessage instances that arrive before a
  manager exists are buffered in self.early_estimation_messages and replayed
  into every manager this method creates.

  Args:
    status_message: Message to be processed. Could be None if UIThread cannot
                    retrieve message from status_queue.
    stream: Stream to print messages. Usually sys.stderr, but customizable
            for testing.
    cur_time: Message time. Used to determine if it is time to refresh
              output, or calculate throughput. Falls back to
              status_message.time when not provided (or falsy).
  """

  def _StartManager(manager_class, use_message_time=False):
    """Installs a new manager and replays the buffered estimation messages.

    Every manager is built from the same controller-level settings, so the
    construction lives in one place to keep the keyword set (including
    quiet_mode) identical on every path.

    Args:
      manager_class: DataManager or MetadataManager.
      use_message_time: If True, each buffered message is replayed with its
                        own timestamp; otherwise with the enclosing call's
                        current cur_time.
    """
    self.manager = manager_class(
        update_message_period=self.update_message_period,
        update_spinner_period=self.update_spinner_period,
        sliding_throughput_period=self.sliding_throughput_period,
        first_throughput_latency=self.first_throughput_latency,
        quiet_mode=self.quiet_mode,
        custom_time=self.custom_time,
        verbose=self.verbose,
        console_width=self.console_width)
    for estimation_message in self.early_estimation_messages:
      # cur_time is read from the enclosing scope at call time, i.e. after
      # Call has applied the status_message.time fallback.
      replay_time = estimation_message.time if use_message_time else cur_time
      self._HandleMessage(estimation_message, stream, cur_time=replay_time)

  if not isinstance(status_message, StatusMessage):
    if status_message == _ZERO_TASKS_TO_DO_ARGUMENT and not self.manager:
      # Create a manager to handle early estimation messages before returning.
      _StartManager(DataManager, use_message_time=True)
    # Anything that is not a StatusMessage carries nothing further to process.
    return

  if self.dump_status_message_fp:
    # TODO: Add Unicode support to string methods on message classes.
    # Currently, dump will fail with a UnicodeEncodeError if the message
    # class contains a Unicode attribute.
    self.dump_status_message_fp.write(str(status_message))
    self.dump_status_message_fp.write('\n')

  if not cur_time:
    cur_time = status_message.time

  if not self.manager:
    if isinstance(status_message, (SeekAheadMessage, ProducerThreadMessage)):
      # No manager yet: buffer the estimation until one is created.
      self.early_estimation_messages.append(status_message)
      return
    if isinstance(status_message, MetadataMessage):
      _StartManager(MetadataManager)
    else:
      _StartManager(DataManager)

  if not self.manager.CanHandleMessage(status_message):
    if not isinstance(status_message, (FileMessage, ProgressMessage)):
      # No need to handle this message.
      return
    # We have to create a DataManager to handle this data message. This is
    # to avoid a possible race condition where MetadataMessages are sent
    # before data messages. As such, this means that the DataManager has
    # priority, and whenever a data message is received, we ignore the
    # MetadataManager if one exists, and start a DataManager from scratch.
    # This can be done because we do not need any MetadataMessages to
    # properly handle a data operation. It could be useful to send the
    # early estimation messages, if those are available.
    _StartManager(DataManager)

  self._HandleMessage(status_message, stream, cur_time)