legacy/adobe_tools/adobe_api.py (440 lines of code) (raw):

#!/usr/bin/python
# Copyright (c) Facebook, Inc. and its affiliates.
"""Module to interact with the Adobe User Management API."""

from __future__ import print_function

import json
import os
import platform
import random
import sys
import time

try:
    import jwt
    import requests
except ImportError:
    print("Missing 'jwt' and/or 'requests' modules.")
    # sys.exit() is always available; the bare exit() builtin is only
    # injected by the 'site' module and is meant for interactive use.
    sys.exit(1)

if sys.version_info[0] == 2:
    from ConfigParser import RawConfigParser
    from urllib import urlencode
    from urllib import quote
elif sys.version_info[0] >= 3:
    from configparser import RawConfigParser
    from urllib.parse import urlencode
    # 'quote' is used when building product-configuration URLs; it must be
    # imported on Python 3 as well, otherwise those calls raise NameError.
    from urllib.parse import quote

# Constants for fallback
USERCONFIG_DEFAULT_LOC = '/Library/Adobe/usermanagement.config'
PRIVATE_KEY_DEFAULT_LOC = '/Library/Adobe/private.key'
CACHE_DEFAULT_LOC = '/Library/Adobe/adobe_tools.json'


# User lookup functions
def get_console_user():
    """
    Find out who is logged in right now.

    Returns the name of the current user as reported by the platform:
    the console user on macOS, the calling user on Windows and Linux.
    Returns None when the platform is not one of those three.
    """
    current_os = platform.system()
    if 'Darwin' in current_os:
        # macOS: Use SystemConfiguration framework to get the current
        # console user
        from SystemConfiguration import SCDynamicStoreCopyConsoleUser
        cfuser = SCDynamicStoreCopyConsoleUser(None, None, None)
        return cfuser[0]
    if 'Windows' in current_os:
        from win32api import GetUserName
        return GetUserName()
    if 'Linux' in current_os:
        from getpass import getuser
        return getuser()
    # Unknown platform: there is no lookup method to try.
    return None


# Exception classes used by this module.
class AdobeAPINoUserException(Exception):
    """Given user does not exist."""

    def __init__(self, username):
        """Store the user that doesn't exist."""
        super(AdobeAPINoUserException, self).__init__(username)
        self.username = username

    def __str__(self):
        """String for the username."""
        return "No user found for '%s' " % str(self.username)


class AdobeAPINoProductException(Exception):
    """Given product does not exist."""

    def __init__(self, product):
        """Store the product that doesn't exist."""
        super(AdobeAPINoProductException, self).__init__(product)
        self.product = product

    def __str__(self):
        """String for the product."""
        return "No product configuration for '%s'" % str(self.product)


class AdobeAPIBadStatusException(Exception):
    """Received a non-200 code from the API."""

    def __init__(self, status_code, headers, text):
        """Store the status code, headers and body of the bad response."""
        super(AdobeAPIBadStatusException, self).__init__(
            status_code, headers, text
        )
        self.status_code = status_code
        self.headers = headers
        self.text = text

    def __str__(self):
        """Text for the error."""
        return 'Status code %s: %s' % (self.status_code, str(self.text))

    def __int__(self):
        """Return status code of the error."""
        return int(self.status_code)


class AdobeAPIIncompleteUserActionException(Exception):
    """User manipulation action returned an incomplete."""

    def __init__(self, errors):
        """Store the error generated from the incomplete."""
        super(AdobeAPIIncompleteUserActionException, self).__init__(errors)
        self.errors = errors

    def __str__(self):
        """Text for the error."""
        return str(self.errors)


class AdobeAPIMissingRequirementsException(Exception):
    """Missing a required file for API usage."""

    def __init__(self, filename):
        """Store the filename that is missing."""
        super(AdobeAPIMissingRequirementsException, self).__init__(filename)
        self.filename = filename

    def __str__(self):
        """Text for the error."""
        return 'Required file is missing: %s' % str(self.filename)


# 'basestring' only exists on Python 2; on Python 3 a plain 'str' check is
# the equivalent test for "caller passed a single name instead of a list".
try:
    _STRING_TYPES = (basestring,)  # NOQA
