storage/signed_urls/generate_signed_urls.py (117 lines of code) (raw):
# Copyright 2018 Google, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This application demonstrates how to construct a Signed URL for objects in
Google Cloud Storage.
For more information, see the README.md under /storage and the documentation
at https://cloud.google.com/storage/docs/access-control/signing-urls-manually.
"""
import argparse
# [START storage_signed_url_all]
import binascii
import collections
import datetime
import hashlib
import sys
from urllib.parse import quote
# pip install google-auth
from google.oauth2 import service_account
# pip install six
import six
def generate_signed_url(
    service_account_file,
    bucket_name,
    object_name,
    subresource=None,
    expiration=604800,
    http_method="GET",
    query_parameters=None,
    headers=None,
):
    """Build a V4 (GOOG4-RSA-SHA256) signed URL for a Cloud Storage object.

    Args:
        service_account_file: Path to the service account key file whose
            credentials sign the request.
        bucket_name: Bucket name; used to form the virtual-hosted style host
            ``<bucket_name>.storage.googleapis.com``.
        object_name: Object name. It is percent-encoded (keeping ``/`` and
            ``~``) to form the URL path.
        subresource: Optional subresource (e.g. ``"acl"``); when truthy it is
            added as a query parameter with an empty value.
        expiration: Lifetime of the URL in seconds, at most 604800 (7 days).
        http_method: HTTP verb placed on the first line of the canonical
            request.
        query_parameters: Optional mapping of extra query parameters to sign.
            The mapping is copied, never modified.
        headers: Optional mapping of extra headers to sign. The mapping is
            copied, never modified; the ``host`` header is always set to the
            bucket host.

    Returns:
        The signed URL as a string.

    Note:
        When ``expiration`` exceeds 604800 a message is printed and the
        process exits through ``sys.exit(1)``.
    """
    if expiration > 604800:
        print("Expiration Time can't be longer than 604800 seconds (7 days).")
        sys.exit(1)

    escaped_object_name = quote(six.ensure_binary(object_name), safe=b"/~")
    canonical_uri = f"/{escaped_object_name}"

    # A single timestamp feeds both X-Goog-Date and the credential scope so
    # the two can never disagree across a day boundary.
    datetime_now = datetime.datetime.now(tz=datetime.timezone.utc)
    request_timestamp = datetime_now.strftime("%Y%m%dT%H%M%SZ")
    datestamp = datetime_now.strftime("%Y%m%d")

    google_credentials = service_account.Credentials.from_service_account_file(
        service_account_file
    )
    client_email = google_credentials.service_account_email
    credential_scope = f"{datestamp}/auto/storage/goog4_request"
    credential = f"{client_email}/{credential_scope}"

    host = f"{bucket_name}.storage.googleapis.com"

    # Normalise into a fresh dict: the caller's mapping stays untouched, names
    # are lowercased BEFORE sorting (so mixed-case names order correctly), and
    # values only have their whitespace trimmed/folded -- their case is kept.
    normalized_headers = {
        str(name).strip().lower(): " ".join(str(value).split())
        for name, value in (headers or {}).items()
    }
    normalized_headers["host"] = host
    ordered_headers = sorted(normalized_headers.items())

    canonical_headers = "".join(
        f"{name}:{value}\n" for name, value in ordered_headers
    )
    signed_headers = ";".join(name for name, _ in ordered_headers)

    # Copy so the signing parameters are not written into the caller's dict.
    signed_query = dict(query_parameters) if query_parameters is not None else {}
    signed_query["X-Goog-Algorithm"] = "GOOG4-RSA-SHA256"
    signed_query["X-Goog-Credential"] = credential
    signed_query["X-Goog-Date"] = request_timestamp
    signed_query["X-Goog-Expires"] = expiration
    signed_query["X-Goog-SignedHeaders"] = signed_headers
    if subresource:
        signed_query[subresource] = ""

    encoded_pairs = []
    for key, value in sorted(signed_query.items()):
        encoded_key = quote(str(key), safe="")
        encoded_value = quote(str(value), safe="")
        encoded_pairs.append(f"{encoded_key}={encoded_value}")
    canonical_query_string = "&".join(encoded_pairs)

    # canonical_headers already ends with "\n", so joining leaves the blank
    # line that separates the header block from the signed-header list.
    canonical_request = "\n".join(
        [
            http_method,
            canonical_uri,
            canonical_query_string,
            canonical_headers,
            signed_headers,
            "UNSIGNED-PAYLOAD",
        ]
    )
    canonical_request_hash = hashlib.sha256(canonical_request.encode()).hexdigest()

    string_to_sign = "\n".join(
        [
            "GOOG4-RSA-SHA256",
            request_timestamp,
            credential_scope,
            canonical_request_hash,
        ]
    )

    # signer.sign() signs using RSA-SHA256 with PKCS1v15 padding
    signature = binascii.hexlify(
        google_credentials.signer.sign(string_to_sign)
    ).decode()

    return (
        f"https://{host}{canonical_uri}"
        f"?{canonical_query_string}&x-goog-signature={signature}"
    )
# [END storage_signed_url_all]
if __name__ == "__main__":
    # Command-line entry point: parse the arguments, sign, print the URL.
    cli = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=__doc__,
    )

    # Plain string positionals, registered in the order they are expected on
    # the command line.
    string_positionals = (
        ("service_account_file", "Path to your Google service account keyfile."),
        ("request_method", "A request method, e.g GET, POST."),
        ("bucket_name", "Your Cloud Storage bucket name."),
        ("object_name", "Your Cloud Storage object name."),
    )
    for arg_name, arg_help in string_positionals:
        cli.add_argument(arg_name, help=arg_help)

    # argparse converts this one to int before it reaches the signer.
    cli.add_argument("expiration", type=int, help="Expiration time.")
    cli.add_argument(
        "--subresource",
        default=None,
        help='Subresource of the specified resource, e.g. "acl".',
    )

    options = cli.parse_args()
    print(
        generate_signed_url(
            service_account_file=options.service_account_file,
            bucket_name=options.bucket_name,
            object_name=options.object_name,
            subresource=options.subresource,
            expiration=options.expiration,
            http_method=options.request_method,
        )
    )