unittests/common.py (366 lines of code) (raw):
# -*- coding: utf-8 -*-
import random
import string
import unittest
import tempfile
import os
import io
import functools
import re
import xml
from xml.dom import minidom
import shutil
import oss2
# Request-payload kinds. read_data() accepts DT_BYTES and DT_FILE and raises
# RuntimeError for any other value.
DT_NONE = 0
DT_BYTES = 1
DT_FILE = 2
# Number of bytes requested per read() call by read_file().
CHUNK_SIZE = 8192
# Bucket name handed to the oss2 bucket objects built by bucket().
BUCKET_NAME = 'ming-oss-share'
def random_string(n):
    """Return a string made of ``n`` randomly chosen lowercase ASCII letters."""
    alphabet = string.ascii_lowercase
    letters = [random.choice(alphabet) for _ in range(n)]
    return ''.join(letters)
def random_bytes(n):
    """Return ``random_string(n)`` converted with ``oss2.to_bytes``."""
    text = random_string(n)
    return oss2.to_bytes(text)
def bucket(crypto_provider=None):
    """Build a bucket object for BUCKET_NAME using placeholder credentials.

    Returns an ``oss2.CryptoBucket`` configured with ``crypto_provider`` when
    one is given (truthy); otherwise returns a plain ``oss2.Bucket``.
    """
    auth = oss2.Auth('fake-access-key-id', 'fake-access-key-secret')
    endpoint = 'http://oss-cn-hangzhou.aliyuncs.com'
    if not crypto_provider:
        return oss2.Bucket(auth, endpoint, BUCKET_NAME)
    return oss2.CryptoBucket(auth, endpoint, BUCKET_NAME,
                             crypto_provider=crypto_provider)
def service():
    """Build an ``oss2.Service`` using placeholder credentials."""
    credentials = oss2.Auth('fake-access-key-id', 'fake-access-key-secret')
    endpoint = 'http://oss-cn-hangzhou.aliyuncs.com'
    return oss2.Service(credentials, endpoint)
class RequestInfo(object):
    """Record of one mocked exchange, filled in by the ``do4*`` helpers.

    Attributes (all ``None`` until a helper assigns them):
        req:  the request object handed to the mocked ``do_request``.
        data: the request payload captured from ``req.data``.
        resp: the mock response that was returned.
        size: length of the payload, or ``None`` when it has no ``len()``.
    """

    def __init__(self):
        # Every slot starts out empty; the helpers assign them later.
        for slot in ('req', 'data', 'resp', 'size'):
            setattr(self, slot, None)
# Last-Modified header value used by r4head() for mocked HEAD/GET responses.
MTIME_STRING = 'Fri, 11 Dec 2015 13:01:41 GMT'
# The same instant as MTIME_STRING, as a Unix timestamp in seconds.
MTIME = 1449838901
# Value of the x-oss-request-id header on the mocked responses.
REQUEST_ID = '566AB62EB06147681C283D73'
# ETAG is the bare value; RAW_ETAG is the same value wrapped in double quotes,
# which is the form placed in the ETag response header.
ETAG = '7AE1A589ED6B161CAD94ACDB98206DA6'
RAW_ETAG = '"' + ETAG + '"'
def merge_headers(dst, src):
    """Copy every item of ``src`` into ``dst`` in place, overwriting clashes.

    A falsy ``src`` (``None`` or an empty mapping) leaves ``dst`` untouched.
    Always returns ``None``.
    """
    for name, value in (src or {}).items():
        dst[name] = value
def r4delete(in_status=204, in_headers=None):
    """Build the mock response for a delete request.

    The response has an empty body and status ``in_status``; entries of
    ``in_headers`` (if any) override or extend the default headers.
    """
    defaults = {
        'Server': 'AliyunOSS',
        'Date': 'Fri, 11 Dec 2015 11:40:31 GMT',
        'Content-Length': '0',
        'Connection': 'keep-alive',
        'x-oss-request-id': REQUEST_ID
    }
    headers = oss2.CaseInsensitiveDict(defaults)
    merge_headers(headers, in_headers)
    return MockResponse(in_status, headers, b'')
