storage/signed_urls/generate_signed_urls.py (117 lines of code) (raw):

# Copyright 2018 Google, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""This application demonstrates how to construct a Signed URL for objects in
Google Cloud Storage.

For more information, see the README.md under /storage and the documentation
at https://cloud.google.com/storage/docs/access-control/signing-urls-manually.
"""

import argparse

# [START storage_signed_url_all]
import binascii
import collections
import datetime
import hashlib
import sys
from urllib.parse import quote

# pip install google-auth
from google.oauth2 import service_account

# pip install six
import six


def _canonicalize_headers(headers, host):
    """Return ``(canonical_headers, signed_headers)`` for the signed request.

    Header names are lower-cased; values are trimmed and have internal runs
    of whitespace collapsed to one space, but keep their case. Names that
    differ only in case are merged into one comma-separated entry. ``host``
    always replaces any caller-supplied host header, because the URL built
    by the caller targets exactly that host.
    """
    merged = collections.defaultdict(list)
    for name, value in headers.items():
        # str.split() with no argument drops leading/trailing whitespace and
        # splits on any whitespace run (including newlines).
        merged[str(name).strip().lower()].append(" ".join(str(value).split()))
    merged["host"] = [host]

    # Sort on the lower-cased names so the order matches the canonical form.
    ordered = sorted((name, ",".join(values)) for name, values in merged.items())
    canonical_headers = "".join(f"{name}:{value}\n" for name, value in ordered)
    signed_headers = ";".join(name for name, _ in ordered)
    return canonical_headers, signed_headers


def _canonicalize_query(parameters):
    """Return the percent-encoded query string, sorted by parameter name."""
    return "&".join(
        "{}={}".format(quote(str(name), safe=""), quote(str(value), safe=""))
        for name, value in sorted(parameters.items())
    )


def generate_signed_url(
    service_account_file,
    bucket_name,
    object_name,
    subresource=None,
    expiration=604800,
    http_method="GET",
    query_parameters=None,
    headers=None,
):
    """Build a V4 signed URL for an object in a Cloud Storage bucket.

    Args:
        service_account_file: Path to the service account key file used to
            sign the request.
        bucket_name: Name of the bucket; used as a virtual-hosted-style
            prefix of ``storage.googleapis.com``.
        object_name: Name of the object (str or bytes). It is percent-encoded
            except for ``/`` and ``~``.
        subresource: Optional subresource (e.g. ``"acl"``) added as an empty
            query parameter.
        expiration: Lifetime of the URL in seconds; at most 604800 (7 days).
        http_method: HTTP method the URL will be used with.
        query_parameters: Optional extra query parameters to sign. The
            mapping passed in is not modified.
        headers: Optional extra headers to sign. The mapping passed in is
            not modified; whoever uses the URL must send these headers.

    Returns:
        The signed URL as a string.

    Raises:
        SystemExit: If ``expiration`` exceeds 604800 seconds (exit code 1).
    """
    if expiration > 604800:
        print("Expiration Time can't be longer than 604800 seconds (7 days).")
        sys.exit(1)

    escaped_object_name = quote(six.ensure_binary(object_name), safe=b"/~")
    canonical_uri = f"/{escaped_object_name}"

    datetime_now = datetime.datetime.now(tz=datetime.timezone.utc)
    request_timestamp = datetime_now.strftime("%Y%m%dT%H%M%SZ")
    datestamp = datetime_now.strftime("%Y%m%d")

    google_credentials = service_account.Credentials.from_service_account_file(
        service_account_file
    )
    client_email = google_credentials.service_account_email
    credential_scope = f"{datestamp}/auto/storage/goog4_request"
    credential = f"{client_email}/{credential_scope}"

    host = f"{bucket_name}.storage.googleapis.com"
    canonical_headers, signed_headers = _canonicalize_headers(
        {} if headers is None else headers, host
    )

    # Work on a copy so the caller's mapping is left untouched.
    signed_parameters = dict(query_parameters) if query_parameters else {}
    signed_parameters["X-Goog-Algorithm"] = "GOOG4-RSA-SHA256"
    signed_parameters["X-Goog-Credential"] = credential
    signed_parameters["X-Goog-Date"] = request_timestamp
    signed_parameters["X-Goog-Expires"] = expiration
    signed_parameters["X-Goog-SignedHeaders"] = signed_headers
    if subresource:
        signed_parameters[subresource] = ""

    canonical_query_string = _canonicalize_query(signed_parameters)

    canonical_request = "\n".join(
        [
            http_method,
            canonical_uri,
            canonical_query_string,
            canonical_headers,
            signed_headers,
            "UNSIGNED-PAYLOAD",
        ]
    )

    canonical_request_hash = hashlib.sha256(canonical_request.encode()).hexdigest()

    string_to_sign = "\n".join(
        [
            "GOOG4-RSA-SHA256",
            request_timestamp,
            credential_scope,
            canonical_request_hash,
        ]
    )

    # signer.sign() signs using RSA-SHA256 with PKCS1v15 padding
    signature = binascii.hexlify(
        google_credentials.signer.sign(string_to_sign)
    ).decode()

    scheme_and_host = "{}://{}".format("https", host)
    signed_url = "{}{}?{}&x-goog-signature={}".format(
        scheme_and_host, canonical_uri, canonical_query_string, signature
    )

    return signed_url


# [END storage_signed_url_all]

if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        "service_account_file", help="Path to your Google service account keyfile."
    )
    parser.add_argument("request_method", help="A request method, e.g GET, POST.")
    parser.add_argument("bucket_name", help="Your Cloud Storage bucket name.")
    parser.add_argument("object_name", help="Your Cloud Storage object name.")
    parser.add_argument("expiration", type=int, help="Expiration time.")
    parser.add_argument(
        "--subresource",
        default=None,
        help='Subresource of the specified resource, e.g. "acl".',
    )

    args = parser.parse_args()

    # argparse has already converted ``expiration`` to int (type=int above).
    signed_url = generate_signed_url(
        service_account_file=args.service_account_file,
        http_method=args.request_method,
        bucket_name=args.bucket_name,
        object_name=args.object_name,
        subresource=args.subresource,
        expiration=args.expiration,
    )

    print(signed_url)