in gslib/gcs_json_media.py [0:0]
def GetConnectionClass(self):
  """Returns a connection class that overrides send.

  The returned class closes over this object's progress-tracking state so
  that httplib2 (which instantiates connections itself) can still report
  upload progress through our callback.

  Returns:
    UploadCallbackConnection, a subclass of
    httplib2.HTTPSConnectionWithTimeout whose send() reports bytes sent to
    the outer progress callback and whose putheader() corrects progress for
    gzip transport encoding.
  """
  # Copy instance state into locals so the nested class body and its methods
  # can close over them (the class is constructed by httplib2, not by us, so
  # we cannot pass this state through its constructor).
  outer_bytes_uploaded_container = self.bytes_uploaded_container
  outer_buffer_size = self.buffer_size
  outer_total_size = self.total_size
  outer_progress_callback = self.progress_callback
  outer_logger = self.logger
  outer_debug = self.debug

  class UploadCallbackConnection(httplib2.HTTPSConnectionWithTimeout):
    """Connection class override for uploads."""
    # Shared container tracking bytes already transferred (e.g. from a prior
    # resumable-upload attempt); read once in send() to seed progress.
    bytes_uploaded_container = outer_bytes_uploaded_container
    # After we instantiate this class, apitools will check with the server
    # to find out how many bytes remain for a resumable upload. This allows
    # us to update our progress once based on that number.
    processed_initial_bytes = False
    # Chunk size (bytes) used when streaming data in send().
    GCS_JSON_BUFFER_SIZE = outer_buffer_size
    # Lazily created ProgressCallbackWithTimeout; None until the first send()
    # when a progress callback was supplied.
    callback_processor = None
    size = outer_total_size
    # Per-request header state recorded by putheader(); used to derive
    # size_modifier, then reset (see putheader docstring).
    header_encoding = ''
    header_length = None
    header_range = None
    # Multiplier applied to sent byte counts in send() so that progress for
    # gzip transport-encoded uploads reflects uncompressed size.
    size_modifier = 1.0

    def __init__(self, *args, **kwargs):
      """Initializes the connection, forcing our SSL timeout value."""
      # Override whatever timeout the caller passed with gsutil's setting.
      kwargs['timeout'] = SSL_TIMEOUT_SEC
      httplib2.HTTPSConnectionWithTimeout.__init__(self, *args, **kwargs)

    # Override httplib.HTTPConnection._send_output for debug logging.
    # Because the distinction between headers and message body occurs
    # only in this httplib function, we can only differentiate them here.
    def _send_output(self, message_body=None, encode_chunked=False):
      r"""Send the currently buffered request and clear the buffer.

      Appends an extra \r\n to the buffer.

      Args:
        message_body: if specified, this is appended to the request.
        encode_chunked: unused here; accepted for signature compatibility
            with http.client.HTTPConnection._send_output.
      """
      # TODO: Presently, apitools will set http2lib2.debuglevel to 0
      # (no prints) or 4 (dump upload payload, httplib prints to stdout).
      # Refactor to allow our media-handling functions to handle
      # debuglevel == 4 and print messages to stderr.
      # The two empty byte strings produce the blank line that terminates
      # the header section once joined with b'\r\n' below.
      self._buffer.extend((b'', b''))
      if six.PY2:
        items = self._buffer
      else:
        # On PY3 the buffer may mix str and bytes; normalize to bytes
        # before joining.
        items = []
        for item in self._buffer:
          if isinstance(item, bytes):
            items.append(item)
          else:
            items.append(item.encode(UTF8))
      msg = b'\r\n'.join(items)
      # Everything buffered so far is headers/metadata, not payload; send()
      # uses this count to exclude it from progress reporting.
      num_metadata_bytes = len(msg)
      if outer_debug == DEBUGLEVEL_DUMP_REQUESTS and outer_logger:
        outer_logger.debug('send: %s' % msg)
      del self._buffer[:]
      # If msg and message_body are sent in a single send() call,
      # it will avoid performance problems caused by the interaction
      # between delayed ack and the Nagle algorithm.
      if isinstance(message_body, str):
        msg += message_body
        message_body = None
      self.send(msg, num_metadata_bytes=num_metadata_bytes)
      if message_body is not None:
        # message_body was not a string (i.e. it is a file) and
        # we must run the risk of Nagle
        self.send(message_body)

    def putheader(self, header, *values):
      """Overrides HTTPConnection.putheader.

      Send a request header line to the server. For example:
      h.putheader('Accept', 'text/html').

      This override records the content encoding, length, and range of the
      payload. For uploads where the content-range difference does not match
      the content-length, progress printing will under-report progress. These
      headers are used to calculate a multiplier to correct the progress.

      For example: the content-length for gzip transport encoded data
      represents the compressed size of the data while the content-range
      difference represents the uncompressed size. Dividing the
      content-range difference by the content-length gives the ratio to
      multiply the progress by to correctly report the relative progress.

      Args:
        header: The header.
        *values: A set of values for the header.
      """
      # NOTE(review): header names are compared lowercase; presumably
      # apitools always emits lowercase header names here — confirm.
      if header == 'content-encoding':
        value = ''.join([str(v) for v in values])
        self.header_encoding = value
        if outer_debug == DEBUGLEVEL_DUMP_REQUESTS and outer_logger:
          outer_logger.debug(
              'send: Using gzip transport encoding for the request.')
      elif header == 'content-length':
        try:
          value = int(''.join([str(v) for v in values]))
          self.header_length = value
        except ValueError:
          # Non-numeric length; leave header_length unset.
          pass
      elif header == 'content-range':
        try:
          # There are 3 valid header formats:
          # '*/%d', '%d-%d/*', and '%d-%d/%d'
          value = ''.join([str(v) for v in values])
          ranges = DECIMAL_REGEX().findall(value)
          # If there are 2 or more range values, they will always
          # correspond to the start and end ranges in the header.
          if len(ranges) > 1:
            # Subtract the end position from the start position.
            self.header_range = (int(ranges[1]) - int(ranges[0])) + 1
        except ValueError:
          pass
      # If the content header is gzip, and a range and length are set,
      # update the modifier.
      if (self.header_encoding == 'gzip' and self.header_length and
          self.header_range):
        # Update the modifier
        self.size_modifier = self.header_range / float(self.header_length)
        # Reset the headers
        self.header_encoding = ''
        self.header_length = None
        self.header_range = None
        # Log debug information to catch in tests.
        if outer_debug == DEBUGLEVEL_DUMP_REQUESTS and outer_logger:
          outer_logger.debug('send: Setting progress modifier to %s.' %
                             (self.size_modifier))
      # Propagate header values.
      http_client.HTTPSConnection.putheader(self, header, *values)

    def send(self, data, num_metadata_bytes=0):
      """Overrides HTTPConnection.send.

      Streams data to the socket in GCS_JSON_BUFFER_SIZE chunks and reports
      progress (adjusted by size_modifier and excluding metadata bytes) to
      the callback processor.

      Args:
        data: string or file-like object (implements read()) of data to send.
        num_metadata_bytes: number of bytes that consist of metadata
            (headers, etc.) not representing the data being uploaded.
      """
      if not self.processed_initial_bytes:
        # First send for this connection: seed the progress callback with
        # bytes already transferred (e.g. by a prior resumable attempt).
        self.processed_initial_bytes = True
        if outer_progress_callback:
          self.callback_processor = ProgressCallbackWithTimeout(
              outer_total_size, outer_progress_callback)
          self.callback_processor.Progress(
              self.bytes_uploaded_container.bytes_transferred)
      # httplib.HTTPConnection.send accepts either a string or a file-like
      # object (anything that implements read()).
      if isinstance(data, six.text_type):
        full_buffer = cStringIO(data)
      elif isinstance(data, six.binary_type):
        full_buffer = six.BytesIO(data)
      else:
        full_buffer = data
      partial_buffer = full_buffer.read(self.GCS_JSON_BUFFER_SIZE)
      while partial_buffer:
        if six.PY2:
          httplib2.HTTPSConnectionWithTimeout.send(self, partial_buffer)
        else:
          # PY3 sockets require bytes; encode any str chunk before sending.
          if isinstance(partial_buffer, bytes):
            httplib2.HTTPSConnectionWithTimeout.send(self, partial_buffer)
          else:
            httplib2.HTTPSConnectionWithTimeout.send(
                self, partial_buffer.encode(UTF8))
        sent_data_bytes = len(partial_buffer)
        if num_metadata_bytes:
          # Deduct metadata (header) bytes from the payload count so that
          # progress reflects only uploaded object data.
          if num_metadata_bytes <= sent_data_bytes:
            sent_data_bytes -= num_metadata_bytes
            num_metadata_bytes = 0
          else:
            num_metadata_bytes -= sent_data_bytes
            sent_data_bytes = 0
        if self.callback_processor:
          # Modify the sent data bytes by the size modifier. These are
          # stored as floats, so the result should be floored.
          sent_data_bytes = int(sent_data_bytes * self.size_modifier)
          # TODO: We can't differentiate the multipart upload
          # metadata in the request body from the actual upload bytes, so we
          # will actually report slightly more bytes than desired to the
          # callback handler. Get the number of multipart upload metadata
          # bytes from apitools and subtract from sent_data_bytes.
          self.callback_processor.Progress(sent_data_bytes)
        partial_buffer = full_buffer.read(self.GCS_JSON_BUFFER_SIZE)

  return UploadCallbackConnection