def r4head(length, in_status=200, in_headers=None):
    """Build the mock response for a HEAD request.

    ``length`` is reported (as a string) in ``Content-Length``; the body itself
    is empty. Entries of ``in_headers`` (if any) override or extend the
    default headers.
    """
    defaults = {
        'Server': 'AliyunOSS',
        'Date': 'Fri, 11 Dec 2015 11:40:31 GMT',
        'Content-Type': 'application/javascript',
        'Content-Length': str(length),
        'Connection': 'keep-alive',
        'Vary': 'Accept-Encoding',
        'x-oss-request-id': REQUEST_ID,
        'Accept-Ranges': 'bytes',
        'ETag': RAW_ETAG,
        'Last-Modified': MTIME_STRING,
        'x-oss-object-type': 'Normal'
    }
    headers = oss2.CaseInsensitiveDict(defaults)
    merge_headers(headers, in_headers)
    return MockResponse(in_status, headers, b'')
def r4get(body, in_status=200, in_headers=None):
    """Build the mock response for a GET request carrying ``body``.

    Starts from the HEAD response for ``len(body)`` and then attaches ``body``
    to it unchanged.
    """
    response = r4head(len(body), in_status, in_headers)
    response.body = body
    return response
def r4put(in_status=200, in_headers=None):
    """Build the mock response for a PUT request.

    The response has an empty body and status ``in_status``; entries of
    ``in_headers`` (if any) override or extend the default headers.
    """
    defaults = {
        'Server': 'AliyunOSS',
        'Date': 'Fri, 11 Dec 2015 11:40:30 GMT',
        'Content-Length': '0',
        'Connection': 'keep-alive',
        'x-oss-request-id': REQUEST_ID
    }
    headers = oss2.CaseInsensitiveDict(defaults)
    merge_headers(headers, in_headers)
    return MockResponse(in_status, headers, b'')
def r4copy():
    """Build the mock response for a copy-object request.

    The body is a ``CopyObjectResult`` XML document and the headers carry the
    same ETag. Returns a ``MockResponse`` with status 200.
    """
    # RAW_ETAG already includes the surrounding double quotes, so the template
    # must not add a second pair around the placeholder.
    body = '''
<?xml version="1.0" encoding="UTF-8"?>
<CopyObjectResult>
<ETag>{0}</ETag>
<LastModified>2015-12-12T00:36:29.000Z</LastModified>
</CopyObjectResult>
'''.format(RAW_ETAG)
    headers = oss2.CaseInsensitiveDict({
        'Server': 'AliyunOSS',
        'Date': 'Fri, 11 Dec 2015 11:40:30 GMT',
        # Header values are strings, matching the other r4* helpers.
        'Content-Length': str(len(body)),
        'Connection': 'keep-alive',
        'x-oss-request-id': REQUEST_ID,
        'ETag': RAW_ETAG
    })
    return MockResponse(200, headers, body)
def do4body(req, timeout,
            req_info=None,
            data_type=DT_BYTES,
            status=200,
            body=None,
            content_type=None):
    """Stand-in for ``do_request`` that answers with ``body``.

    Builds the response via ``r4get`` (adding a ``Content-Type`` header when
    ``content_type`` is given). When ``req_info`` is supplied, records the
    request, its payload, the payload size and the response on it.
    ``timeout`` is accepted for signature compatibility and not used.
    """
    headers = {'Content-Type': content_type} if content_type else None
    resp = r4get(body, in_status=status, in_headers=headers)
    if not req_info:
        return resp

    req_info.req = req
    req_info.size = get_length(req.data)
    req_info.data = read_data(req.data, data_type)
    req_info.resp = resp
    return resp
class NonlocalObject(object):
    """Mutable single-value holder.

    Lets a nested function update state owned by its enclosing function by
    assigning to ``.var``.
    """

    def __init__(self, value):
        setattr(self, 'var', value)
