in api/batch_processing/postprocessing/postprocess_batch_results.py [0:0]
def process_batch_results(options: PostProcessingOptions
) -> PostProcessingResults:
ppresults = PostProcessingResults()
##%% Expand some options for convenience
output_dir = options.output_dir
##%% Prepare output dir
os.makedirs(output_dir, exist_ok=True)
##%% Load ground truth if available
ground_truth_indexed_db = None
if (options.ground_truth_json_file is not None) and (len(options.ground_truth_json_file) > 0):
if options.separate_detections_by_category:
print("Warning: I don't know how to separate categories yet when doing a P/R analysis, disabling category separation")
options.separate_detections_by_category = False
ground_truth_indexed_db = IndexedJsonDb(
options.ground_truth_json_file, b_normalize_paths=True,
filename_replacements=options.ground_truth_filename_replacements)
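        # The indexed DB wraps the ground truth .json and provides the lookups used
        # below: filename_to_id, image_id_to_image, image_id_to_annotations, and
        # cat_id_to_name.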
# Mark images in the ground truth as positive or negative
n_negative, n_positive, n_unknown, n_ambiguous = mark_detection_status(
ground_truth_indexed_db, negative_classes=options.negative_classes,
unknown_classes=options.unlabeled_classes)
print(f'Finished loading and indexing ground truth: {n_negative} '
f'negative, {n_positive} positive, {n_unknown} unknown, '
f'{n_ambiguous} ambiguous')
##%% Load detection (and possibly classification) results
if options.api_detection_results is None:
detections_df, other_fields = load_api_results(
options.api_output_file, normalize_paths=True,
filename_replacements=options.api_output_filename_replacements)
ppresults.api_detection_results = detections_df
ppresults.api_other_fields = other_fields
else:
print('Bypassing detection results loading...')
assert options.api_other_fields is not None
detections_df = options.api_detection_results
other_fields = options.api_other_fields
# Remove failed rows
n_failures = 0
if 'failure' in detections_df.columns:
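        # count() counts non-null entries in the 'failure' column, i.e. the number of
        # rows the detector failed to process; those rows are dropped below.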
n_failures = detections_df['failure'].count()
print('Warning: {} failed images'.format(n_failures))
detections_df = detections_df[detections_df['failure'].isna()]
assert other_fields is not None
detection_categories = other_fields['detection_categories']
# Convert keys and values to lowercase
classification_categories = other_fields.get('classification_categories', {})
classification_categories = {
k.lower(): v.lower()
for k, v in classification_categories.items()
}
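    # Both maps go from category ID (a string) to category name: detection categories
    # come from the detector, classification categories from the optional species
    # classifier. Lowercasing makes later comparisons against ground truth class names
    # case-insensitive.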
# Add column 'pred_detection_label' to indicate predicted detection status,
# not separating out the classes
det_status = 'pred_detection_label'
if options.include_almost_detections:
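        # Default everything to "almost", then promote rows at or above the confidence
        # threshold to positive and demote rows below the almost-detection threshold to
        # negative; rows in between keep the "almost" status.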
detections_df[det_status] = DetectionStatus.DS_ALMOST
confidences = detections_df['max_detection_conf']
pos_mask = (confidences >= options.confidence_threshold)
detections_df.loc[pos_mask, det_status] = DetectionStatus.DS_POSITIVE
neg_mask = (confidences < options.almost_detection_confidence_threshold)
detections_df.loc[neg_mask, det_status] = DetectionStatus.DS_NEGATIVE
else:
detections_df[det_status] = np.where(
detections_df['max_detection_conf'] >= options.confidence_threshold,
DetectionStatus.DS_POSITIVE, DetectionStatus.DS_NEGATIVE)
n_positives = sum(detections_df[det_status] == DetectionStatus.DS_POSITIVE)
print(f'Finished loading and preprocessing {len(detections_df)} rows '
f'from detector output, predicted {n_positives} positives.')
if options.include_almost_detections:
n_almosts = sum(detections_df[det_status] == DetectionStatus.DS_ALMOST)
print('...and {} almost-positives'.format(n_almosts))
##%% If we have ground truth, remove images we can't match to ground truth
if ground_truth_indexed_db is not None:
b_match = detections_df['file'].isin(
ground_truth_indexed_db.filename_to_id)
print(f'Confirmed filename matches to ground truth for {sum(b_match)} '
f'of {len(detections_df)} files')
detections_df = detections_df[b_match]
detector_files = detections_df['file'].tolist()
assert len(detector_files) > 0, (
'No detection files available, possible path issue?')
print('Trimmed detection results to {} files'.format(len(detector_files)))
##%% Sample images for visualization
images_to_visualize = detections_df
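    # Sampling with a fixed random_state keeps the rendered subset reproducible; a
    # num_images_to_sample of None or <= 0 means "render everything".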
if options.num_images_to_sample is not None and options.num_images_to_sample > 0:
images_to_visualize = images_to_visualize.sample(
n=min(options.num_images_to_sample, len(images_to_visualize)),
random_state=options.sample_seed)
output_html_file = ''
style_header = """<head>
<style type="text/css">
a { text-decoration: none; }
body { font-family: segoe ui, calibri, "trebuchet ms", verdana, arial, sans-serif; }
div.contentdiv { margin-left: 20px; }
</style>
</head>"""
##%% Fork here depending on whether or not ground truth is available
# If we have ground truth, we'll compute precision/recall and sample tp/fp/tn/fn.
#
# Otherwise we'll just visualize detections/non-detections.
if ground_truth_indexed_db is not None:
##%% Detection evaluation: compute precision/recall
# numpy array of detection probabilities
p_detection = detections_df['max_detection_conf'].values
n_detections = len(p_detection)
        # numpy array of floats: 1.0 for positive, 0.0 for negative, -1.0 for
        # unknown/ambiguous ground truth
gt_detections = np.zeros(n_detections, dtype=float)
for i_detection, fn in enumerate(detector_files):
image_id = ground_truth_indexed_db.filename_to_id[fn]
image = ground_truth_indexed_db.image_id_to_image[image_id]
detection_status = image['_detection_status']
if detection_status == DetectionStatus.DS_NEGATIVE:
gt_detections[i_detection] = 0.0
elif detection_status == DetectionStatus.DS_POSITIVE:
gt_detections[i_detection] = 1.0
else:
gt_detections[i_detection] = -1.0
# Don't include ambiguous/unknown ground truth in precision/recall analysis
b_valid_ground_truth = gt_detections >= 0.0
p_detection_pr = p_detection[b_valid_ground_truth]
gt_detections_pr = gt_detections[b_valid_ground_truth]
print('Including {} of {} values in p/r analysis'.format(np.sum(b_valid_ground_truth),
len(b_valid_ground_truth)))
precisions, recalls, thresholds = precision_recall_curve(gt_detections_pr, p_detection_pr)
        # precision_recall_curve returns one more precision/recall value than it does
        # thresholds; append a threshold of 1.0 so the arrays line up for the DataFrame
        # below.
thresholds = np.append(thresholds, [1.0])
precisions_recalls = pd.DataFrame(data={
'confidence_threshold': thresholds,
'precision': precisions,
'recall': recalls
})
# Compute and print summary statistics
average_precision = average_precision_score(gt_detections_pr, p_detection_pr)
print('Average precision: {:.1%}'.format(average_precision))
        # Thresholds increase throughout precisions/recalls/thresholds and recall is
        # non-increasing, so the last index where recall is still at or above the
        # target gives us the precision @ target recall.
        target_recall = 0.9
        i_above_target_recall = np.flatnonzero(recalls >= target_recall)
        if len(i_above_target_recall) == 0:
            precision_at_target_recall = 0.0
        else:
            precision_at_target_recall = precisions[i_above_target_recall[-1]]
print('Precision at {:.1%} recall: {:.1%}'.format(target_recall, precision_at_target_recall))
        cm = confusion_matrix(gt_detections_pr, np.array(p_detection_pr) >= options.confidence_threshold)
# Flatten the confusion matrix
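        # For binary labels, sklearn's confusion_matrix is laid out as
        # [[tn, fp], [fn, tp]], which is the unpacking order used here.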
tn, fp, fn, tp = cm.ravel()
precision_at_confidence_threshold = tp / (tp + fp)
recall_at_confidence_threshold = tp / (tp + fn)
f1 = 2.0 * (precision_at_confidence_threshold * recall_at_confidence_threshold) / \
(precision_at_confidence_threshold + recall_at_confidence_threshold)
print('At a confidence threshold of {:.1%}, precision={:.1%}, recall={:.1%}, f1={:.1%}'.format(
options.confidence_threshold, precision_at_confidence_threshold, recall_at_confidence_threshold, f1))
##%% Collect classification results, if they exist
classifier_accuracies = []
        # Mapping of classnames to indices for the confusion matrix.
        #
        # The lambda is a bit of a hack: it closes over classname_to_idx by name and
        # hands out the next sequential index for each previously-unseen classname,
        # so we assume that the code below never reassigns classname_to_idx.
        classname_to_idx = collections.defaultdict(lambda: len(classname_to_idx))
# Confusion matrix as defaultdict of defaultdict
#
# Rows / first index is ground truth, columns / second index is predicted category
classifier_cm = collections.defaultdict(lambda: collections.defaultdict(lambda: 0))
# iDetection = 0; fn = detector_files[iDetection]; print(fn)
assert len(detector_files) == len(detections_df)
for iDetection, fn in enumerate(detector_files):
image_id = ground_truth_indexed_db.filename_to_id[fn]
image = ground_truth_indexed_db.image_id_to_image[image_id]
detections = detections_df['detections'].iloc[iDetection]
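            # Each detection is a dict in the batch output format; if a classifier was
            # run it also carries a 'classifications' list of [class_id, confidence]
            # pairs, and [0][0] below takes the top class ID for each such detection.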
pred_class_ids = [det['classifications'][0][0] \
for det in detections if 'classifications' in det.keys()]
pred_classnames = [classification_categories[pd] for pd in pred_class_ids]
# If this image has classification predictions, and an unambiguous class
# annotated, and is a positive image...
if len(pred_classnames) > 0 \
and '_unambiguous_category' in image.keys() \
and image['_detection_status'] == DetectionStatus.DS_POSITIVE:
                # The unambiguous ground truth category; we make this a set for easier
                # handling below
gt_categories = set([image['_unambiguous_category']])
pred_categories = set(pred_classnames)
                # Compute the accuracy as intersection over union, i.e.
                # (# of categories in both prediction and GT) divided by
                # (# of categories in either prediction or GT).
                #
                # If there is a single GT category, the result is 1.0 when the
                # prediction is exactly that category.
                #
                # It is 1.0/(# of predicted top-1 categories) if the GT category is
                # among the predicted top-1 categories.
                #
                # It is 0.0 if none of the predicted categories is correct.
classifier_accuracies.append(
len(gt_categories & pred_categories)
/ len(gt_categories | pred_categories)
)
image['_classification_accuracy'] = classifier_accuracies[-1]
# Distribute this accuracy across all predicted categories in the
# confusion matrix
assert len(gt_categories) == 1
gt_class_idx = classname_to_idx[list(gt_categories)[0]]
for pred_category in pred_categories:
pred_class_idx = classname_to_idx[pred_category]
classifier_cm[gt_class_idx][pred_class_idx] += 1
# ...for each file in the detection results
# If we have classification results
if len(classifier_accuracies) > 0:
# Build confusion matrix as array from classifier_cm
all_class_ids = sorted(classname_to_idx.values())
classifier_cm_array = np.array(
[[classifier_cm[r_idx][c_idx] for c_idx in all_class_ids] for r_idx in all_class_ids], dtype=float)
classifier_cm_array /= (classifier_cm_array.sum(axis=1, keepdims=True) + 1e-7)
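            # Each row now holds the fraction of ground truth images of that class
            # assigned to each predicted class; the epsilon avoids division by zero
            # for classes with no samples.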
# Print some statistics
print('Finished computation of {} classification results'.format(len(classifier_accuracies)))
print('Mean accuracy: {}'.format(np.mean(classifier_accuracies)))
# Prepare confusion matrix output
# Get confusion matrix as string
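            # Write the row-normalized matrix as percentages in fixed-width columns so
            # they line up under the truncated class name headers added below.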
sio = io.StringIO()
np.savetxt(sio, classifier_cm_array * 100, fmt='%5.1f')
cm_str = sio.getvalue()
# Get fixed-size classname for each idx
idx_to_classname = {v:k for k,v in classname_to_idx.items()}
classname_list = [idx_to_classname[idx] for idx in sorted(classname_to_idx.values())]
classname_headers = ['{:<5}'.format(cname[:5]) for cname in classname_list]
# Prepend class name on each line and add to the top
cm_str_lines = [' ' * 16 + ' '.join(classname_headers)]
cm_str_lines += ['{:>15}'.format(cn[:15]) + ' ' + cm_line for cn, cm_line in zip(classname_list, cm_str.splitlines())]
# Print formatted confusion matrix
print('Confusion matrix: ')
print(*cm_str_lines, sep='\n')
# Plot confusion matrix
# To manually add more space at bottom: plt.rcParams['figure.subplot.bottom'] = 0.1
#
# Add 0.5 to figsize for every class. For two classes, this will result in
# fig = plt.figure(figsize=[4,4])
fig = plot_utils.plot_confusion_matrix(
classifier_cm_array,
classname_list,
normalize=False,
title='Confusion matrix',
cmap=plt.cm.Blues,
vmax=1.0,
use_colorbar=True,
y_label=True)
cm_figure_relative_filename = 'confusion_matrix.png'
cm_figure_filename = os.path.join(output_dir, cm_figure_relative_filename)
plt.savefig(cm_figure_filename)
plt.close(fig)
# ...if we have classification results
##%% Render output
# Write p/r table to .csv file in output directory
pr_table_filename = os.path.join(output_dir, 'prec_recall.csv')
precisions_recalls.to_csv(pr_table_filename, index=False)
# Write precision/recall plot to .png file in output directory
t = 'Precision-Recall curve: AP={:0.1%}, P@{:0.1%}={:0.1%}'.format(
average_precision, target_recall, precision_at_target_recall)
fig = plot_utils.plot_precision_recall_curve(precisions, recalls, t)
pr_figure_relative_filename = 'prec_recall.png'
pr_figure_filename = os.path.join(output_dir, pr_figure_relative_filename)
plt.savefig(pr_figure_filename)
# plt.show(block=False)
plt.close(fig)
##%% Sampling
# Sample true/false positives/negatives with correct/incorrect top-1
# classification and render to html
# Accumulate html image structs (in the format expected by write_html_image_lists)
# for each category, e.g. 'tp', 'fp', ..., 'class_bird', ...
images_html = collections.defaultdict(list)
        # Accessing these keys for the first time creates empty lists in the
        # defaultdict, so the loop below creates an output folder for every result
        # category even if no images end up in it.
        [images_html[res] for res in ['tp', 'tpc', 'tpi', 'fp', 'tn', 'fn']]
for res in images_html.keys():
os.makedirs(os.path.join(output_dir, res), exist_ok=True)
image_count = len(images_to_visualize)
# Each element will be a list of 2-tuples, with elements [collection name,html info struct]
rendering_results = []
# Each element will be a three-tuple with elements file,max_conf,detections
files_to_render = []
# Assemble the information we need for rendering, so we can parallelize without
# dealing with Pandas
# i_row = 0; row = images_to_visualize.iloc[0]
for _, row in images_to_visualize.iterrows():
# Filenames should already have been normalized to either '/' or '\'
files_to_render.append([row['file'], row['max_detection_conf'], row['detections']])
def render_image_with_gt(file_info):
image_relative_path = file_info[0]
max_conf = file_info[1]
detections = file_info[2]
# This should already have been normalized to either '/' or '\'
image_id = ground_truth_indexed_db.filename_to_id.get(image_relative_path, None)
if image_id is None:
                print("Warning: couldn't find ground truth for image {}".format(image_relative_path))
return None
image = ground_truth_indexed_db.image_id_to_image[image_id]
annotations = ground_truth_indexed_db.image_id_to_annotations[image_id]
gt_status = image['_detection_status']
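            # bool() relies on DS_NEGATIVE being 0 (absent) and DS_POSITIVE being 1
            # (present); non-definitive statuses are filtered out just below.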
gt_presence = bool(gt_status)
gt_classes = CameraTrapJsonUtils.annotations_to_classnames(
annotations, ground_truth_indexed_db.cat_id_to_name)
gt_class_summary = ','.join(gt_classes)
if gt_status > DetectionStatus.DS_MAX_DEFINITIVE_VALUE:
print(f'Skipping image {image_id}, does not have a definitive '
f'ground truth status (status: {gt_status}, classes: {gt_class_summary})')
return None
            detected = max_conf >= options.confidence_threshold
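            # Bucket the image: tp (true positive with no classification available),
            # tpc/tpi (true positive with correct/incorrect top-1 classification),
            # fp, fn, or tn.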
if gt_presence and detected:
if '_classification_accuracy' not in image.keys():
res = 'tp'
elif np.isclose(1, image['_classification_accuracy']):
res = 'tpc'
else:
res = 'tpi'
elif not gt_presence and detected:
res = 'fp'
elif gt_presence and not detected:
res = 'fn'
else:
res = 'tn'
display_name = '<b>Result type</b>: {}, <b>Presence</b>: {}, <b>Class</b>: {}, <b>Max conf</b>: {:0.3f}%, <b>Image</b>: {}'.format(
res.upper(), str(gt_presence), gt_class_summary,
max_conf * 100, image_relative_path)
rendered_image_html_info = render_bounding_boxes(
options.image_base_dir,
image_relative_path,
display_name,
detections,
res,
detection_categories,
classification_categories,
options)
image_result = None
if len(rendered_image_html_info) > 0:
image_result = [[res, rendered_image_html_info]]
for gt_class in gt_classes:
image_result.append(['class_{}'.format(gt_class), rendered_image_html_info])
return image_result
# ...def render_image_with_gt(file_info)
start_time = time.time()
if options.parallelize_rendering:
if options.parallelize_rendering_n_cores is None:
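                # With no core count specified, ThreadPool() defaults to one worker
                # per CPU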
pool = ThreadPool()
else:
print('Rendering images with {} workers'.format(options.parallelize_rendering_n_cores))
pool = ThreadPool(options.parallelize_rendering_n_cores)
rendering_results = list(tqdm(pool.imap(render_image_with_gt, files_to_render), total=len(files_to_render)))
else:
# file_info = files_to_render[0]
for file_info in tqdm(files_to_render):
rendering_results.append(render_image_with_gt(file_info))
elapsed = time.time() - start_time
# Map all the rendering results in the list rendering_results into the
# dictionary images_html
image_rendered_count = 0
for rendering_result in rendering_results:
if rendering_result is None:
continue
image_rendered_count += 1
for assignment in rendering_result:
images_html[assignment[0]].append(assignment[1])
# Prepare the individual html image files
image_counts = prepare_html_subpages(images_html, output_dir)
print('{} images rendered (of {})'.format(image_rendered_count,image_count))
# Write index.html
all_tp_count = image_counts['tp'] + image_counts['tpc'] + image_counts['tpi']
total_count = all_tp_count + image_counts['tn'] + image_counts['fp'] + image_counts['fn']
classification_detection_results = """ <a href="tpc.html">with all correct top-1 predictions (TPC)</a> ({})<br/>