in lnt/util/ImportData.py [0:0]
def import_and_report(config, db_name, db, file, format, commit=False,
                      show_sample_count=False, disable_email=False,
                      disable_report=False):
    """
    import_and_report(config, db_name, db, file, format,
                      [commit], [show_sample_count],
                      [disable_email], [disable_report]) -> result dict

    Import a test data file into an LNT server and generate a test report.
    Note that success is unaffected by the value of commit, this merely
    changes whether the run (on success) is committed to the database or
    rolled back.

    Arguments:
      config -- configuration object, or a false value. When false, no
          database config is used, the report URL is "localhost", and no
          background jobs or shadow import are run.
      db_name -- name of the database; used to index config.databases and
          to build the report and result URLs.
      db -- the database object to import into.
      file, format -- handed to lnt.formats.read_any() to load the data.
      commit -- commit the database on completion if true, otherwise roll
          it back.
      show_sample_count -- also count samples before and after the import
          (reported as 'added_samples').
      disable_email -- skip resolving the report email address.
      disable_report -- skip NTEmailReport.emailReport().

    The result object is a dictionary containing information on the imported
    run and its comparison to the previous run. It always has 'success',
    'error' and 'import_file'. When loading the file, resolving the email
    address, or importing the data fails, 'success' is False and 'error'
    holds a description; the dictionary is returned early with only the
    keys filled in up to that point.

    Raises ValueError if a shadow import database is configured but
    config.get_database() returns None for it.
    """
    numMachines = db.getNumMachines()
    numRuns = db.getNumRuns()
    numTests = db.getNumTests()

    # If the database gets fragmented, count(*) in SQLite can get really
    # slow, so samples are only counted when explicitly requested.
    if show_sample_count:
        numSamples = db.getNumSamples()

    result = {}
    result['success'] = False
    result['error'] = None
    result['import_file'] = file

    startTime = time.time()
    try:
        data = lnt.formats.read_any(file, format)
    except KeyboardInterrupt:
        raise
    except:
        # The bare except is deliberate: everything other than a user
        # interrupt is reported through the result instead of propagating.
        # NOTE(review): the reader may signal errors with non-Exception
        # types (e.g. SystemExit) -- confirm before narrowing this clause.
        import traceback
        result['error'] = "load failure: %s" % traceback.format_exc()
        return result

    result['load_time'] = time.time() - startTime

    # Auto-upgrade the data, if necessary.
    lnt.testing.upgrade_report(data)

    # Find the database config, if we have a configuration object.
    if config:
        db_config = config.databases[db_name]
    else:
        db_config = None

    # Find the email address for this machine's results.
    toAddress = email_config = None
    if db_config and not disable_email:
        email_config = db_config.email_config
        if email_config.enabled:
            # Find the machine name.
            machineName = str(data.get('Machine', {}).get('Name'))
            toAddress = email_config.get_to_address(machineName)
            if toAddress is None:
                result['error'] = ("unable to match machine name "
                                   "for test results email address!")
                return result

    importStartTime = time.time()
    try:
        success, run = db.importDataFromDict(data, commit, config=db_config)
    except KeyboardInterrupt:
        raise
    except:
        # Report import failures the same way as load failures. A stray
        # unconditional re-raise used to make this handler unreachable.
        import traceback
        result['error'] = "import failure: %s" % traceback.format_exc()
        return result

    # Save the import path on the run.
    # NOTE(review): this also runs when `success` is false, i.e. on the run
    # that the submission duplicates -- confirm that is intended.
    run.imported_from = file

    result['import_time'] = time.time() - importStartTime
    if not success:
        # Record the original run this is a duplicate of.
        result['original_run'] = run.id

    reportStartTime = time.time()
    result['report_to_address'] = toAddress
    if config:
        report_url = "%s/db_%s/" % (config.zorgURL, db_name)
    else:
        report_url = "localhost"

    if not disable_report:
        # This has the side effect of building the run report for
        # this result.
        NTEmailReport.emailReport(result, db, run, report_url,
                                  email_config, toAddress, success, commit)

    result['added_machines'] = db.getNumMachines() - numMachines
    result['added_runs'] = db.getNumRuns() - numRuns
    result['added_tests'] = db.getNumTests() - numTests
    if show_sample_count:
        result['added_samples'] = db.getNumSamples() - numSamples

    result['committed'] = commit
    result['run_id'] = run.id
    ts_name = data['Run']['Info'].get('tag')
    if commit:
        db.commit()
        if db_config:
            # If we are not in a dummy instance, also run background jobs.
            # We have to have a commit before we run, so subprocesses can
            # see the submitted data.
            ts = db.testsuite.get(ts_name)
            async_ops.async_fieldchange_calc(db_name, ts, run, config)
    else:
        db.rollback()

    # Add a handy relative link to the submitted run.
    result['result_url'] = "db_{}/v4/{}/{}".format(db_name, ts_name, run.id)
    # Time the reporting phase from its own start, not from the start of the
    # import (which is already covered by 'import_time').
    result['report_time'] = time.time() - reportStartTime
    result['total_time'] = time.time() - startTime
    note("Successfully created {}".format(result['result_url']))

    # If this database has a shadow import configured, import the run into
    # that database as well.
    if config:
        shadow_name = config.databases[db_name].shadow_import
    else:
        shadow_name = None
    if shadow_name:
        # Load the shadow database to import into. Validate it before
        # handing it to closing(): closing(None) would fail in __exit__
        # with an AttributeError that hides the configuration error.
        shadow_db = config.get_database(shadow_name)
        if shadow_db is None:
            raise ValueError(("invalid configuration, shadow import "
                              "database %r does not exist") % (shadow_name,))

        with closing(shadow_db):
            # Perform the shadow import.
            shadow_result = import_and_report(config, shadow_name,
                                              shadow_db, file, format, commit,
                                              show_sample_count, disable_email,
                                              disable_report)

            # Append the shadow result to the result.
            result['shadow_result'] = shadow_result

    result['success'] = True
    return result