def make_do4body(req_infos=None, body_list=None):
    """Return a ``do_request`` stand-in that replays ``body_list`` in order.

    The k-th call of the returned function answers with ``body_list[k]`` and
    records the exchange on ``req_infos[k]``. When ``req_infos`` is ``None``,
    nothing is recorded.
    """
    if req_infos is None:
        req_infos = [None] * len(body_list)

    # One-element list used as a mutable counter shared with the closure.
    calls_made = [0]

    def do4body_func(req, timeout):
        index = calls_made[0]
        response = do4body(req, timeout,
                           req_info=req_infos[index],
                           body=body_list[index])
        # Advance only after a successful call, as the lookup above may raise.
        calls_made[0] = index + 1
        return response

    return do4body_func
def is_string_type(obj):
    """Return True when ``obj`` is a text or byte string.

    On Python 2 (per ``oss2.compat.is_py2``) ``unicode`` is accepted as well.
    """
    # The conditional expression is lazy, so ``unicode`` is only looked up on
    # Python 2, where it exists.
    accepted = (str, bytes, unicode) if oss2.compat.is_py2 else (str, bytes)
    return isinstance(obj, accepted)
def do4response(req, timeout, req_info=None, payload=None):
    """Stand-in for ``do_request`` that answers with the raw text ``payload``.

    When ``req_info`` is supplied, records the request on it together with the
    request payload normalised as follows: ``None`` becomes ``b''``, strings
    are converted with ``oss2.to_bytes``, and anything else is read as a file
    object. ``timeout`` is accepted for signature compatibility and not used.
    """
    if req_info:
        req_info.req = req
        sent = req.data
        if sent is None:
            req_info.data, req_info.size = b'', 0
        elif is_string_type(sent):
            req_info.data = oss2.to_bytes(sent)
            req_info.size = len(req_info.data)
        else:
            req_info.data = read_data(sent, DT_FILE)
            req_info.size = get_length(sent)

    return MockResponse2(payload)
def do4put(req, timeout, in_headers=None, req_info=None, data_type=DT_BYTES):
    """Stand-in for ``do_request`` that answers a PUT.

    Returns the ``r4put`` response (with ``in_headers`` merged in). When
    ``req_info`` is supplied, records the request, the response, the payload
    size and the payload on it. ``timeout`` is not used.
    """
    resp = r4put(in_headers=in_headers)
    if not req_info:
        return resp

    req_info.req = req
    req_info.resp = resp
    req_info.size = get_length(req.data)
    req_info.data = read_data(req.data, data_type)
    return resp
def do4delete(req, timeout, in_headers=None, req_info=None):
    """Stand-in for ``do_request`` that answers a DELETE with status 204.

    When ``req_info`` is supplied, records the request and the response on it.
    ``timeout`` is not used.
    """
    resp = r4delete(in_status=204, in_headers=in_headers)
    if not req_info:
        return resp

    req_info.req = req
    req_info.resp = resp
    return resp
def do4put_object(req, timeout, req_info=None, data_type=DT_BYTES):
    """Stand-in for ``do_request`` that answers a put-object call.

    Delegates to ``do4put`` with a fixed ``ETag`` response header.
    """
    etag_header = {'ETag': '"E5831D5EBC7AAF5D6C0D20259FE141D2"'}
    return do4put(req, timeout,
                  in_headers=etag_header,
                  req_info=req_info,
                  data_type=data_type)
def do4copy(req, timeout, req_info=None):
    """Stand-in for ``do_request`` that answers a copy-object call.

    Returns the ``r4copy`` response. When ``req_info`` is supplied, records the
    request, the response, the payload size and the payload (treated as
    bytes) on it. ``timeout`` is not used.
    """
    resp = r4copy()
    if not req_info:
        return resp

    req_info.req = req
    req_info.resp = resp
    req_info.size = get_length(req.data)
    req_info.data = read_data(req.data, DT_BYTES)
    return resp
def read_file(fileobj):
    """Read ``fileobj`` to exhaustion and return everything as one bytes value.

    Data is pulled in ``CHUNK_SIZE`` pieces until ``read`` returns an empty
    (falsy) result. An already-exhausted file yields ``b''``.
    """
    # Collect the pieces and join once; repeated ``bytes +=`` copies the
    # accumulated data on every iteration.
    chunks = []
    while True:
        chunk = fileobj.read(CHUNK_SIZE)
        if not chunk:
            return b''.join(chunks)
        chunks.append(chunk)
