in src/cadet.py [0:0]
def upload(source, type_, collection_name, database_name, primary_key, uri, connection_string):
"""
Given a source file `source` of type `type_`:
1. connects to the Cosmos DB instance using either
(a) `primary_key` and `uri`
OR
(b) `connection_string`
2. ...and uploads the data to the collection `collection_name` on database `database_name
Assumes that the Cosmos DB subscription has both the database and the collection already
made when running this tool.
"""
# Make sure it's a CSV or TSV
type_ = type_.upper()
if type_ not in DELIMITERS or not source.upper().endswith(FILE_ENDINGS):
raise click.BadParameter('We currently only support CSV and TSV uploads from Cadet')
# You must have either the connection string OR (endpoint and key) to connect
if (uri is None or primary_key is None) and (connection_string is None):
raise click.BadParameter(
'REQUIRED: Connection string OR *both* a URI and a key'
)
elif uri is not None and primary_key is not None:
_connection_url = uri
_auth = {'masterKey': primary_key}
elif connection_string is not None:
connection_str = connection_string.split(';')
_connection_url = connection_str[0].replace('AccountEndpoint=', '')
try:
# If someone provides the connection string, break it apart into its subcomponents
if 'AccountEndpoint=' not in connection_string or 'AccountKey=' not in connection_string:
raise click.BadParameter('The connection string is not properly formatted.')
connection_str = connection_string.split(';')
_connection_url = connection_str[0].replace('AccountEndpoint=', '')
_auth = {'masterKey': connection_str[1].replace('AccountKey=', '')}
except:
# ...Unless they don't provide a usable connection string
raise click.BadParameter('The connection string is not properly formatted.')
database_link = 'dbs/' + database_name
collection_link = database_link + '/colls/' + collection_name
# Connect to Cosmos
try:
client = get_cosmos_client(_connection_url, _auth)
except:
raise click.BadParameter('Authentication failure to Azure Cosmos')
# Read and upload at same time
try:
source_path = get_full_source_path(source)
read_and_upload(source_path, type_, client, collection_link)
except FileNotFoundError as err:
raise click.FileError(source, hint=err)