in libcloud/storage/drivers/atmos.py [0:0]
def upload_object_via_stream(self, iterator, container, object_name, extra=None, headers=None):
if isinstance(iterator, file):
iterator = iter(iterator)
extra_headers = headers or {}
data_hash = hashlib.md5() # nosec
generator = read_in_chunks(iterator, CHUNK_SIZE, True)
bytes_transferred = 0
try:
chunk = next(generator)
except StopIteration:
chunk = ""
path = self._namespace_path(container.name + "/" + object_name)
method = "PUT"
if extra is not None:
content_type = extra.get("content_type", None)
else:
content_type = None
content_type = self._determine_content_type(content_type, object_name)
try:
self.connection.request(path + "?metadata/system")
except AtmosError as e:
if e.code != 1003:
raise
method = "POST"
while True:
end = bytes_transferred + len(chunk) - 1
data_hash.update(b(chunk))
headers = dict(extra_headers)
headers.update(
{
"x-emc-meta": "md5=" + data_hash.hexdigest(), # nosec
"Content-Type": content_type,
}
)
if len(chunk) > 0 and bytes_transferred > 0:
headers["Range"] = "Bytes=%d-%d" % (bytes_transferred, end)
method = "PUT"
result = self.connection.request(path, method=method, data=chunk, headers=headers)
bytes_transferred += len(chunk)
try:
chunk = next(generator)
except StopIteration:
break
if len(chunk) == 0:
break
data_hash = data_hash.hexdigest() # nosec
if extra is None:
meta_data = {}
else:
meta_data = extra.get("meta_data", {})
meta_data["md5"] = data_hash
user_meta = ", ".join([k + "=" + str(v) for k, v in list(meta_data.items())])
self.connection.request(
path + "?metadata/user", method="POST", headers={"x-emc-meta": user_meta}
)
result = self.connection.request(path + "?metadata/system")
meta = self._emc_meta(result)
extra = {
"object_id": meta["objectid"],
"meta_data": meta_data,
}
return Object(object_name, bytes_transferred, data_hash, extra, meta_data, container, self)