in migration/utils/emr_helper.py [0:0]
def sign_request(method, service, host, region, canonical_uri, target, raw_data):
# Sign the request using SigV4
access_key, secret_key, session_token = obtain_credential()
# Create a datetime object for signing
t = datetime.datetime.now(datetime.UTC)
amzdate = t.strftime('%Y%m%dT%H%M%SZ')
datestamp = t.strftime('%Y%m%d')
# Create the canonical request
canonical_querystring = ''
payload_hash = hashlib.sha256(raw_data.encode('utf-8')).hexdigest()
canonical_headers = ('content-type:' + 'application/x-amz-json-1.1' + '\n'
+ 'host:' + host + '\n' \
+ 'x-amz-date:' + amzdate + '\n'
+ 'x-amz-target:' + target + '\n')
signed_headers = 'content-type;host;x-amz-date;x-amz-target'
canonical_request = (method + '\n' + canonical_uri + '\n' + canonical_querystring + '\n'
+ canonical_headers + '\n' + signed_headers + '\n' + payload_hash)
# Create the string to sign
algorithm = 'AWS4-HMAC-SHA256'
credential_scope = datestamp + '/' + region + '/' + service + '/' + 'aws4_request'
string_to_sign = (algorithm + '\n' + amzdate + '\n' + credential_scope + '\n' +
hashlib.sha256(canonical_request.encode('utf-8')).hexdigest())
# Sign the string
signing_key = getSignatureKey(secret_key, datestamp, region, service)
signature = hmac.new(signing_key, (string_to_sign).encode('utf-8'), hashlib.sha256).hexdigest()
# Add signing information to the request
authorization_header = (algorithm + ' ' + 'Credential=' + access_key + '/' + credential_scope + ', ' +
'SignedHeaders=' + signed_headers + ', ' + 'Signature=' + signature)
headers = {
'Authorization': authorization_header,
'Content-Type': 'application/x-amz-json-1.1',
'X-Amz-Date': amzdate,
'X-Amz-Target': target
}
# If session_token is not None, add it to the headers
if session_token is not None:
headers['X-Amz-Security-Token'] = session_token
else:
print("Session token is None")
return headers