def read_data(data, data_type):
    """Return the payload ``data`` as bytes according to ``data_type``.

    ``DT_BYTES`` returns ``data`` unchanged; ``DT_FILE`` reads it as a file
    object via ``read_file``.

    Raises:
        RuntimeError: for any other ``data_type``.
    """
    if data_type == DT_FILE:
        return read_file(data)
    if data_type != DT_BYTES:
        raise RuntimeError('wrong data type: {0}'.format(data_type))
    return data
def get_length(data):
    """Return ``len(data)``, or ``None`` when ``data`` does not support ``len``."""
    try:
        size = len(data)
    except TypeError:
        size = None
    return size
def calc_crc(data):
    """Return the ``crc`` value of an ``oss2.utils.Crc64`` fed with ``data``."""
    checksum = oss2.utils.Crc64()
    checksum.update(data)
    return checksum.crc
class MockSocket(object):
    """Minimal socket look-alike whose ``makefile`` serves a fixed payload."""

    def __init__(self, payload):
        # A single in-memory stream is created up front and shared by every
        # makefile() call.
        stream = io.BytesIO(payload)
        self._file = stream

    def makefile(self, *args, **kwargs):
        """Return the shared in-memory stream; all arguments are ignored."""
        return self._file
def mock_response(do_request, payload):
    """Configure the mock ``do_request`` to answer with ``payload``.

    Sets ``do_request.side_effect`` to ``do4response`` bound to a fresh
    ``RequestInfo`` and returns that ``RequestInfo`` so the caller can inspect
    what was sent.
    """
    info = RequestInfo()
    responder = functools.partial(do4response, req_info=info, payload=payload)
    do_request.auto_spec = True
    do_request.side_effect = responder
    return info
class MockResponse(object):
    """In-memory response object built from explicit status/headers/body.

    Supports ``read`` and iteration over the body. ``offset`` tracks how much
    of ``body`` has been consumed so far.
    """

    def __init__(self, status, headers, body):
        self.status = status
        self.headers = oss2.CaseInsensitiveDict(headers)
        self.body = oss2.to_bytes(body)
        # Look the id up on the case-insensitive copy so the header is found
        # regardless of the key's case in the mapping that was passed in.
        self.request_id = self.headers.get('x-oss-request-id', '')
        self.offset = 0

    def read(self, amt=None):
        """Return up to ``amt`` units of the remaining body.

        ``None`` or a negative ``amt`` returns everything that is left. At end
        of body an empty value of the same type as ``body`` is returned.
        """
        total = len(self.body)
        if self.offset >= total:
            # An empty slice keeps the return type equal to the body's type
            # (``body`` may have been reassigned after construction).
            return self.body[0:0]

        if amt is None or amt < 0:
            end = total
        else:
            end = min(total, self.offset + amt)

        content = self.body[self.offset:end]
        self.offset = end
        return content

    def __iter__(self):
        return self

    def __next__(self):
        return self.next()

    def next(self):
        """Return the next chunk of the body; stop once it is exhausted."""
        content = self.read(8192)
        if not content:
            # Without this, iterating would yield empty chunks forever.
            raise StopIteration
        return content
def query_to_params(query):
    """Parse a raw ``a=1&b=2`` query string into a dict.

    Each ``&``-separated item is split at its first ``=``; an item without
    ``=`` maps to the empty string. No URL-decoding is performed. When a key
    repeats, the last occurrence wins.
    """
    params = {}
    for item in query.split('&'):
        name, _, value = item.partition('=')
        params[name] = value
    return params
def head_fields_to_headers(head_fields):
    """Turn ``Name: value`` lines into an ``oss2.CaseInsensitiveDict``.

    Each line is split at its first ``:`` and both sides are stripped of
    surrounding whitespace; a line without ``:`` maps to the empty string.
    """
    headers = oss2.CaseInsensitiveDict()
    for line in head_fields:
        name, _, value = line.partition(':')
        headers[name.strip()] = value.strip()
    return headers
