Xcode/AppleCookieDownloader.py (181 lines of code) (raw):

#!/usr/bin/python # # Copyright (c) Facebook, Inc. and its affiliates. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # """See docstring for AppleCookieDownloader class""" import json import os.path import subprocess import time from autopkglib import Processor, ProcessorError __all__ = ["AppleCookieDownloader"] class AppleCookieDownloader(Processor): """Downloads a URL to the specified download_dir using curl.""" description = __doc__ input_variables = { "login_data": {"required": True, "description": "Path to login data file."}, "CURL_PATH": { "required": False, "default": "/usr/bin/curl", "description": "Path to curl binary. Defaults to /usr/bin/curl.", }, } output_variables = { "download_cookies": {"description": "Path to the download cookies."} } def download(self, url, curl_opts, output, request_headers, allow_failure=False): """Run a download with curl.""" # construct curl command. curl_cmd = [ self.env["CURL_PATH"], "--silent", "--show-error", "--no-buffer", "--fail", "--dump-header", "-", "--speed-time", "30", "--location", "--url", url, "--output", output, ] if request_headers: for header, value in request_headers.items(): curl_cmd.extend(["--header", "%s: %s" % (header, value)]) if curl_opts: for item in curl_opts: curl_cmd.extend([item]) # Open URL. proc = subprocess.Popen( curl_cmd, shell=False, bufsize=1, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) donewithheaders = False maxheaders = 15 header = {} header["http_result_code"] = "000" header["http_result_description"] = "" while True: if not donewithheaders: info = proc.stdout.readline().decode().strip("\r\n") if info.startswith("HTTP/"): try: header["http_result_code"] = info.split(None, 2)[1] header["http_result_description"] = info.split(None, 2)[2] except IndexError: pass elif ": " in info: # got a header line part = info.split(None, 1) fieldname = part[0].rstrip(":").lower() try: header[fieldname] = part[1] except IndexError: header[fieldname] = "" elif info == "": # we got an empty line; end of headers (or curl exited) if header.get("http_result_code") in [ "301", "302", "303", "307", "308", ]: # redirect, so more headers are coming. # Throw away the headers we've received so far header = {} header["http_result_code"] = "000" header["http_result_description"] = "" else: donewithheaders = True else: time.sleep(0.1) if proc.poll() is not None: # For small download files curl may exit before all headers # have been parsed, don't immediately exit. maxheaders -= 1 if donewithheaders or maxheaders <= 0: break retcode = proc.poll() if ( retcode and not allow_failure ): # Non-zero exit code from curl => problem with download curlerr = "" try: curlerr = proc.stderr.read().rstrip("\n") curlerr = curlerr.split(None, 2)[2] except IndexError: pass raise ProcessorError("Curl failure: %s (exit code %s)" % (curlerr, retcode)) def main(self): download_dir = os.path.join(self.env["RECIPE_CACHE_DIR"], "downloads") login_cookies = os.path.join(download_dir, "login_cookies") download_cookies = os.path.join(download_dir, "download_cookies") # create download_dir if needed if not os.path.exists(download_dir): try: os.makedirs(download_dir) except OSError as err: raise ProcessorError( "Can't create %s: %s" % (download_dir, err.strerror) ) self.output("Getting login cookie") # We need to POST a request to the auth page to get the # 'myacinfo' cookie login_curl_opts = [ "--request", "POST", "--data", "@{}".format(self.env["login_data"]), "--cookie-jar", login_cookies, ] self.download( url="https://idmsa.apple.com/IDMSWebAuth/authenticate", curl_opts=login_curl_opts, output="-", request_headers=None, allow_failure=True, ) self.output("Getting download cookie") # Now we need to get the download cookie dl_curl_opts = [ "--request", "POST", "--cookie", login_cookies, "--cookie-jar", download_cookies, ] headers = {"Content-length": "0"} output = os.path.join(download_dir, "listDownloads.gz") if os.path.exists(output): # Delete it first os.unlink(output) self.download( url="https://developer.apple.com/services-account/QH65B2/downloadws/listDownloads.action", curl_opts=dl_curl_opts, output=output, request_headers=headers, allow_failure=True, ) self.env["download_cookies"] = download_cookies try: with open(output) as f: json.load(f) # If we successfully load this as JSON, then we failed # to download the gzip list raise ProcessorError( "Unable to list downloads. Check your Apple credentials." ) except IOError: raise ProcessorError("Unable to load listDownloads.gz file.") except ValueError: pass # While we're at it, let's unzip the download list # It's actually a gunzip, so Unarchiver doesn't work # The result is a JSON blob self.output("Unzipping download list") os.chdir(download_dir) if os.path.exists(output.rstrip(".gz")): # Delete the file if it's already here os.unlink(output.rstrip(".gz")) gunzip_cmd = ["/usr/bin/gunzip", output] proc = subprocess.Popen( gunzip_cmd, shell=False, bufsize=1, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) (stdout, stderr) = proc.communicate() if proc.returncode: gzerr = stderr.rstrip("\n") raise ProcessorError( "Gunzip failure: %s (exit code %s)" % (gzerr, proc.returncode) ) if __name__ == "__main__": PROCESSOR = AppleCookieDownloader() PROCESSOR.execute_shell()