pantri/scripts/lib/fb_objectstore.py (189 lines of code) (raw):
# Copyright (c) Facebook, Inc. and its affiliates. All rights reserved.
import os
import json
import getpass
import platform
import sys
# Third party modules
import swiftclient
import swiftclient.service
import config
import utils
import logger
class FB_ObjectStore(object):
"""
Class for dealing with SwiftStack Object Store.
"""
def __init__(self, options):
"""
__init__(self):
Instantiate class variables
"""
# TODO reseach decorators for logging and configs
if not options:
options = config.get_options('default', {})
self.options = options
self.logger = logger.get_logger()
self.paths = utils.get_paths()
self.git_path = self.paths['repo_root']
# TODO learn more about this fucntions and add logic on enter to create
# connection to object store.
# Magic functions to use with statements
def __enter__(self):
self.auth_token = self.get_auth_token()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
return self
def get_file_creds(self):
password_file = os.path.join(self.git_path, '.pantri_password')
if os.path.exists(password_file):
try:
creds = json.loads(utils.read_file(password_file))
return creds
except ValueError:
self.logger.error('Unable to parse password file.')
sys.exit(1)
else:
raise IOError(
'Cannot find .pantri_password. Unable to proceed.')
def prompt_for_creds(self):
if 'password_file' in self.options.keys():
return False
return True
def get_auth_creds(self):
"""
return username, password to request auth token. If method is 'retrieve'
and -p/--password_file arg is passed, user cred from .pantri_password.
Else prompt user for creds
"""
# Handling non-interactive actions.
prompt_for_creds = self.prompt_for_creds()
if prompt_for_creds:
self.logger.info('Enter Credentials to Pull Auth Token')
username = utils.get_username()
password = getpass.getpass()
return username, password
else:
creds = self.get_file_creds()
return creds['username'], creds['password']
def get_cached_auth_token(self):
""" Return auth_token cached in .pantri_auth_token """
# Return cached auth token.
auth_token_cache = os.path.join(self.git_path, '.pantri_auth_token')
if os.path.exists(auth_token_cache):
try:
auth_token = json.loads(utils.read_file(auth_token_cache))['auth_token']
return auth_token
except:
pass
return None
def validate_auth_token(self, auth_token):
""" Validate auth_token by listing container. """
try:
# Exception will be thrown if auth token is invalid
swiftclient.client.head_container(
url=self.options['storage_url'],
token=auth_token,
container=self.options['object_store_container'],
)
return True
except:
self.logger.debug('Auth token is invalid.')
return False
def request_auth_token(self):
""" Using username & password, request auth_token from Swift """
# Get username/password
username, password = self.get_auth_creds()
try:
storage_url, auth_token = swiftclient.client.get_auth(
self.options['auth_url'],
username,
password
)
return auth_token
except Exception as error:
self.logger.debug('request_auth_token error: %s' % error)
self.logger.error('Failed to get auth token. Try again')
sys.exit(1)
def cache_auth_token(self, auth_token):
""" cache auth_token to .pantri_auth_token """
# Cache auth token
auth_token_cache = os.path.join(self.git_path, '.pantri_auth_token')
utils.write_json_file(auth_token_cache, {'auth_token': auth_token})
self.logger.info('Auth token stored in %s' % auth_token_cache)
def get_auth_token(self):
""" Return auth token either from cache or request from object store """
auth_token_cache = os.path.join(self.git_path, '.pantri_auth_token')
# only use cached token if method is 'store' and 'password_file' arg is not
# passed
use_cached_auth_token = (
self.options['method'] == 'store' or not
'password_file' in self.options.keys()
)
if os.path.exists(auth_token_cache) and use_cached_auth_token:
auth_token = self.get_cached_auth_token()
# Validate and return cached auth token.
if auth_token and self.validate_auth_token(auth_token):
return auth_token
else:
self.logger.info('Auth token is invalid. Requesting a new token...')
auth_token = self.request_auth_token()
if use_cached_auth_token:
self.cache_auth_token(auth_token)
return auth_token
def parse_response(self, response):
"""
parse_response(self, response)
Parses response to determine if action was seccessful and logs messages to
stdout.
"""
# Try/Except will catch wrong response values. ie, empty dict or string
try:
# Exclude 'create_container' failures since current setup doesnt give
# users rights to create container. Not an issue unless container
# doesnt exist
if response['action'] == 'create_container':
self.logger.debug('SKIP: (action: create_container)')
return False
if response['success']:
success_msg = 'SUCCESS'
action = response['action']
if (
action == 'upload_object' and
response['status'] == 'skipped-changed'
):
action = response['status']
success_msg = 'NOTHING'
message = '%s (action: %s) Object: %s' % (
success_msg,
action,
response['object'],
)
self.logger.info(message)
return True
if not response['success']:
message = ' FAILURE (action: %s) Object: %s Traceback: %s' % (
response['action'],
response['object'],
response['traceback'],
)
self.logger.warn(message)
return False
except:
self.logger.warn('FAILURE: Unknown response from Swift')
return False
def upload(self, objects_to_upload):
"""
upload(self, objects)
Uploads list of objects to object store. Unchanged files (filesize &
modified time) are skipped. Relaies on swiftclient.service.SwiftService
class to do a majority of the work to upload objects.
"""
objects = []
for obj in objects_to_upload:
objects.append(
swiftclient.service.SwiftUploadObject(
source=os.path.join(self.paths['shelves'], obj),
object_name=obj
)
)
# Create connection to object store and upload objects
with swiftclient.service.SwiftService(
options={
'os_storage_url': self.options['storage_url'],
'os_auth_token': self.auth_token
}
) as swift:
# need for loop to read all responses returned. ie generators
for response in swift.upload(
container=self.options['object_store_container'],
objects=objects,
options={
'changed': True,
'segment_size': 4294967296,
'use_slo': True,
'segment_container': 'segments',
}
):
# Yield successful uploads
if self.parse_response(response):
yield response['object']
def delete_untested(self, objects_to_delete):
"""
delete(self, objects_to_delete)
Deletes objects from object store. Not meant to be used until tested more.
"""
# Create connection to object store and delete objects
with swiftclient.service.SwiftService(
options={
'os_storage_url': self.options['storage_url'],
'os_auth_token': self.auth_token
}
) as swift:
for response in swift.delete(
container=self.options['object_store_container'],
objects=objects_to_delete,
):
self.parse_response(response)
def download(self, objects_to_sync):
"""
download(self, objects_to_sync)
Downloads list of objects from object store
"""
objects = []
for obj in objects_to_sync:
objects.append(obj)
# Create connection to object store and download objects
with swiftclient.service.SwiftService(
options={
'os_storage_url': self.options['storage_url'],
'os_auth_token': self.auth_token
}
) as swift:
for response in swift.download(
container=self.options['object_store_container'],
objects=objects,
options={'out_directory': self.options['dest_sync']}
):
self.parse_response(response)