class MockRequest(object):
    """Expected request parsed from HTTP-like text.

    The text is a request line and header lines separated by ``\\n``, then an
    optional body after the first blank line (``\\n\\n``).

    Attributes:
        method:  first token of the request line.
        url:     ``'http://'`` + the ``Host`` header + the path.
        params:  dict parsed from the query string (empty when there is none).
        headers: case-insensitive dict of the header lines.
        body:    bytes after the blank line, or ``b''``.

    Raises:
        KeyError: when the text has no ``Host`` header.
    """

    def __init__(self, request_text):
        # Use a separator of the same type as the input (bytes vs. text).
        separator = b'\n\n' if isinstance(request_text, bytes) else '\n\n'
        fields = request_text.split(separator, 1)

        head_fields = oss2.to_string(fields[0]).split('\n')
        request_line_fields = head_fields[0].split()

        # Split at the FIRST '?' only, so a query whose value itself contains
        # '?' is still parsed instead of being dropped.
        path, has_query, query = request_line_fields[1].partition('?')
        self.params = query_to_params(query) if has_query else {}

        self.body = oss2.to_bytes(fields[1]) if len(fields) == 2 else b''

        self.method = request_line_fields[0]
        self.headers = head_fields_to_headers(head_fields[1:])
        self.url = 'http://' + self.headers['host'] + path
class MockResponse2(object):
    """Response object parsed from HTTP-like text.

    The text is a status line and header lines separated by ``\\n``, then an
    optional body after the first blank line (``\\n\\n``).

    Attributes:
        status:     integer taken from the second field of the status line.
        headers:    case-insensitive dict of the header lines.
        request_id: value of ``x-oss-request-id`` or ``''``.
        body:       bytes after the blank line, or ``b''``.
    """

    def __init__(self, response_text):
        # Use a separator of the same type as the input (bytes vs. text).
        if isinstance(response_text, bytes):
            fields = re.split(b'\n\n', response_text, 1)
        else:
            fields = re.split('\n\n', response_text, 1)

        head_fields = re.split('\n', oss2.to_string(fields[0]))
        # At most three parts, so the third keeps any spaces it contains.
        response_line_fields = head_fields[0].split(' ', 2)

        self.status = int(response_line_fields[1])
        self.headers = head_fields_to_headers(head_fields[1:])
        self.request_id = self.headers.get('x-oss-request-id', '')

        if len(fields) == 2:
            self.body = oss2.to_bytes(fields[1])
        else:
            self.body = b''

        self.__io = io.BytesIO(self.body)

    def read(self, amt=None):
        """Return up to ``amt`` bytes of the remaining body (all if ``None``)."""
        return self.__io.read(amt)

    def __iter__(self):
        return self

    def __next__(self):
        return self.next()

    def next(self):
        """Return the next chunk of the body; stop once it is exhausted."""
        content = self.read(8192)
        if not content:
            # Without this, iterating would yield empty chunks forever.
            raise StopIteration
        return content
