in DataScience/LogDownloader.py [0:0]
def download_container(app_id, log_dir, container=None, conn_string=None, account_name=None, sas_token=None, start_date=None, end_date=None, overwrite_mode=0, dry_run=False, version=2, verbose=False, create_gzip_mode=-1, delta_mod_t=3600, max_connections=4, confirm=False, report_progress=True, if_match=None, keep_invalid_eof=False, max_download_size=None):
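"""Download Decision Service logs for app_id into log_dir, optionally merging them into .gz files.
Summary of the behavior implemented below (a sketch inferred from this code, not authoritative docs):
- container defaults to app_id.
- version=1 posts to a C# export endpoint for uncooked logs; any other version (default 2)
  lists and downloads cooked-log blobs via BlockBlobService.
- overwrite_mode: 0 = never overwrite, 1 = ask, 2 = always overwrite,
  3/4 = resume partial downloads (4 additionally skips blobs modified within delta_mod_t sec).
- create_gzip_mode: -1 = off, 0 = concat+gzip per model, 1 = merge keeping one file per day,
  2 = merge deduplicating by event id (lowest cost wins), sorted by timestamp.
Returns the path of the last output file written (None if nothing was written).
"""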
t_start = time.time()
if not container:
container=app_id
print('------'*10)
print('Current UTC time: {}'.format(datetime.datetime.now(datetime.timezone.utc)))
print('app_id: {}'.format(app_id))
print('container: {}'.format(container))
print('log_dir: {}'.format(log_dir))
print('Start Date: {}'.format(start_date))
print('End Date: {}'.format(end_date))
print('Overwrite mode: {}'.format(overwrite_mode))
print('dry_run: {}'.format(dry_run))
print('version: {}'.format(version))
print('create_gzip_mode: {}'.format(create_gzip_mode))
print('------'*10)
if not dry_run:
os.makedirs(os.path.join(log_dir, app_id), exist_ok=True)
output_fp = None
if version == 1: # using C# api for uncooked logs
output_fp = os.path.join(log_dir, app_id, app_id+'_'+start_date.strftime("%Y-%m-%d")+'_'+end_date.strftime("%Y-%m-%d")+'.json')
print('Destination: {}'.format(output_fp))
do_download = True
if os.path.isfile(output_fp):
if overwrite_mode in {0, 3, 4}:
print('Output file already exists: {}. Not downloading.'.format(output_fp))
do_download = False
elif overwrite_mode == 1 and input('Output file already exists: {}. Do you want to overwrite [Y/n]? '.format(output_fp)) not in {'Y', 'y'}:
do_download = False
if do_download:
if dry_run:
print('--dry_run - Not downloading!')
else:
print('Downloading...', end='')
try:
import requests
LogDownloaderURL = "https://cps-staging-exp-experimentation.azurewebsites.net/api/Log?account={ACCOUNT_NAME}&key={ACCOUNT_KEY}&start={START_DATE}&end={END_DATE}&container={CONTAINER}"
conn_string_dict = dict(x.split('=',1) for x in conn_string.split(';'))
if not conn_string_dict.get('AccountName') or len(conn_string_dict.get('AccountKey', '')) != 88:
print("Error: Invalid Azure Storage ConnectionString.")
sys.exit()
url = LogDownloaderURL.format(ACCOUNT_NAME=conn_string_dict['AccountName'], ACCOUNT_KEY=conn_string_dict['AccountKey'].replace('+','%2b'), CONTAINER=container, START_DATE=start_date.strftime("%Y-%m-%d"), END_DATE=(end_date+datetime.timedelta(days=1)).strftime("%Y-%m-%d"))
r = requests.post(url)
with open(output_fp, 'wb') as f:
    f.write(r.content)
print(' Done!\n')
except Exception as e:
print('Error: {}'.format(e))
else: # using BlockBlobService python api for cooked logs
try:
print('Establishing Azure Storage BlockBlobService connection using ',end='')
if sas_token and account_name:
print('sas token...')
bbs = BlockBlobService(account_name=account_name, sas_token=sas_token)
else:
print('connection string...')
bbs = BlockBlobService(connection_string=conn_string)
# List all blobs and download them one by one
print('Getting blobs list...')
blobs = bbs.list_blobs(container)
except Exception as e:
if isinstance(e.args[0], str) and e.args[0].startswith('The specified container does not exist.'):
print("Error: The specified container ({}) does not exist.".format(container))
else:
print("Error:\nType: {}\nArgs: {}".format(type(e).__name__, e.args))
sys.exit()
print('Iterating through blobs...\n')
selected_fps = []
for blob in blobs:
if '/data/' not in blob.name:
if verbose:
print('{} - Skip: Non-data blob\n'.format(blob.name))
continue
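# Blob names are expected to contain '/data/YYYY/MM/DD_...'; extract that date for range filtering.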
blob_day = datetime.datetime.strptime(blob.name.split('/data/', 1)[1].split('_', 1)[0], '%Y/%m/%d')
if (start_date and blob_day < start_date) or (end_date and end_date < blob_day):
if verbose:
print('{} - Skip: Outside of date range\n'.format(blob.name))
continue
try:
bp = bbs.get_blob_properties(container, blob.name)
if confirm:
if input("{} - Do you want to download [Y/n]? ".format(blob.name)) not in {'Y', 'y'}:
print()
continue
fp = os.path.join(log_dir, app_id, blob.name.replace('/','_'))
selected_fps.append(fp)
if os.path.isfile(fp):
file_size = os.path.getsize(fp)
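# overwrite_mode recap (inferred from the branches below): 0 = never overwrite, 1 = ask,
# 2 = always overwrite, 3/4 = try to resume a partial download.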
if overwrite_mode == 0:
if verbose:
print('{} - Skip: Output file already exists\n'.format(blob.name))
continue
elif overwrite_mode in {1, 3, 4}:
if file_size == bp.properties.content_length: # file size is the same, skip!
if verbose:
print('{} - Skip: Output file already exists with same size\n'.format(blob.name))
continue
print('Output file already exists: {}\nLocal size: {:.3f} MB\nAzure size: {:.3f} MB'.format(fp, file_size/(1024**2), bp.properties.content_length/(1024**2)))
if overwrite_mode in {3, 4} and file_size > bp.properties.content_length: # local file size is larger, skip with warning!
print('{} - Skip: Output file already exists with larger size\n'.format(blob.name))
continue
if overwrite_mode == 1 and input("Do you want to overwrite [Y/n]? ") not in {'Y', 'y'}:
print()
continue
else:
file_size = None
print('Processing: {} (size: {:.3f} MB - Last modified: {})'.format(blob.name, bp.properties.content_length/(1024**2), bp.properties.last_modified))
# check if blob was modified in the last delta_mod_t sec
if datetime.datetime.now(datetime.timezone.utc) - bp.properties.last_modified < datetime.timedelta(seconds=delta_mod_t):
if overwrite_mode < 2:
if input("Azure blob currently in use (modified in the last delta_mod_t={} sec). Do you want to download anyway [Y/n]? ".format(delta_mod_t)) not in {'Y', 'y'}:
print()
continue
elif overwrite_mode == 4:
print('Azure blob currently in use (modified in the last delta_mod_t={} sec). Skipping!\n'.format(delta_mod_t))
continue
if if_match != '*': # when if_match is not '*', reset max_connections to 1 to prevent crash if azure blob is modified during download
max_connections = 1
if dry_run:
print('--dry_run - Not downloading!')
else:
t0 = time.time()
process_checker = update_progress if report_progress else None
if overwrite_mode in {3, 4} and file_size:
if max_download_size is None or file_size < max_download_size:
print('Check validity of remote file... ', end='')
temp_fp = fp + '.temp'
cmpsize = min(file_size,8*1024**2)
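# Validity check for resuming: download the last cmpsize bytes (at most 8 MB) of the blob
# and compare them with the tail of the local file; a match means the local file is a
# valid prefix of the blob.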
bbs.get_blob_to_path(container, blob.name, temp_fp, max_connections=max_connections, start_range=file_size-cmpsize, end_range=file_size-1, if_match=if_match)
if cmp_files(fp, temp_fp, -cmpsize):
print('Valid!')
print('Resume downloading to temp file with max_connections = {}...'.format(max_connections))
bbs.get_blob_to_path(container, blob.name, temp_fp, progress_callback=process_checker, max_connections=max_connections, start_range=os.path.getsize(fp), if_match=if_match, end_range=max_download_size)
download_time = time.time()-t0
download_size_MB = os.path.getsize(temp_fp)/(1024**2) # file size in MB
print('\nAppending to local file...')
with open(fp, 'ab') as f1, open(temp_fp, 'rb') as f2:
shutil.copyfileobj(f2, f1, length=100*1024**2) # writing chunks of 100MB to avoid consuming memory
print('Appending completed. Deleting temp file...')
os.remove(temp_fp)
else:
os.remove(temp_fp)
print('Invalid! - Skip\n')
continue
print('Downloaded {:.3f} MB in {:.1f} sec. ({:.3f} MB/sec) - Total elapsed time: {:.1f} sec.'.format(download_size_MB, download_time, download_size_MB/download_time, time.time()-t0))
else:
print('Downloading with max_connections = {}...'.format(max_connections))
bbs.get_blob_to_path(container, blob.name, fp, progress_callback=process_checker, max_connections=max_connections, if_match=if_match, start_range=0, end_range=max_download_size)
download_time = time.time()-t0
download_size_MB = os.path.getsize(fp)/(1024**2) # file size in MB
print('\nDownloaded {:.3f} MB in {:.1f} sec. ({:.3f} MB/sec)'.format(download_size_MB, download_time, download_size_MB/download_time))
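# erase_invalid_end_line is assumed to trim a partially written final line
# (the blob may have been appended to while downloading).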
if not keep_invalid_eof:
erase_invalid_end_line(fp)
print()
except Exception as e:
print('Error: {}'.format(e))
if create_gzip_mode > -1:
if selected_fps:
selected_fps = [x for x in selected_fps if os.path.isfile(x)]
if create_gzip_mode == 0:
models = {}
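# Group files by model id: the file-name prefix before '_data_', i.e. one group
# per LastConfigurationEditDate.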
for fp in selected_fps:
models.setdefault(os.path.basename(fp).split('_data_',1)[0], []).append(fp)
for model in models:
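# Sort the group chronologically by the YYYY_MM_DD date embedded in the file names.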
models[model].sort(key=lambda x : list(map(int,x.split('_data_')[1].split('_')[:3])))
start_date = '-'.join(models[model][0].split('_data_')[1].split('_')[:3])
end_date = '-'.join(models[model][-1].split('_data_')[1].split('_')[:3])
output_fp = os.path.join(log_dir, app_id, app_id+'_'+model+'_data_'+start_date+'_'+end_date+'.json.gz')
print('Concat and zip files of LastConfigurationEditDate={} to: {}'.format(model, output_fp))
if os.path.isfile(output_fp) and __name__ == '__main__' and input('Output file already exists: {}. Do you want to overwrite [Y/n]? '.format(output_fp)) not in {'Y', 'y'}:
continue
if dry_run:
print('--dry_run - Not creating gzip file!')
else:
with gzip.open(output_fp, 'wb') as f_out:
for fp in models[model]:
print('Adding: {}'.format(fp))
with open(fp, 'rb') as f_in:
shutil.copyfileobj(f_in, f_out, length=100*1024**2) # writing chunks of 100MB to avoid consuming memory
elif create_gzip_mode == 1:
selected_fps.sort(key=lambda x : (list(map(int,x.split('_data_')[1].split('_')[:3])), -os.path.getsize(x), x))
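# Sorted by date ascending, then size descending, so the loop below keeps only the
# largest file for each calendar day.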
selected_fps_merged = []
last_fp_date = None
for fp in selected_fps:
fp_date = datetime.datetime.strptime('_'.join(fp.split('_data_')[1].split('_')[:3]), "%Y_%m_%d")
if fp_date != last_fp_date:
selected_fps_merged.append(fp)
last_fp_date = fp_date
start_date = '-'.join(selected_fps_merged[0].split('_data_')[1].split('_')[:3])
end_date = '-'.join(selected_fps_merged[-1].split('_data_')[1].split('_')[:3])
output_fp = os.path.join(log_dir, app_id, app_id+'_merged_data_'+start_date+'_'+end_date+'.json.gz')
print('Merge and zip files of all LastConfigurationEditDate to: {}'.format(output_fp))
if not os.path.isfile(output_fp) or (__name__ == '__main__' and input('Output file already exists: {}. Do you want to overwrite [Y/n]? '.format(output_fp)) in {'Y', 'y'}):
if dry_run:
for fp in selected_fps_merged:
print('Adding: {}'.format(fp))
print('--dry_run - Not creating gzip file!')
else:
with gzip.open(output_fp, 'wb') as f_out:
for fp in selected_fps_merged:
print('Adding: {}'.format(fp))
with open(fp, 'rb') as f_in:
shutil.copyfileobj(f_in, f_out, length=1024**3) # writing chunks of 1GB to avoid consuming memory
elif create_gzip_mode == 2:
selected_fps.sort(key=lambda x : (list(map(int,x.split('_data_')[1].split('_')[:3])), -os.path.getsize(x), x))
start_date = '-'.join(selected_fps[0].split('_data_')[1].split('_')[:3])
end_date = '-'.join(selected_fps[-1].split('_data_')[1].split('_')[:3])
output_fp = os.path.join(log_dir, app_id, app_id+'_deepmerged_data_'+start_date+'_'+end_date+'.json.gz')
print('Merge, unique, sort, and zip files of all LastConfigurationEditDate to: {}'.format(output_fp))
if not os.path.isfile(output_fp) or (__name__ == '__main__' and input('Output file already exists: {}. Do you want to overwrite [Y/n]? '.format(output_fp)) in {'Y', 'y'}):
d = {}
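# d maps event id -> (timestamp, cost, raw line); when an event id repeats, the
# line with the lowest cost (best reward) is kept.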
for fn in selected_fps:
print('Parsing: {}'.format(fn), end='', flush=True)
if not dry_run:
for x in open(fn, 'rb'):
if x.startswith(b'{"_label_cost') and x.strip().endswith(b'}'): # reading only cooked lined
data = ds_parse.json_cooked(x)
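# ds_parse.json_cooked is assumed to return a dict-like object with event id 'ei',
# timestamp 'ts', and 'cost', or None when the line cannot be parsed.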
if data is not None and (data['ei'] not in d or float(data['cost']) < d[data['ei']][1]): # taking line with best reward
d[data['ei']] = (data['ts'], float(data['cost']), x)
print(' - len(d): {}'.format(len(d)))
print('Writing to output .gz file...')
if dry_run:
print('--dry_run - Not creating gzip file!')
else:
with gzip.open(output_fp, 'wb') as f:
i = 0
for x in sorted(d.values(), key=lambda x : x[0]): # events are sorted by timestamp
f.write(x[2])
i += 1
if i % 5000 == 0:
update_progress(i, len(d))
update_progress(i, len(d))
print()
else:
print('Unrecognized --create_gzip_mode: {}; skipping gzip creation.'.format(create_gzip_mode))
else:
print('No files downloaded; skipping gzip creation.')
print('Total elapsed time: {:.1f} sec.\n'.format(time.time()-t_start))
return output_fp
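# Example usage (a sketch, assuming the legacy azure-storage-blob 2.x BlockBlobService API
# and a placeholder connection string):
#
#   import datetime
#   download_container(
#       'my_app_id', '/tmp/logs',
#       conn_string='DefaultEndpointsProtocol=https;AccountName=...;AccountKey=...',
#       start_date=datetime.datetime(2019, 1, 1),
#       end_date=datetime.datetime(2019, 1, 7),
#       overwrite_mode=3, create_gzip_mode=1)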