in alibabacloud_oss_v2/_client.py [0:0]
def build_request_context(self, op_input: OperationInput, options: _Options, inner: _InnerOptions
) -> SigningContext:
"""build request context
"""
# host & path
url = _build_url(op_input, options)
# queries
if op_input.parameters is not None:
query = urlencode(op_input.parameters, quote_via=quote)
if len(query) > 0:
url = url + "?" + query
# build http request
request = HttpRequest(method=op_input.method, url=url)
# headers
request.headers.update(op_input.headers or {})
request.headers.update({'User-Agent': inner.user_agent})
# body
body = op_input.body or b''
# body tracker
if op_input.op_metadata is not None:
tracker = op_input.op_metadata.get("opm-request-body-tracker", None)
if tracker is not None:
writers = []
for t in tracker:
if hasattr(t, 'write'):
writers.append(t)
if len(writers) > 0:
body = io_utils.TeeIterator.from_source(body, writers)
request.body = body
# signing context
context = SigningContext(
product=options.product,
region=options.region,
bucket=op_input.bucket,
key=op_input.key,
request=request,
)
if utils.safety_str(options.auth_method) == 'query':
context.auth_method_query = True
oss_date = request.headers.get('x-oss-date', None)
if oss_date is not None:
context.signing_time = serde.deserialize_httptime(oss_date)
if (expiration_time := op_input.op_metadata.get('expiration_time', None)) is not None:
context.expiration_time = expiration_time
context.sub_resource = op_input.op_metadata.get("sub-resource", [])
return context