def _is_xml(content):
try:
minidom.parseString(content)
except xml.parsers.expat.ExpatError:
return False
else:
return True
class OssTestCase(unittest.TestCase):
    """Base test case with shared fixtures and assertions for the mocked tests.

    It snapshots a few ``oss2.defaults`` values at construction and restores
    them before every test. It also tracks temporary files for removal in
    ``tearDown`` and provides request/XML comparison helpers.
    """

    def __init__(self, *args, **kwargs):
        super(OssTestCase, self).__init__(*args, **kwargs)
        # Snapshot of the defaults that setUp() restores before each test.
        self.default_connect_timeout = oss2.defaults.connect_timeout
        self.default_request_retries = oss2.defaults.request_retries
        self.default_multipart_threshold = oss2.defaults.multipart_threshold
        self.default_part_size = oss2.defaults.part_size

    def setUp(self):
        """Restore the snapshotted defaults and reset per-test state."""
        oss2.defaults.connect_timeout = self.default_connect_timeout
        oss2.defaults.request_retries = self.default_request_retries
        oss2.defaults.multipart_threshold = self.default_multipart_threshold
        oss2.defaults.part_size = self.default_part_size

        # Last value seen by progress_callback(); -1 so that 0 counts as progress.
        self.previous = -1
        self.temp_files = []

    def tearDown(self):
        """Remove every registered temporary file that exists."""
        for temp_file in self.temp_files:
            # tempname() registers names without creating them, so a missing
            # file is expected and must not abort the remaining cleanup.
            if os.path.exists(temp_file):
                os.remove(temp_file)

    def tempname(self):
        """Return a random 16-letter file name registered for cleanup.

        The file itself is not created here.
        """
        random_name = random_string(16)
        self.temp_files.append(random_name)
        return random_name

    def make_tempfile(self, content):
        """Create a temporary file holding ``content`` and return its path.

        The file is registered for removal in ``tearDown``.
        """
        fd, pathname = tempfile.mkstemp(suffix='test-upload')
        # Register before writing so the file is cleaned up even if the write
        # fails; the ``with`` block guarantees the descriptor is closed.
        self.temp_files.append(pathname)
        with os.fdopen(fd, 'wb') as handle:
            handle.write(oss2.to_bytes(content))
        return pathname

    def progress_callback(self, bytes_consumed, total_bytes):
        """Progress hook asserting that progress is bounded and strictly increasing."""
        self.assertLessEqual(bytes_consumed, total_bytes)
        self.assertGreater(bytes_consumed, self.previous)
        self.previous = bytes_consumed

    def assertSortedListEqual(self, a, b, key=None):
        """Assert that ``a`` and ``b`` are equal once both are sorted by ``key``."""
        self.assertEqual(sorted(a, key=key), sorted(b, key=key))

    def assertXmlEqual(self, a, b):
        """Assert two XML documents are equal, ignoring CR/LF characters.

        Both sides are re-serialised through ``minidom`` before comparison.
        """
        # Convert to bytes BEFORE translate(): the two-argument
        # translate(None, delete) form exists on bytes only, not on text.
        a = oss2.to_bytes(a).translate(None, b'\r\n')
        b = oss2.to_bytes(b).translate(None, b'\r\n')

        normalized_a = minidom.parseString(a).toxml(encoding='utf-8')
        normalized_b = minidom.parseString(b).toxml(encoding='utf-8')

        self.assertEqual(normalized_a, normalized_b)

    def assertUrlWithKey(self, url, key):
        """Assert ``url`` is the object URL for ``key`` in bucket ``my-bucket``."""
        # NOTE(review): the bucket name is hard-coded here and differs from
        # BUCKET_NAME used by bucket() - confirm against callers.
        self.assertEqual('http://my-bucket.oss-cn-hangzhou.aliyuncs.com/' + key, url)

    def assertRequest(self, req_info, request_text):
        """Assert the recorded request matches the expected ``request_text``.

        It compares method, URL, every expected query parameter, the
        ``Content-Type`` header when expected, and every expected header whose
        name starts with ``x-oss-``. The body is compared as XML when the
        expected body parses as XML, and byte-for-byte otherwise.
        """
        req = req_info.req
        expected = MockRequest(request_text)

        self.assertEqual(req.method, expected.method)
        self.assertEqual(req.url, expected.url)

        for k, v in expected.params.items():
            self.assertTrue(k in req.params)
            self.assertEqual(req.params[k], v)

        if 'Content-Type' in expected.headers:
            self.assertEqual(req.headers.get('Content-Type'), expected.headers['Content-Type'])

        for k, v in expected.headers.items():
            if k.startswith('x-oss-'):
                self.assertEqual(req.headers.get(k), v)

        if _is_xml(expected.body):
            self.assertXmlEqual(req_info.data, expected.body)
        else:
            # Length first, so a size mismatch is reported before a large diff.
            self.assertEqual(len(req_info.data), len(expected.body))
            self.assertEqual(req_info.data, expected.body)