except NameError:
    _STRING_TYPES = (str,)


class AdobeAPIObject(object):
    """Model to represent an Adobe API interface."""

    # NOTE: the 'username' and 'splay' defaults are evaluated once, when the
    # class is defined (i.e. at import time), not on every instantiation.
    def __init__(
        self,
        username="%s@fb.com" % get_console_user(),
        private_key_filename=PRIVATE_KEY_DEFAULT_LOC,
        userconfig=USERCONFIG_DEFAULT_LOC,
        cache_path=CACHE_DEFAULT_LOC,
        cache=True,
        key='email',
        allow_nonexistent_user=False,
        splay=random.randrange(-144, 144),
    ):
        """
        Instantiate class variables for our API object model.

        'username' defaults to the current logged in user on all platforms.

        'private_key_filename', 'userconfig', and 'cache_path' will default
        to the constants defined above if not provided.

        'cache' defaults to True to consume available cache data, and to
        store the data in local cache. False will not cache and ignores any
        local cache file. The cache path is defined in the constant above.

        'key' must be either 'email' or 'username', and determines what
        field to match the incoming data off of. By default, this is the
        'email' field.

        'allow_nonexistent_user' will not trigger an exception if you try to
        perform an action on a user that does not exist. This is useful for
        determining if a user exists, or querying lists of product configs,
        where you don't actually need to interact with a user to do so.

        'splay' is a number of hours added to the cache length. By default,
        this is a random value between -144 and 144 hours, so that machines
        don't all invalidate their cache and query the API endpoint at the
        same time.

        This can be confusing because regardless of key choice, 'username'
        is used to indicate the unique user.
        """
        self.configs = {}
        self.productlist = []
        self.userlist = []
        self.cache_path = cache_path
        self.user = {}
        self.username = username
        self.cache = cache
        self.key = key
        self.allow_fake = allow_nonexistent_user
        self.splay = splay
        if self.cache:
            self.__read_cache()
        # Generate the access configs in case we need them later
        self.__generate_config(
            userconfig=userconfig,
            private_key_filename=private_key_filename
        )
        fetched_from_api = False
        if not self.user:
            # Cache didn't have values we need, so let's query the API
            self.gather_user()
            fetched_from_api = True
        if not self.productlist:
            self.gather_product_list(force=True)
            fetched_from_api = True
        # Only rewrite the cache when something was actually refreshed.
        # Rewriting it unconditionally would bump the file's mtime on every
        # run, and the age-based invalidation in __read_cache would never
        # trigger on a machine that runs this regularly.
        if self.cache and fetched_from_api:
            self.__write_cache()

    # CONFIG
    def __get_private_key(self, priv_key_filename):
        """Return the contents of the private key file as a string."""
        # 'with' guarantees the handle is closed even if read() fails.
        with open(priv_key_filename) as priv_key_file:
            return priv_key_file.read()

    def __get_user_config(self, filename=None):
        """
        Read the user config file and store its values in 'self.configs'.

        The file must contain a [server] and an [enterprise] section with
        the options read below; a missing section or option raises the
        corresponding ConfigParser error.
        """
        config = RawConfigParser()
        config.read(filename)
        self.configs = {
            # server parameters
            'host': config.get("server", "host"),
            'endpoint': config.get("server", "endpoint"),
            'ims_host': config.get("server", "ims_host"),
            'ims_endpoint_jwt': config.get("server", "ims_endpoint_jwt"),
            # enterprise parameters used to construct JWT
            'domain': config.get("enterprise", "domain"),
            'org_id': config.get("enterprise", "org_id"),
            'api_key': config.get("enterprise", "api_key"),
            'client_secret': config.get("enterprise", "client_secret"),
            'tech_acct': config.get("enterprise", "tech_acct"),
            'priv_key_filename': config.get(
                "enterprise", "priv_key_filename"
            ),
        }

    def __prepare_jwt_token(self):
        """Construct the JSON Web Token for auth and return it as text."""
        # set expiry time for JSON Web Token (24 hours from now)
        expiry_time = int(time.time()) + 60 * 60 * 24
        ims_base = "https://" + self.configs['ims_host']
        # create payload
        payload = {
            "exp": expiry_time,
            "iss": self.configs['org_id'],
            "sub": self.configs['tech_acct'],
            "aud": ims_base + "/c/" + self.configs['api_key'],
            ims_base + "/s/" + "ent_user_sdk": True,
        }
        # create JSON Web Token
        jwt_token = jwt.encode(payload, self.priv_key, algorithm='RS256')
        # Older PyJWT releases return bytes, newer ones already return text.
        # Only decode when we were actually handed bytes.
        if isinstance(jwt_token, bytes):
            jwt_token = jwt_token.decode("utf-8")
        return jwt_token

    def __prepare_access_token(self, config_data, jwt_token):
        """
        Exchange the JWT for an access token and return the token.

        Raises AdobeAPIBadStatusException if the token endpoint answers with
        anything other than HTTP 200.
        """
        # Method parameters
        url = "https://" + config_data['ims_host'] + \
            config_data['ims_endpoint_jwt']
        headers = {
            "Content-Type": "application/x-www-form-urlencoded",
            "Cache-Control": "no-cache"
        }
        body_credentials = {
            "client_id": config_data['api_key'],
            "client_secret": config_data['client_secret'],
            "jwt_token": jwt_token
        }
        body = urlencode(body_credentials)
        # send http request
        res = requests.post(url, headers=headers, data=body)
        # evaluate response
        if res.status_code != 200:
            raise AdobeAPIBadStatusException(
                res.status_code, res.headers, res.text
            )
        # extract token
        return json.loads(res.text)["access_token"]

    def __generate_config(self, userconfig, private_key_filename):
        """
        Load the config and private key, then obtain the auth tokens.

        Populates 'self.priv_key', 'self.configs', 'self.jwt_token' and
        'self.access_token'. Nothing is returned.

        Raises AdobeAPIMissingRequirementsException if either file is
        missing, and AdobeAPIBadStatusException if the access token request
        fails.
        """
        # Both files must exist before we try to read either of them
        user_config_path = userconfig
        if not os.path.isfile(str(user_config_path)):
            raise AdobeAPIMissingRequirementsException(str(user_config_path))
        priv_key_path = private_key_filename
        if not os.path.isfile(str(priv_key_path)):
            raise AdobeAPIMissingRequirementsException(str(priv_key_path))
        # Get private key
        self.priv_key = self.__get_private_key(priv_key_path)
        # Get config data
        self.__get_user_config(user_config_path)
        # Get the JWT
        try:
            self.jwt_token = self.__prepare_jwt_token()
        except NotImplementedError:
            print(
                "Cryptography module was unable to succeed on your machine.",
                file=sys.stderr)
            raise
        # Get the access token
        self.access_token = self.__prepare_access_token(
            self.configs,
            self.jwt_token
        )

    def __headers(self, config_data, access_token):
        """Return the headers needed for an authenticated API request."""
        return {
            "Content-type": "application/json",
            "Accept": "application/json",
            "x-api-key": config_data['api_key'],
            "Authorization": "Bearer " + access_token
        }

    # REQUEST INTERACTION FUNCTIONS
    def __submit_request(self, url):
        """
        Submit a request to the API endpoint.

        Returns a JSON dictionary of the result.
        If a non-200 status is returned, raise an
        AdobeAPIBadStatusException.
        """
        res = requests.get(
            url,
            headers=self.__headers(self.configs, self.access_token)
        )
        if res.status_code != 200:
            raise AdobeAPIBadStatusException(
                res.status_code, res.headers, res.text
            )
        return json.loads(res.text)

    def _submit_user_action_request(self, body_dict):
        """
        Submit a JSON request to the User Action API.

        Returns True if the action succeeded, False if the response reports
        neither a completed nor an incomplete action.
        If a non-200 status is returned, raise AdobeAPIBadStatusException.
        If the action was not completed, raise
        AdobeAPIIncompleteUserActionException.
        """
        success = False
        # The action endpoint takes a list of commands; we send exactly one.
        body = json.dumps([body_dict])
        url = "https://" + self.configs['host'] + \
            self.configs['endpoint'] + "/action/" + \
            self.configs['org_id']
        res = requests.post(
            url,
            headers=self.__headers(self.configs, self.access_token),
            data=body
        )
        if res.status_code != 200:
            raise AdobeAPIBadStatusException(
                res.status_code, res.headers, res.text
            )
        results = json.loads(res.text)
        if results.get('notCompleted') == 1:
            raise AdobeAPIIncompleteUserActionException(
                results.get('errors')
            )
        if results.get('completed') == 1:
            success = True
            # The user changed on the server, so refresh our local copy.
            self.update_user()
        return success

    # CACHE FUNCTIONS
    def __read_cache(self):
        """
        Read the values from the cache file.

        Populates 'productlist', 'userlist' and 'user' from the cache when
        the file exists, parses as JSON and is younger than two weeks plus
        the splay. A missing or unreadable cache leaves 'user' empty.
        """
        cache_data = {}
        try:
            # Invalidate the cache automatically after 2 weeks, plus splay
            file_age = os.path.getmtime(self.cache_path)
            # Splay is a number of hours added to the cache invalidation
            # time. It can be negative, so that clients don't all hit at
            # once.
            splay_seconds = 60 * 60 * int(self.splay)
            two_weeks = (60 * 60 * 24 * 14)
            if time.time() - file_age < (two_weeks + splay_seconds):
                # JSON is text; text mode works on Python 2 and 3 alike.
                with open(self.cache_path, 'r') as f:
                    cache_data = json.load(f)
        except (OSError, IOError, ValueError):
            # Cache doesn't exist, or is invalid
            self.user = {}
            return
        productlist = cache_data.get('productlist', [])
        if productlist:
            self.productlist = productlist
        userlist = cache_data.get('userlist', [])
        if userlist:
            self.userlist = userlist
        user_data = cache_data.get('user_data', {})
        if user_data and user_data.get(self.key) == self.username:
            self.user = user_data
        else:
            # Look through the userlist to see if we find the username.
            # If not, the result is an empty dict anyway.
            self.user = self.data()

    def __write_cache(self):
        """Write the values to the cache file (best effort)."""
        cache_data = {
            'productlist': self.productlist or [],
            'userlist': self.userlist or [],
            'user_data': self.user or {},
        }
        try:
            # json.dump emits text, so the file must be opened in text mode;
            # a binary handle raises TypeError on Python 3.
            with open(self.cache_path, 'w') as f:
                json.dump(cache_data, f, indent=True, sort_keys=True)
        except (IOError, OSError):
            # If we fail to write cache, it just means we check again next
            # time
            pass

    # GATHERING DATA FROM THE API
    # These functions all must query the API (directly or indirectly) for
    # info not available from the cache, and are therefore expensive.
    def gather_product_list(self, force=False):
        """
        Get the list of product configurations by asking the API.

        Returns 'productlist', which is a list of dictionaries containing
        all the Configuration groups in use.

        If 'force' is true, the API call will be made regardless of cache.
        If a non-200 status code is returned by the API, an
        AdobeAPIBadStatusException is raised.

        Example:
        ```
        >>> api.productlist[0]
        {u'memberCount': 182, u'groupName': u'Administrators'}
        >>> api.productlist[1]
        {u'memberCount': 912,
         u'groupName':
            u'Default Document Cloud for enterprise - Pro Configuration'}
        ```
        """
        if force or not self.productlist:
            page = 0
            result = {}
            productlist = []
            # The API is paginated; keep going until it flags the last page.
            while result.get('lastPage', False) is not True:
                url = "https://" + self.configs['host'] + \
                    self.configs['endpoint'] + "/groups/" + \
                    self.configs['org_id'] + "/" + str(page)
                result = self.__submit_request(url)
                productlist += result.get('groups', [])
                page += 1
            self.productlist = productlist
            # Update the cache
            if self.cache:
                self.__write_cache()
        return self.productlist

    def gather_user_list(self, force=False):
        """
        Get a list of all users by querying the API.

        Returns 'userlist', which is a list of dictionaries containing all
        the users in our org.

        If 'force' is true, the API call will be made regardless of cache.
        If a non-200 status code is returned by the API, an
        AdobeAPIBadStatusException is raised.

        Example:
        ```
        >>> api.userlist[0]
        {u'status': u'active', u'username': u'email@fb.com',
         u'domain': u'fb.com', u'firstname': u'Fake Firstname',
         u'lastname': u'Fake Lastname',
         u'groups': [
            u'Default Document Cloud for enterprise - Pro Configuration',
            u'Default All Apps plan - 100 GB Configuration',
            u'Default Illustrator CC - 0 GB Configuration',
            u'Default InDesign CC - 0 GB Configuration',
            u'Default Photoshop CC - 0 GB Configuration'],
         u'country': u'US', u'type': u'federatedID',
         u'email': u'email@fb.com'}
        ```
        """
        if force or not self.userlist:
            page = 0
            result = {}
            userlist = []
            # The API is paginated; keep going until it flags the last page.
            while result.get('lastPage', False) is not True:
                url = "https://" + self.configs['host'] + \
                    self.configs['endpoint'] + "/users/" + \
                    self.configs['org_id'] + "/" + str(page)
                result = self.__submit_request(url)
                userlist += result.get('users', [])
                page += 1
            self.userlist = userlist
            # Update the cache
            if self.cache:
                self.__write_cache()
        return self.userlist

    @staticmethod
    def _is_group_not_found(response_text):
        """Return True if an API error body reports an unknown group."""
        try:
            error = json.loads(response_text)
        except ValueError:
            # Not JSON at all, so it cannot be the 'group.not_found' error.
            return False
        if not isinstance(error, dict):
            return False
        return 'group.not_found' in (error.get('result') or '')

    def users_of_product(self, product_config_name):
        """
        Get a list of users of a specific configuration by querying the API.

        Returns a list of dictionaries containing the user data of each
        user who is a member of that product configuration group.

        If the API reports that the group does not exist, an
        AdobeAPINoProductException is raised. Any other non-200 status code
        raises AdobeAPIBadStatusException.

        Example:
        ```
        >>> api.users_of_product(
              'Default Document Cloud for enterprise - Pro Configuration')[0]
        {u'status': u'active', u'username': u'email@fb.com',
         u'domain': u'fb.com', u'firstname': u'Fake', u'lastname': u'Fake',
         u'country': u'US', u'type': u'federatedID',
         u'email': u'email@fb.com'}
        ```

        This data is not cached, so it is an expensive call each time.
        """
        # Imported here so the lookup works on both Python 2 and Python 3
        # regardless of which names the module-level imports provide.
        try:
            from urllib.parse import quote as url_quote
        except ImportError:
            from urllib import quote as url_quote
        # The product name is part of the URL path and may contain spaces.
        quoted_name = url_quote(product_config_name)
        page = 0
        result = {}
        userlist = []
        while result.get('lastPage', False) is not True:
            url = "https://" + self.configs['host'] + \
                self.configs['endpoint'] + "/users/" + \
                self.configs['org_id'] + "/" + str(page) + "/" + \
                quoted_name
            try:
                result = self.__submit_request(url)
            except AdobeAPIBadStatusException as e:
                if self._is_group_not_found(e.text):
                    # Invalid product name
                    raise AdobeAPINoProductException(product_config_name)
                raise
            userlist += result.get('users', [])
            page += 1
        return userlist

    def data(self):
        """
        Get the data for the user from the userlist.

        Returns the first entry of 'userlist' whose 'key' field equals
        'username', or an empty dict if there is none.
        """
        for user in self.userlist:
            # .get() so that a record lacking the key field is skipped
            # instead of aborting the whole lookup.
            if user.get(self.key) == self.username:
                return user
        # If we get here, there was no matching username
        return {}

    def gather_user(self):
        """
        Gather data about the user by querying the API.

        Returns a dictionary containing the user data, which is also stored
        in 'self.user'.

        This function does not read from the cache; it will always fetch
        from the API.

        Any non-200 status from the API is treated as "user does not
        exist": if 'allow_nonexistent_user' was set to True the user data
        becomes an empty dict, otherwise an AdobeAPINoUserException is
        raised.
        """
        url = "https://" + self.configs['host'] + \
            self.configs['endpoint'] + "/organizations/" + \
            self.configs['org_id'] + "/users/" + str(self.username)
        try:
            result = self.__submit_request(url)
        except AdobeAPIBadStatusException:
            if not self.allow_fake:
                raise AdobeAPINoUserException(self.username)
            self.user = {}
        else:
            self.user = result.get('user', {})
        return self.user

    # USER SPECIFIC FUNCTIONS
    # These convenience functions are all based on the user that the object
    # was instantiated with.
    def list_products(self):
        """Return the list of products for the current user."""
        return self.user.get('groups', [])

    def is_federated(self):
        """Return True if user is federated."""
        return self.user.get('type') == 'federatedID'

    def has_product(self, product_name):
        """Return True if user has the product config."""
        return product_name in self.list_products()

    def update_user(self):
        """Force update the user information from the API."""
        self.gather_user()
        if self.cache:
            self.__write_cache()

    # PRODUCT SPECIFIC FUNCTIONS
    # These are not at all related to the user, and do not require a real
    # user.
    def product_exists(self, productname):
        """Return True if a product config exists."""
        if not self.productlist:
            self.gather_product_list()
        return any(
            productname == product.get('groupName', '')
            for product in self.productlist
        )

    # ACTION FUNCTIONS
    # These functions are actions you can take on the user, which require
    # posting data to the API.
    def add_federated_user(self, email, country, firstname, lastname):
        """Add Federated user to organization."""
        add_dict = {
            'user': self.username,
            'do': [
                {
                    'createFederatedID': {
                        'email': email,
                        'country': country,
                        'firstname': firstname,
                        'lastname': lastname,
                    }
                }
            ]
        }
        return self._submit_user_action_request(add_dict)

    def update_user_information(self, email, country, firstname, lastname):
        """
        Update the existing user's information.

        Only the fields given a truthy value are sent to the API.
        """
        candidate_fields = (
            ('email', email),
            ('country', country),
            ('firstname', firstname),
            ('lastname', lastname),
        )
        changes = {}
        for field_name, value in candidate_fields:
            if value:
                changes[field_name] = value
        update_dict = {
            'user': self.username,
            'do': [
                {
                    'update': changes
                }
            ]
        }
        return self._submit_user_action_request(update_dict)

    def remove_user_from_org(self):
        """
        Remove user from organization.

        Raises AdobeAPINoUserException if no user data is loaded.
        """
        if not self.user:
            raise AdobeAPINoUserException(self.username)
        remove_dict = {
            'user': self.username,
            'do': [
                {
                    'removeFromOrg': {}
                }
            ]
        }
        return self._submit_user_action_request(remove_dict)

    def _submit_product_action(self, action, products):
        """Validate the user and products, then submit 'action' on them."""
        # Is username in the organization?
        if not self.user:
            raise AdobeAPINoUserException(self.username)
        # Accept a single product name as well as a list of names
        if isinstance(products, _STRING_TYPES):
            products = [products]
        # Is the product real?
        for product in products:
            if not self.product_exists(product):
                raise AdobeAPINoProductException(product)
        action_dict = {
            'user': self.username,
            'do': [
                {
                    action: {
                        'product': products
                    }
                }
            ]
        }
        return self._submit_user_action_request(action_dict)

    def add_products_to_user(self, products):
        """
        Add product configs to username.

        'products' may be a single product name or a list of names.
        Raises AdobeAPINoUserException if no user data is loaded, and
        AdobeAPINoProductException if any product does not exist.
        """
        return self._submit_product_action('add', products)

    def remove_product_from_user(self, products):
        """
        Remove products from username.

        'products' may be a single product name or a list of names.
        Raises AdobeAPINoUserException if no user data is loaded, and
        AdobeAPINoProductException if any product does not exist.
        """
        return self._submit_product_action('remove', products)
# END CLASS