"""
postprocess_batch_results.py
Given a .json or .csv file containing MD results, do one or more of the following:
* Sample detections/non-detections and render to HTML (when ground truth isn't
available) (this is 99.9% of what this module is for)
* Evaluate detector precision/recall, optionally rendering results (requires
ground truth)
* Sample true/false positives/negatives and render to HTML (requires ground
truth)
Ground truth, if available, must be in COCO Camera Traps format:
https://github.com/agentmorris/MegaDetector/blob/main/megadetector/data_management/README.md#coco-camera-traps-format
"""
#%% Constants and imports
import argparse
import collections
import copy
import errno
import io
import os
import sys
import time
import uuid
import warnings
import random
from enum import IntEnum
from multiprocessing.pool import ThreadPool
from multiprocessing.pool import Pool
from functools import partial
from collections import defaultdict
import matplotlib.pyplot as plt
import numpy as np
import humanfriendly
import pandas as pd
from sklearn.metrics import precision_recall_curve, confusion_matrix, average_precision_score
from tqdm import tqdm
from megadetector.visualization import visualization_utils as vis_utils
from megadetector.visualization import plot_utils
from megadetector.utils.write_html_image_list import write_html_image_list
from megadetector.utils.wi_taxonomy_utils import load_md_or_speciesnet_file
from megadetector.utils import path_utils
from megadetector.utils.ct_utils import args_to_object
from megadetector.utils.ct_utils import sets_overlap
from megadetector.utils.ct_utils import sort_dictionary_by_value
from megadetector.utils.ct_utils import sort_dictionary_by_key
from megadetector.utils.ct_utils import invert_dictionary
from megadetector.utils.path_utils import clean_filename
from megadetector.data_management.cct_json_utils import CameraTrapJsonUtils
from megadetector.data_management.cct_json_utils import IndexedJsonDb
from megadetector.postprocessing.load_api_results import load_api_results
from megadetector.detection.run_detector import get_typical_confidence_threshold_from_results
warnings.filterwarnings('ignore', '(Possibly )?corrupt EXIF data', UserWarning)
#%% Options
DEFAULT_NEGATIVE_CLASSES = ['empty']
DEFAULT_UNKNOWN_CLASSES = ['unknown', 'unlabeled', 'ambiguous']
# Make sure there is no overlap between the two sets, because this will cause
# issues in the code
assert not sets_overlap(DEFAULT_NEGATIVE_CLASSES, DEFAULT_UNKNOWN_CLASSES), (
'Default negative and unknown classes cannot overlap.')
[docs]
class PostProcessingOptions:
"""
Options used to parameterize process_batch_results().
"""
def __init__(self):
### Required inputs
#: MD results .json file to process
self.md_results_file = ''
#: Folder to which we should write HTML output
self.output_dir = ''
### Options
#: Folder where images live (filenames in [md_results_file] should be relative to this folder)
#:
#: Can be '' if [md_results_file] uses absolute paths.
self.image_base_dir = ''
## These apply only when we're doing ground-truth comparisons
#: Optional .json file containing ground truth information
self.ground_truth_json_file = ''
#: List of classes we'll treat as negative (defaults to "empty", typically includes
#: classes like "blank", "misfire", etc.).
#:
#: Include the token "#NO_LABELS#" to indicate that an image with no annotations
#: should be considered empty.
self.negative_classes = DEFAULT_NEGATIVE_CLASSES
#: List of classes we'll treat as neither positive nor negative (defaults to
#: "unknown", typically includes classes like "unidentifiable").
self.unlabeled_classes = DEFAULT_UNKNOWN_CLASSES
#: List of output sets that we should count, but not render images for.
#:
#: Typically used to preview sets with lots of empties, where you don't want to
#: subset but also don't want to render 100,000 empty images.
#:
#: Example strings that are valid for this option:
#:
#: detections, non_detections
#: detections_animal, detections_person, detections_vehicle
#: tn, tp, fn, fp
self.rendering_bypass_sets = []
#: If this is None, a confidence threshold is selected based on the detector version.
#:
#: This can either be a float or a dictionary mapping category names (not IDs) to
#: thresholds. The category "default" can be used to specify thresholds for
#: other categories. Currently the use of a dict here is not supported when
#: ground truth is supplied.
self.confidence_threshold = None
#: Confidence threshold to apply to classification (not detection) results
#:
#: Only a float is supported here (unlike the "confidence_threshold" parameter, which
#: can be a dict).
self.classification_confidence_threshold = 0.5
#: Used for summary statistics only
self.target_recall = 0.9
#: Number of images to sample, -1 for "all images"
self.num_images_to_sample = 500
#: Random seed for sampling, or None
self.sample_seed = 0 # None
#: Image width for images in the HTML output
self.viz_target_width = 800
#: Line width (in pixels) for rendering detections
self.line_thickness = 4
#: Box expansion (in pixels) for rendering detections
self.box_expansion = 0
#: Job name to include in big letters in the output HTML
self.job_name_string = None
#: Model version string to include in the output HTML
self.model_version_string = None
#: Sort order for the output, should be one of "filename", "confidence", or "random"
self.html_sort_order = 'filename'
#: If True, images in the output HTML will be links back to the original images
self.link_images_to_originals = True
#: Optionally separate detections into categories (animal/vehicle/human)
#:
#: Currently only supported when ground truth is unavailable
self.separate_detections_by_category = True
#: Optionally replace one or more strings in filenames with other strings;
#: useful for taking a set of results generated for one folder structure
#: and applying them to a slightly different folder structure.
self.api_output_filename_replacements = {}
#: Optionally replace one or more strings in filenames with other strings;
#: useful for taking a set of results generated for one folder structure
#: and applying them to a slightly different folder structure.
self.ground_truth_filename_replacements = {}
#: Allow bypassing API output loading when operating on previously-loaded
#: results. If present, this is a Pandas DataFrame. Almost never useful.
self.api_detection_results = None
#: Allow bypassing API output loading when operating on previously-loaded
#: results. If present, this is a str --> obj dict. Almost never useful.
self.api_other_fields = None
#: Should we also split out a separate report about the detections that were
#: just below our main confidence threshold?
#:
#: Currently only supported when ground truth is unavailable.
self.include_almost_detections = False
#: Only a float is supported here (unlike the "confidence_threshold" parameter, which
#: can be a dict).
self.almost_detection_confidence_threshold = None
#: Enable/disable rendering parallelization
self.parallelize_rendering = False
#: Number of threads/processes to use for rendering parallelization
self.parallelize_rendering_n_cores = 16
#: Whether to use threads (True) or processes (False) for rendering parallelization
self.parallelize_rendering_with_threads = True
#: When classification results are present, should be sort alphabetically by class
#: name (False) or in descending order by frequency (True)?
self.sort_classification_results_by_count = False
#: When classification results are present, use this dictionary to push some
#: categories to the bottom of the list. Larger numbers == later groups.
#: Default sort weight is zero. Line breaks will separate equal sort weights.
#: Sort weights must be integers.
#:
#: In practice this is used to push generic categories like "blank", "animal",
#: and "unreliable" to the bottom of the list, like:
#:
#: options.category_name_to_sort_weight = \
#: {'animal':1,'blank':1,'unknown':1,'unreliable':1,'mammal':1,'no cv result':1}
self.category_name_to_sort_weight = {}
#: Should we split individual pages up into smaller pages if there are more than
#: N images?
self.max_figures_per_html_file = None
#: Footer text for the index page
# self.footer_text = \
# '<br/><p style="font-size:80%;">Preview page created with the ' + \
# <a href="{}">MegaDetector Python package</a>.</p>'.\
# format('https://megadetector.readthedocs.io')
self.footer_text = ''
#: Character encoding to use when writing the index HTML html
self.output_html_encoding = None
#: Additional image fields to display in image headers. If this is a list,
#: we'll include those fields; if this is a dict, we'll use that dict to choose
#: alternative display names for each field.
self.additional_image_fields_to_display = None
#: If classification results are present, should we include a summary of
#: classification categories?
self.include_classification_category_report = True
#: The category/count summary typically only includes category names, this
#: flag includes descriptions (typically taxonomic strings) as well.
self.include_category_descriptions_with_global_counts = False
#: Display the min/max normalized size of above-threshold detections for each
#: image
self.include_size_range = False
# ...__init__()
# ...PostProcessingOptions
[docs]
class PostProcessingResults:
"""
Return format from process_batch_results
"""
def __init__(self):
#: HTML file to which preview information was written
self.output_html_file = ''
#: Pandas Dataframe containing detection results
self.api_detection_results = None
#: str --> obj dictionary containing other information loaded from the results file
self.api_other_fields = None
##%% Helper classes and functions
class DetectionStatus(IntEnum):
"""
Flags used to mark images as positive or negative for P/R analysis
(according to ground truth and/or detector output)
:meta private:
"""
DS_NEGATIVE = 0
DS_POSITIVE = 1
# Anything greater than this isn't clearly positive or negative
DS_MAX_DEFINITIVE_VALUE = DS_POSITIVE
# image has annotations suggesting both negative and positive
DS_AMBIGUOUS = 2
# image is not annotated or is annotated with 'unknown', 'unlabeled', ETC.
DS_UNKNOWN = 3
# image has not yet been assigned a state
DS_UNASSIGNED = 4
# In some analyses, we add an additional class that lets us look at
# detections just below our main confidence threshold
DS_ALMOST = 5
def _mark_detection_status(indexed_db,
negative_classes=DEFAULT_NEGATIVE_CLASSES,
unknown_classes=DEFAULT_UNKNOWN_CLASSES):
"""
For each image in indexed_db.db['images'], add a '_detection_status' field
to indicate whether to treat this image as positive, negative, ambiguous,
or unknown.
Makes modifications in-place.
returns (n_negative, n_positive, n_unknown, n_ambiguous)
"""
negative_classes = set(negative_classes)
unknown_classes = set(unknown_classes)
# count the # of images with each type of DetectionStatus
n_unknown = 0
n_ambiguous = 0
n_positive = 0
n_negative = 0
print('Preparing ground-truth annotations')
for im in tqdm(indexed_db.db['images']):
image_id = im['id']
annotations = indexed_db.image_id_to_annotations[image_id]
categories = [ann['category_id'] for ann in annotations]
category_names = set(indexed_db.cat_id_to_name[cat] for cat in categories)
# Check whether this image has:
# - unknown / unassigned-type labels
# - negative-type labels
# - positive labels (i.e., labels that are neither unknown nor negative)
has_unknown_labels = sets_overlap(category_names, unknown_classes)
has_negative_labels = sets_overlap(category_names, negative_classes)
has_positive_labels = 0 < len(category_names - (unknown_classes | negative_classes))
# assert has_unknown_labels is False, '{} has unknown labels'.format(annotations)
# If there are no image annotations...
if len(categories) == 0:
if '#NO_LABELS#' in negative_classes:
n_negative += 1
im['_detection_status'] = DetectionStatus.DS_NEGATIVE
else:
n_unknown += 1
im['_detection_status'] = DetectionStatus.DS_UNKNOWN
# n_negative += 1
# im['_detection_status'] = DetectionStatus.DS_NEGATIVE
# If the image has more than one type of labels, it's ambiguous
# note: bools are automatically converted to 0/1, so we can sum
elif (has_unknown_labels + has_negative_labels + has_positive_labels) > 1:
n_ambiguous += 1
im['_detection_status'] = DetectionStatus.DS_AMBIGUOUS
# After the check above, we can be sure it's only one of positive,
# negative, or unknown.
#
# Important: do not merge the following 'unknown' branch with the first
# 'unknown' branch above, where we tested 'if len(categories) == 0'
#
# If the image has only unknown labels
elif has_unknown_labels:
n_unknown += 1
im['_detection_status'] = DetectionStatus.DS_UNKNOWN
# If the image has only negative labels
elif has_negative_labels:
n_negative += 1
im['_detection_status'] = DetectionStatus.DS_NEGATIVE
# If the images has only positive labels
elif has_positive_labels:
n_positive += 1
im['_detection_status'] = DetectionStatus.DS_POSITIVE
# Annotate the category, if it is unambiguous
if len(category_names) == 1:
im['_unambiguous_category'] = list(category_names)[0]
else:
raise Exception('Invalid detection state')
# ...for each image
return n_negative, n_positive, n_unknown, n_ambiguous
# ..._mark_detection_status()
def is_sas_url(s) -> bool:
"""
Placeholder for a more robust way to verify that a link is a SAS URL.
99.999% of the time this will suffice for what we're using it for right now.
:meta private:
"""
return (s.startswith(('http://', 'https://')) and ('core.windows.net' in s)
and ('?' in s))
def relative_sas_url(folder_url, relative_path):
"""
Given a container-level or folder-level SAS URL, create a SAS URL to the
specified relative path.
:meta private:
"""
relative_path = relative_path.replace('%','%25')
relative_path = relative_path.replace('#','%23')
relative_path = relative_path.replace(' ','%20')
if not is_sas_url(folder_url):
return None
tokens = folder_url.split('?')
assert len(tokens) == 2
if not tokens[0].endswith('/'):
tokens[0] = tokens[0] + '/'
if relative_path.startswith('/'):
relative_path = relative_path[1:]
return tokens[0] + relative_path + '?' + tokens[1]
def _render_bounding_boxes(
image_base_dir,
image_relative_path,
display_name,
detections,
category_string,
ground_truth_boxes=None,
detection_categories=None,
classification_categories=None,
options=None):
"""
Renders detection bounding boxes on a single image.
This is an internal function; if you want tools for rendering boxes on images, see
visualization.visualization_utils.
The source image is:
image_base_dir / image_relative_path
The target image is, for example:
[options.output_dir] /
['detections' or 'non_detections'] /
[filename with slashes turned into tildes]
"category_string" is a result type, e.g. "detections", "non-detections", "class_coyote"; this
determines the output folder for the rendered image.
Only very preliminary support is provided for ground truth box rendering.
Returns the html info struct for this image in the format that's used for
write_html_image_list.
:meta private:
"""
if options is None:
options = PostProcessingOptions()
image_full_path = None
if category_string in options.rendering_bypass_sets:
sample_name = category_string + '_' + path_utils.flatten_path(image_relative_path)
else:
if is_sas_url(image_base_dir):
image_full_path = relative_sas_url(image_base_dir, image_relative_path)
else:
image_full_path = os.path.join(image_base_dir, image_relative_path)
# os.path.isfile() is slow when mounting remote directories; much faster
# to just try/except on the image open.
try:
image = vis_utils.open_image(image_full_path)
except Exception as e:
print('Warning: could not open image file {}: {}'.format(image_full_path,str(e)))
image = None
# return ''
# Render images to a flat folder
sample_name = category_string + '_' + path_utils.flatten_path(image_relative_path)
fullpath = os.path.join(options.output_dir, category_string, sample_name)
if image is not None:
original_size = image.size
# Resize the image if necessary
if options.viz_target_width is not None:
image = vis_utils.resize_image(image, options.viz_target_width)
# Render ground truth boxes if necessary
if ground_truth_boxes is not None and len(ground_truth_boxes) > 0:
# Create class labels like "gt_1" or "gt_27"
gt_classes = [0] * len(ground_truth_boxes)
label_map = {0:'ground truth'}
# for i_box,box in enumerate(ground_truth_boxes):
# gt_classes.append('_' + str(box[-1]))
vis_utils.render_db_bounding_boxes(ground_truth_boxes, gt_classes, image,
original_size=original_size,label_map=label_map,
thickness=4,expansion=4)
# Prepare per-category confidence thresholds
if isinstance(options.confidence_threshold,float):
rendering_confidence_threshold = options.confidence_threshold
else:
category_ids = set()
for d in detections:
category_ids.add(d['category'])
rendering_confidence_threshold = {}
for category_id in category_ids:
rendering_confidence_threshold[category_id] = \
_get_threshold_for_category_id(category_id, options, detection_categories)
# Render detection boxes
vis_utils.render_detection_bounding_boxes(
detections, image,
label_map=detection_categories,
classification_label_map=classification_categories,
confidence_threshold=rendering_confidence_threshold,
classification_confidence_threshold=options.classification_confidence_threshold,
thickness=options.line_thickness,
expansion=options.box_expansion)
try:
image.save(fullpath)
except OSError as e:
# errno.ENAMETOOLONG doesn't get thrown properly on Windows, so
# we awkwardly check against a hard-coded limit
if (e.errno == errno.ENAMETOOLONG) or (len(fullpath) >= 259):
extension = os.path.splitext(sample_name)[1]
sample_name = category_string + '_' + str(uuid.uuid4()) + extension
image.save(os.path.join(options.output_dir, category_string, sample_name))
else:
raise
# ...if we successfully opened this image
# Use forward slashes regardless of os
file_name = '{}/{}'.format(category_string,sample_name)
info = {
'filename': file_name,
'title': display_name,
'textStyle':\
'font-family:verdana,arial,calibri;font-size:80%;text-align:left;margin-top:20;margin-bottom:5'
}
# Optionally add links back to the original images
if options.link_images_to_originals and (image_full_path is not None):
# Handling special characters in links has been pushed down into
# write_html_image_list
#
# link_target = image_full_path.replace('\\','/')
# link_target = urllib.parse.quote(link_target)
link_target = image_full_path
info['linkTarget'] = link_target
return info
# ..._render_bounding_boxes(...)
def _prepare_html_subpages(images_html, output_dir, options=None):
"""
Write out a series of html image lists, e.g. the "detections" or "non-detections"
pages.
image_html is a dictionary mapping an html page name (e.g. "detections_animal") to
a list of image structs friendly to write_html_image_list.
Returns a dictionary mapping category names to image counts.
"""
if options is None:
options = PostProcessingOptions()
# Count items in each category
image_counts = {}
for category_string, image_this_category in images_html.items():
image_counts[category_string] = len(image_this_category)
# Optionally sort by filename before writing to html
if options.html_sort_order == 'filename':
images_html_sorted = {}
for category_string, images_this_category in images_html.items():
sorted_images_this_category = sorted(images_this_category, key=lambda x: x['filename'])
images_html_sorted[category_string] = sorted_images_this_category
images_html = images_html_sorted
# Optionally sort by confidence before writing to html
elif options.html_sort_order == 'confidence':
images_html_sorted = {}
for category_string, images_this_category in images_html.items():
if not all(['max_conf' in d for d in images_this_category]):
print(f"Warning: some elements in the {category_string} page don't have confidence " + \
"values, can't sort by confidence")
else:
sorted_images_this_category = sorted(images_this_category,
key=lambda x: x['max_conf'], reverse=True)
images_html_sorted[category_string] = sorted_images_this_category
images_html = images_html_sorted
else:
assert options.html_sort_order == 'random',\
'Unrecognized sort order {}'.format(options.html_sort_order)
images_html_sorted = {}
for category_string, images_this_category in images_html.items():
sorted_images_this_category = random.sample(images_this_category,len(images_this_category))
images_html_sorted[category_string] = sorted_images_this_category
images_html = images_html_sorted
# Write the individual HTML files
#
# category_string will be, e.g., "detections", "non-detections", "class_coyote"
for category_string, images_this_category in images_html.items():
html_image_list_options = {}
html_image_list_options['maxFiguresPerHtmlFile'] = options.max_figures_per_html_file
html_image_list_options['headerHtml'] = '<h1>{}</h1>'.format(category_string.upper())
html_image_list_options['pageTitle'] = '{}'.format(category_string.lower())
# Don't write empty pages
if len(images_this_category) == 0:
continue
else:
html_filename_base = clean_filename(category_string,replace_whitespace='_')
write_html_image_list(
filename=os.path.join(output_dir, '{}.html'.format(html_filename_base)),
images=images_this_category,
options=html_image_list_options)
# ...for each HTML page
return image_counts
# ..._prepare_html_subpages()
def _get_threshold_for_category_name(category_name,options):
"""
Determines the confidence threshold we should use for a specific category name.
"""
if isinstance(options.confidence_threshold,float):
return options.confidence_threshold
else:
assert isinstance(options.confidence_threshold,dict), \
'confidence_threshold must either be a float or a dict'
if category_name in options.confidence_threshold:
return options.confidence_threshold[category_name]
else:
assert 'default' in options.confidence_threshold, \
'category {} not in confidence_threshold dict, and no default supplied'.format(
category_name)
return options.confidence_threshold['default']
def _get_threshold_for_category_id(category_id,options,detection_categories):
"""
Determines the confidence threshold we should use for a specific category ID.
[detection_categories] is a dict mapping category IDs to names.
"""
if isinstance(options.confidence_threshold,float):
return options.confidence_threshold
assert category_id in detection_categories, \
'Invalid category ID {}'.format(category_id)
category_name = detection_categories[category_id]
return _get_threshold_for_category_name(category_name,options)
def _get_positive_categories(detections,options,detection_categories):
"""
Gets a sorted list of unique categories (as string IDs) above the threshold for this image
[detection_categories] is a dict mapping category IDs to names.
"""
positive_categories = set()
for d in detections:
threshold = _get_threshold_for_category_id(d['category'], options, detection_categories)
if d['conf'] >= threshold:
positive_categories.add(d['category'])
return sorted(positive_categories)
def _get_positive_detections(detections,options,detection_categories):
"""
Returns a list of positive detections n the detection list [detections].
"""
positive_detections = []
for d in detections:
threshold = _get_threshold_for_category_id(d['category'], options, detection_categories)
if d['conf'] >= threshold:
positive_detections.append(d)
return positive_detections
def _has_positive_detection(detections,options,detection_categories):
"""
Determines whether any positive detections are present in the detection list
[detections].
"""
found_positive_detection = False
for d in detections:
threshold = _get_threshold_for_category_id(d['category'], options, detection_categories)
if d['conf'] >= threshold:
found_positive_detection = True
break
return found_positive_detection
def _render_image_no_gt(file_info,
detection_categories_to_results_name,
detection_categories,
classification_categories,
options):
r"""
Renders an image (with no ground truth information)
Returns a list of rendering structs, where the first item is a category (e.g. "detections_animal"),
and the second is a dict of information needed for rendering. E.g.:
[['detections_animal',
{
'filename': 'detections_animal/detections_animal_blah~01060415.JPG',
'title': '<b>Result type</b>: detections_animal,
<b>Image</b>: blah/01060415.JPG,
<b>Max conf</b>: 0.897',
'textStyle': 'font-family:verdana,arial,calibri;font-size:80%;text-align:left;margin-top:20;margin-bottom:5',
'linkTarget': 'full_path_to_%5C01060415.JPG'
}]]
When no classification data is present, this list will always be length-1. When
classification data is present, an image may appear in multiple categories.
Populates the 'max_conf' field of the first element of the list.
Returns None if there are any errors.
"""
image_relative_path = file_info['file']
# Useful debug snippet
#
# if 'filename' in image_relative_path:
# import pdb; pdb.set_trace()
max_conf = file_info['max_detection_conf']
detections = file_info['detections']
# Determine whether any positive detections are present (using a threshold that
# may vary by category)
found_positive_detection = \
_has_positive_detection(detections,options,detection_categories)
detection_status = DetectionStatus.DS_UNASSIGNED
if found_positive_detection:
detection_status = DetectionStatus.DS_POSITIVE
else:
if options.include_almost_detections:
if max_conf >= options.almost_detection_confidence_threshold:
detection_status = DetectionStatus.DS_ALMOST
else:
detection_status = DetectionStatus.DS_NEGATIVE
else:
detection_status = DetectionStatus.DS_NEGATIVE
if detection_status == DetectionStatus.DS_POSITIVE:
if options.separate_detections_by_category:
positive_categories = tuple(_get_positive_categories(detections,options,detection_categories))
if positive_categories not in detection_categories_to_results_name:
raise ValueError('Error: {} not in category mapping (file {})'.format(
str(positive_categories),image_relative_path))
category_string = detection_categories_to_results_name[positive_categories]
else:
category_string = 'detections'
elif detection_status == DetectionStatus.DS_NEGATIVE:
category_string = 'non_detections'
else:
assert detection_status == DetectionStatus.DS_ALMOST
category_string = 'almost_detections'
display_name = '<b>Result type</b>: {}, <b>image</b>: {}, <b>max conf</b>: {:0.2f}'.format(
category_string, image_relative_path, max_conf)
if options.include_size_range:
positive_detections = _get_positive_detections(detections,options,detection_categories)
min_size = None
max_size = None
for d in positive_detections:
assert 'bbox' in d
assert len(d['bbox']) >= 4
assert d['bbox'][2] > 0
assert d['bbox'][3] > 0
normalized_size = d['bbox'][2] * d['bbox'][3]
if (min_size is None) or (normalized_size < min_size):
min_size = normalized_size
if (max_size is None) or (normalized_size > max_size):
max_size = normalized_size
# ...for each detection
if min_size is None:
display_name += ' (no size range)'
else:
display_name += ' (size min/max: {:0.4f},{:0.4f})'.format(
min_size,max_size)
# ...if we're supposed to include the detection size range
# Are there any bonus fields we need to include in each image header?
if options.additional_image_fields_to_display is not None:
for field_name in options.additional_image_fields_to_display:
if field_name in file_info:
field_value = file_info[field_name]
if (field_value is None) or \
(isinstance(field_value,float) and np.isnan(field_value)):
continue
# Optionally use a display name that's different from the field name
if isinstance(options.additional_image_fields_to_display,dict):
field_display_name = \
options.additional_image_fields_to_display[field_name]
else:
field_display_name = field_name
field_string = '<b>{}</b>: {}'.format(field_display_name,field_value)
display_name += ', {}'.format(field_string)
rendering_options = copy.copy(options)
if detection_status == DetectionStatus.DS_ALMOST:
rendering_options.confidence_threshold = \
rendering_options.almost_detection_confidence_threshold
rendered_image_html_info = _render_bounding_boxes(
image_base_dir=options.image_base_dir,
image_relative_path=image_relative_path,
display_name=display_name,
detections=detections,
category_string=category_string,
ground_truth_boxes=None,
detection_categories=detection_categories,
classification_categories=classification_categories,
options=rendering_options)
image_result = None
if len(rendered_image_html_info) > 0:
image_result = [[category_string, rendered_image_html_info]]
classes_rendered_this_image = set()
max_conf = 0
for det in detections:
if det['conf'] > max_conf:
max_conf = det['conf']
# We make the decision here that only "detections" (not "almost-detections")
# will appear on the classification category pages
detection_threshold = \
_get_threshold_for_category_id(det['category'], options, detection_categories)
if det['conf'] < detection_threshold:
continue
if ('classifications' in det) and (len(det['classifications']) > 0) and \
(category_string != 'non_detections'):
# This is a list of [class,confidence] pairs, sorted by classification confidence
classifications = det['classifications']
top1_class_id = classifications[0][0]
top1_class_name = classification_categories[top1_class_id]
top1_class_score = classifications[0][1]
# If we either don't have a classification confidence threshold, or
# we've met our classification confidence threshold
if (options.classification_confidence_threshold < 0) or \
(top1_class_score >= options.classification_confidence_threshold):
class_string = 'class_{}'.format(top1_class_name)
else:
class_string = 'class_unreliable'
if class_string not in classes_rendered_this_image:
image_result.append([class_string,
rendered_image_html_info])
classes_rendered_this_image.add(class_string)
# ...if this detection has classification info
# ...for each detection
image_result[0][1]['max_conf'] = max_conf
# ...if we got valid rendering info back from _render_bounding_boxes()
return image_result
# ...def _render_image_no_gt()
def _render_image_with_gt(file_info,
ground_truth_indexed_db,
detection_categories,
classification_categories,
options):
"""
Render an image with ground truth information. See _render_image_no_gt for return
data format.
"""
image_relative_path = file_info['file']
max_conf = file_info['max_detection_conf']
detections = file_info['detections']
# This should already have been normalized to either '/' or '\'
image_id = ground_truth_indexed_db.filename_to_id.get(image_relative_path, None)
if image_id is None:
print('Warning: couldn''t find ground truth for image {}'.format(image_relative_path))
return None
image = ground_truth_indexed_db.image_id_to_image[image_id]
annotations = ground_truth_indexed_db.image_id_to_annotations[image_id]
ground_truth_boxes = []
for ann in annotations:
if 'bbox' in ann:
ground_truth_box = [x for x in ann['bbox']]
ground_truth_box.append(ann['category_id'])
ground_truth_boxes.append(ground_truth_box)
gt_status = image['_detection_status']
gt_presence = bool(gt_status)
gt_classes = CameraTrapJsonUtils.annotations_to_class_names(
annotations, ground_truth_indexed_db.cat_id_to_name)
gt_class_summary = ','.join(gt_classes)
if gt_status > DetectionStatus.DS_MAX_DEFINITIVE_VALUE:
print(f'Skipping image {image_id}, does not have a definitive '
f'ground truth status (status: {gt_status}, classes: {gt_class_summary})')
return None
detected = _has_positive_detection(detections, options, detection_categories)
if gt_presence and detected:
if '_classification_accuracy' not in image.keys():
result_type = 'tp'
elif np.isclose(1, image['_classification_accuracy']):
result_type = 'tpc'
else:
result_type = 'tpi'
elif not gt_presence and detected:
result_type = 'fp'
elif gt_presence and not detected:
result_type = 'fn'
else:
result_type = 'tn'
display_name = '<b>Result type</b>: {}, <b>Presence</b>: {}, <b>Class</b>: {}, <b>Max conf</b>: {:0.3f}%, <b>Image</b>: {}'.format( # noqa
result_type.upper(),
str(gt_presence),
gt_class_summary,
max_conf * 100,
image_relative_path)
rendered_image_html_info = _render_bounding_boxes(
image_base_dir=options.image_base_dir,
image_relative_path=image_relative_path,
display_name=display_name,
detections=detections,
category_string=result_type,
ground_truth_boxes=ground_truth_boxes,
detection_categories=detection_categories,
classification_categories=classification_categories,
options=options)
image_result = None
if len(rendered_image_html_info) > 0:
image_result = [[result_type, rendered_image_html_info]]
for gt_class in gt_classes:
image_result.append(['class_{}'.format(gt_class), rendered_image_html_info])
return image_result
# ...def _render_image_with_gt()
#%% Main function
[docs]
def process_batch_results(options):
"""
Given a .json or .csv file containing MD results, do one or more of the following:
* Sample detections/non-detections and render to HTML (when ground truth isn't
available) (this is 99.9% of what this module is for)
* Evaluate detector precision/recall, optionally rendering results (requires
ground truth)
* Sample true/false positives/negatives and render to HTML (requires ground
truth)
Ground truth, if available, must be in COCO Camera Traps format:
https://github.com/agentmorris/MegaDetector/blob/main/megadetector/data_management/README.md#coco-camera-traps-format
Args:
options (PostProcessingOptions): everything we need to render a preview/analysis for
this set of results; see the PostProcessingOptions class for details.
Returns:
PostProcessingResults: information about the results/preview, most importantly the
HTML filename of the output. See the PostProcessingResults class for details.
"""
ppresults = PostProcessingResults()
##%% Expand some options for convenience
output_dir = options.output_dir
##%% Prepare output dir
os.makedirs(output_dir, exist_ok=True)
##%% Load ground truth if available
ground_truth_indexed_db = None
if (options.ground_truth_json_file is not None) and (len(options.ground_truth_json_file) > 0):
assert (options.confidence_threshold is None) or (isinstance(options.confidence_threshold,float)), \
'Variable confidence thresholds are not supported when supplying ground truth'
if (options.ground_truth_json_file is not None) and (len(options.ground_truth_json_file) > 0):
if options.separate_detections_by_category:
print("Warning: I don't know how to separate categories yet when doing " + \
"a P/R analysis, disabling category separation")
options.separate_detections_by_category = False
ground_truth_indexed_db = IndexedJsonDb(
options.ground_truth_json_file,
b_normalize_paths=True,
filename_replacements=options.ground_truth_filename_replacements)
# Mark images in the ground truth as positive or negative
n_negative, n_positive, n_unknown, n_ambiguous = _mark_detection_status(
ground_truth_indexed_db,
negative_classes=options.negative_classes,
unknown_classes=options.unlabeled_classes)
print(f'Finished loading and indexing ground truth: {n_negative} '
f'negative, {n_positive} positive, {n_unknown} unknown, '
f'{n_ambiguous} ambiguous')
if n_positive == 0:
print('\n*** Warning: no positives found in ground truth, analysis won\'t be very meaningful ***\n')
if n_negative == 0:
print('\n*** Warning: no negatives found in ground truth, analysis won\'t be very meaningful ***\n')
if n_ambiguous > 0:
print('\n*** Warning: {} images with ambiguous positive/negative status found in ground truth ***\n'.format(
n_ambiguous))
##%% Load detection (and possibly classification) results
# If the caller hasn't supplied results, load them
if options.api_detection_results is None:
detections_df, other_fields = load_api_results(
options.md_results_file, force_forward_slashes=True,
filename_replacements=options.api_output_filename_replacements)
ppresults.api_detection_results = detections_df
ppresults.api_other_fields = other_fields
else:
print('Bypassing detection results loading...')
assert options.api_other_fields is not None
detections_df = options.api_detection_results
other_fields = options.api_other_fields
# Determine confidence thresholds if necessary
if options.confidence_threshold is None:
options.confidence_threshold = \
get_typical_confidence_threshold_from_results(other_fields)
print('Choosing default confidence threshold of {} based on MD version'.format(
options.confidence_threshold))
if options.almost_detection_confidence_threshold is None and options.include_almost_detections:
assert isinstance(options.confidence_threshold,float), \
'If you are using a dictionary of confidence thresholds and almost-detections are enabled, ' + \
'you need to supply a threshold for almost detections.'
options.almost_detection_confidence_threshold = options.confidence_threshold - 0.05
if options.almost_detection_confidence_threshold < 0:
options.almost_detection_confidence_threshold = 0
# Remove rows with inference failures (typically due to corrupt images)
n_failures = 0
if 'failure' in detections_df.columns:
n_failures = detections_df['failure'].count()
print('Ignoring {} failed images'.format(n_failures))
# Explicitly forcing a copy() operation here to suppress "trying to be set
# on a copy" warnings (and associated risks) below.
detections_df = detections_df[detections_df['failure'].isna()].copy()
assert other_fields is not None
detection_categories = other_fields['detection_categories']
# Convert keys and values to lowercase
classification_categories = other_fields.get('classification_categories', {})
if classification_categories is not None:
classification_categories = {
k.lower(): v.lower()
for k, v in classification_categories.items()
}
# Count detections and almost-detections for reporting purposes
n_positives = 0
n_almosts = 0
print('Assigning images to rendering categories')
for i_row,row in tqdm(detections_df.iterrows(),total=len(detections_df)):
detections = row['detections']
max_conf = row['max_detection_conf']
if _has_positive_detection(detections, options, detection_categories):
n_positives += 1
elif (options.almost_detection_confidence_threshold is not None) and \
(max_conf >= options.almost_detection_confidence_threshold):
n_almosts += 1
print(f'Finished loading and preprocessing {len(detections_df)} rows '
f'from detector output, predicted {n_positives} positives.')
if options.include_almost_detections:
print('...and {} almost-positives'.format(n_almosts))
##%% Find descriptive metadata to include at the top of the page
if options.job_name_string is not None:
job_name_string = options.job_name_string
else:
# This is rare; it only happens during debugging when the caller
# is supplying already-loaded MD results.
if options.md_results_file is None:
job_name_string = 'unknown'
else:
job_name_string = os.path.basename(options.md_results_file)
if options.model_version_string is not None:
model_version_string = options.model_version_string
else:
if 'info' not in other_fields or 'detector' not in other_fields['info']:
print('No model metadata supplied, assuming MDv4')
model_version_string = 'MDv4 (assumed)'
else:
model_version_string = other_fields['info']['detector']
##%% If we have ground truth, remove images we can't match to ground truth
if ground_truth_indexed_db is not None:
b_match = detections_df['file'].isin(
ground_truth_indexed_db.filename_to_id)
print(f'Confirmed filename matches to ground truth for {sum(b_match)} '
f'of {len(detections_df)} files')
detections_df = detections_df[b_match]
detector_files = detections_df['file'].tolist()
assert len(detector_files) > 0, (
'No detection files available, possible path issue?')
print('Trimmed detection results to {} files'.format(len(detector_files)))
##%% (Optionally) sample from the full set of images
images_to_visualize = detections_df
if (options.num_images_to_sample is not None) and (options.num_images_to_sample > 0):
images_to_visualize = images_to_visualize.sample(
n=min(options.num_images_to_sample, len(images_to_visualize)),
random_state=options.sample_seed)
output_html_file = ''
style_header = """<head>
<title>Detection results preview</title>
<style type="text/css">
a { text-decoration: none; }
body { font-family: segoe ui, calibri, "trebuchet ms", verdana, arial, sans-serif; }
div.contentdiv { margin-left: 20px; }
</style>
</head>"""
##%% Fork here depending on whether or not ground truth is available
# If we have ground truth, we'll compute precision/recall and sample tp/fp/tn/fn.
#
# Otherwise we'll just visualize detections/non-detections.
if ground_truth_indexed_db is not None:
##%% Detection evaluation: compute precision/recall
# numpy array of maximum confidence values
p_detection = detections_df['max_detection_conf'].values
n_detection_values = len(p_detection)
# numpy array of bools (0.0/1.0), and -1 as null value
gt_detections = np.zeros(n_detection_values, dtype=float)
n_positive = 0
n_negative = 0
for i_detection, fn in enumerate(detector_files):
image_id = ground_truth_indexed_db.filename_to_id[fn]
image = ground_truth_indexed_db.image_id_to_image[image_id]
detection_status = image['_detection_status']
if detection_status == DetectionStatus.DS_NEGATIVE:
gt_detections[i_detection] = 0.0
n_negative += 1
elif detection_status == DetectionStatus.DS_POSITIVE:
gt_detections[i_detection] = 1.0
n_positive += 1
else:
gt_detections[i_detection] = -1.0
print('Of {} ground truth values, found {} positives and {} negatives'.format(
len(detections_df),n_positive,n_negative))
# Don't include ambiguous/unknown ground truth in precision/recall analysis
b_valid_ground_truth = gt_detections >= 0.0
p_detection_pr = p_detection[b_valid_ground_truth]
gt_detections_pr = (gt_detections[b_valid_ground_truth] == 1.)
print('Including {} of {} values in p/r analysis'.format(np.sum(b_valid_ground_truth),
len(b_valid_ground_truth)))
precisions, recalls, thresholds = precision_recall_curve(gt_detections_pr, p_detection_pr)
# For completeness, include the result at a confidence threshold of 1.0
thresholds = np.append(thresholds, [1.0])
precisions_recalls = pd.DataFrame(data={
'confidence_threshold': thresholds,
'precision': precisions,
'recall': recalls
})
# Compute and print summary statistics
average_precision = average_precision_score(gt_detections_pr, p_detection_pr)
print('Average precision: {:.1%}'.format(average_precision))
# Thresholds go up throughout precisions/recalls/thresholds; find the last
# value where recall is at or above target. That's our precision @ target recall.
i_above_target_recall = (np.where(recalls >= options.target_recall))
# np.where returns a tuple of arrays, but in this syntax where we're
# comparing an array with a scalar, there will only be one element.
assert len (i_above_target_recall) == 1
# Convert back to a list
i_above_target_recall = i_above_target_recall[0].tolist()
if len(i_above_target_recall) == 0:
precision_at_target_recall = 0.0
else:
precision_at_target_recall = precisions[i_above_target_recall[-1]]
print('Precision at {:.1%} recall: {:.1%}'.format(options.target_recall,
precision_at_target_recall))
cm_predictions = np.array(p_detection_pr) > options.confidence_threshold
cm = confusion_matrix(gt_detections_pr, cm_predictions, labels=[False,True])
# Flatten the confusion matrix
tn, fp, fn, tp = cm.ravel()
precision_at_confidence_threshold = tp / (tp + fp)
recall_at_confidence_threshold = tp / (tp + fn)
f1 = 2.0 * (precision_at_confidence_threshold * recall_at_confidence_threshold) / \
(precision_at_confidence_threshold + recall_at_confidence_threshold)
print('At a confidence threshold of {:.1%}, precision={:.1%}, recall={:.1%}, f1={:.1%}'.format(
options.confidence_threshold, precision_at_confidence_threshold,
recall_at_confidence_threshold, f1))
##%% Collect classification results, if they exist
classifier_accuracies = []
# Mapping of classnames to idx for the confusion matrix.
#
# The lambda is actually kind of a hack, because we use assume that
# the following code does not reassign classname_to_idx
classname_to_idx = collections.defaultdict(lambda: len(classname_to_idx))
# Confusion matrix as defaultdict of defaultdict
#
# Rows / first index is ground truth, columns / second index is predicted category
classifier_cm = collections.defaultdict(lambda: collections.defaultdict(lambda: 0))
# i_detection = 0; fn = detector_files[i_detection]; print(fn)
assert len(detector_files) == len(detections_df)
for i_detection, fn in enumerate(detector_files):
image_id = ground_truth_indexed_db.filename_to_id[fn]
image = ground_truth_indexed_db.image_id_to_image[image_id]
detections = detections_df['detections'].iloc[i_detection]
pred_class_ids = [det['classifications'][0][0] \
for det in detections if 'classifications' in det.keys()]
pred_classnames = [classification_categories[pd] for pd in pred_class_ids]
# If this image has classification predictions, and an unambiguous class
# annotated, and is a positive image...
if len(pred_classnames) > 0 \
and '_unambiguous_category' in image.keys() \
and image['_detection_status'] == DetectionStatus.DS_POSITIVE:
# The unambiguous category, we make this a set for easier handling afterward
gt_categories = set([image['_unambiguous_category']])
pred_categories = set(pred_classnames)
# Compute the accuracy as intersection of union,
# i.e. (# of categories in both prediction and GT)
# divided by (# of categories in either prediction or GT
#
# In case of only one GT category, the result will be 1.0, if
# prediction is one category and this category matches GT
#
# It is 1.0/(# of predicted top-1 categories), if the GT is
# one of the predicted top-1 categories.
#
# It is 0.0, if none of the predicted categories is correct
classifier_accuracies.append(
len(gt_categories & pred_categories)
/ len(gt_categories | pred_categories)
)
image['_classification_accuracy'] = classifier_accuracies[-1]
# Distribute this accuracy across all predicted categories in the
# confusion matrix
assert len(gt_categories) == 1
gt_class_idx = classname_to_idx[list(gt_categories)[0]]
for pred_category in pred_categories:
pred_class_idx = classname_to_idx[pred_category]
classifier_cm[gt_class_idx][pred_class_idx] += 1
# ...for each file in the detection results
# If we have classification results
if len(classifier_accuracies) > 0:
# Build confusion matrix as array from classifier_cm
all_class_ids = sorted(classname_to_idx.values())
classifier_cm_array = np.array(
[[classifier_cm[r_idx][c_idx] for c_idx in all_class_ids] for \
r_idx in all_class_ids], dtype=float)
classifier_cm_array /= (classifier_cm_array.sum(axis=1, keepdims=True) + 1e-7)
# Print some statistics
print('Finished computation of {} classification results'.format(
len(classifier_accuracies)))
print('Mean accuracy: {}'.format(np.mean(classifier_accuracies)))
# Prepare confusion matrix output
# Get confusion matrix as string
sio = io.StringIO()
np.savetxt(sio, classifier_cm_array * 100, fmt='%5.1f')
cm_str = sio.getvalue()
# Get fixed-size classname for each idx
idx_to_classname = {v:k for k,v in classname_to_idx.items()}
classname_list = [idx_to_classname[idx] for idx in sorted(classname_to_idx.values())]
classname_headers = ['{:<5}'.format(cname[:5]) for cname in classname_list]
# Prepend class name on each line and add to the top
cm_str_lines = [' ' * 16 + ' '.join(classname_headers)]
cm_str_lines += ['{:>15}'.format(cn[:15]) + ' ' + cm_line for cn, cm_line in \
zip(classname_list, cm_str.splitlines(), strict=True)]
# Print formatted confusion matrix
if False:
# Actually don't, this gets really messy in all but the widest consoles
print('Confusion matrix: ')
print(*cm_str_lines, sep='\n')
# Plot confusion matrix
# To manually add more space at bottom: plt.rcParams['figure.subplot.bottom'] = 0.1
#
# Add 0.5 to figsize for every class. For two classes, this will result in
# fig = plt.figure(figsize=[4,4])
fig = plot_utils.plot_confusion_matrix(
classifier_cm_array,
classname_list,
normalize=False,
title='Confusion matrix',
cmap=plt.cm.Blues,
vmax=1.0,
use_colorbar=True,
y_label=True)
cm_figure_relative_filename = 'confusion_matrix.png'
cm_figure_filename = os.path.join(output_dir, cm_figure_relative_filename)
plt.savefig(cm_figure_filename)
plt.close(fig)
# ...if we have classification results
##%% Render output
# Write p/r table to .csv file in output directory
pr_table_filename = os.path.join(output_dir, 'prec_recall.csv')
precisions_recalls.to_csv(pr_table_filename, index=False)
# Write precision/recall plot to .png file in output directory
t = 'Precision-Recall curve: AP={:0.1%}, P@{:0.1%}={:0.1%}'.format(
average_precision, options.target_recall, precision_at_target_recall)
fig = plot_utils.plot_precision_recall_curve(precisions, recalls, t)
pr_figure_relative_filename = 'prec_recall.png'
pr_figure_filename = os.path.join(output_dir, pr_figure_relative_filename)
fig.savefig(pr_figure_filename)
plt.close(fig)
##%% Sampling
# Sample true/false positives/negatives with correct/incorrect top-1
# classification and render to html
# Accumulate html image structs (in the format expected by write_html_image_list)
# for each category, e.g. 'tp', 'fp', ..., 'class_bird', ...
images_html = collections.defaultdict(list)
# Add default entries by accessing them for the first time
[images_html[res] for res in ['tp', 'tpc', 'tpi', 'fp', 'tn', 'fn']]
for res in images_html.keys():
os.makedirs(os.path.join(output_dir, res), exist_ok=True)
image_count = len(images_to_visualize)
# Each element will be a list of 2-tuples, with elements [collection name,html info struct]
rendering_results = []
# Each element will be a three-tuple with elements file,max_conf,detections
files_to_render = []
# Assemble the information we need for rendering, so we can parallelize without
# dealing with Pandas
# i_row = 0; row = images_to_visualize.iloc[0]
for _, row in images_to_visualize.iterrows():
# Filenames should already have been normalized to either '/' or '\'
files_to_render.append(row.to_dict())
start_time = time.time()
if options.parallelize_rendering:
pool = None
try:
if options.parallelize_rendering_n_cores is None:
if options.parallelize_rendering_with_threads:
pool = ThreadPool()
else:
pool = Pool()
else:
if options.parallelize_rendering_with_threads:
pool = ThreadPool(options.parallelize_rendering_n_cores)
worker_string = 'threads'
else:
pool = Pool(options.parallelize_rendering_n_cores)
worker_string = 'processes'
print('Rendering images with {} {}'.format(options.parallelize_rendering_n_cores,
worker_string))
rendering_results = list(tqdm(pool.imap(
partial(_render_image_with_gt,
ground_truth_indexed_db=ground_truth_indexed_db,
detection_categories=detection_categories,
classification_categories=classification_categories,
options=options),
files_to_render), total=len(files_to_render)))
finally:
if pool is not None:
pool.close()
pool.join()
print('Pool closed and joined for GT rendering')
else:
for file_info in tqdm(files_to_render):
rendering_results.append(_render_image_with_gt(
file_info,ground_truth_indexed_db,
detection_categories,classification_categories,
options=options))
elapsed = time.time() - start_time
# Map all the rendering results in the list rendering_results into the
# dictionary images_html, which maps category names to lists of results
image_rendered_count = 0
for rendering_result in rendering_results:
if rendering_result is None:
continue
image_rendered_count += 1
for assignment in rendering_result:
images_html[assignment[0]].append(assignment[1])
# Prepare the individual html image files
image_counts = _prepare_html_subpages(images_html, output_dir, options)
print('{} images rendered (of {})'.format(image_rendered_count,image_count))
# Write index.html
all_tp_count = image_counts['tp'] + image_counts['tpc'] + image_counts['tpi']
total_count = all_tp_count + image_counts['tn'] + image_counts['fp'] + image_counts['fn']
classification_detection_results = """ <a href="tpc.html">with all correct top-1 predictions (TPC)</a> ({})<br/>
<a href="tpi.html">with one or more incorrect top-1 prediction (TPI)</a> ({})<br/>
<a href="tp.html">without classification evaluation</a><sup>*</sup> ({})<br/>""".format(
image_counts['tpc'],
image_counts['tpi'],
image_counts['tp']
)
confidence_threshold_string = ''
if isinstance(options.confidence_threshold,float):
confidence_threshold_string = '{:.2%}'.format(options.confidence_threshold)
else:
confidence_threshold_string = str(options.confidence_threshold)
index_page = """<html>
{}
<body>
<h2>Evaluation</h2>
<h3>Job metadata</h3>
<div class="contentdiv">
<p>Job name: {}<br/>
<p>Model version: {}</p>
</div>
<h3>Sample images</h3>
<div class="contentdiv">
<p>A sample of {} images, annotated with detections above confidence {}.</p>
<a href="tp.html">True positives (TP)</a> ({}) ({:0.1%})<br/>
CLASSIFICATION_PLACEHOLDER_1
<a href="tn.html">True negatives (TN)</a> ({}) ({:0.1%})<br/>
<a href="fp.html">False positives (FP)</a> ({}) ({:0.1%})<br/>
<a href="fn.html">False negatives (FN)</a> ({}) ({:0.1%})<br/>
CLASSIFICATION_PLACEHOLDER_2
</div>
""".format(
style_header,job_name_string,model_version_string,
image_count, confidence_threshold_string,
all_tp_count, all_tp_count/total_count,
image_counts['tn'], image_counts['tn']/total_count,
image_counts['fp'], image_counts['fp']/total_count,
image_counts['fn'], image_counts['fn']/total_count
)
index_page += """
<h3>Detection results</h3>
<div class="contentdiv">
<p>At a confidence threshold of {}, precision={:0.1%}, recall={:0.1%}</p>
<p><strong>Precision/recall summary for all {} images</strong></p><img src="{}"><br/>
</div>
""".format(
confidence_threshold_string, precision_at_confidence_threshold, recall_at_confidence_threshold,
len(detections_df), pr_figure_relative_filename
)
if len(classifier_accuracies) > 0:
index_page = index_page.replace('CLASSIFICATION_PLACEHOLDER_1',classification_detection_results)
index_page = index_page.replace('CLASSIFICATION_PLACEHOLDER_2',"""<p><sup>*</sup>We do not evaluate the classification result of images
if the classification information is missing, if the image contains
categories like ‘empty’ or ‘human’, or if the image has multiple
classification labels.</p>""")
else:
index_page = index_page.replace('CLASSIFICATION_PLACEHOLDER_1','')
index_page = index_page.replace('CLASSIFICATION_PLACEHOLDER_2','')
if len(classifier_accuracies) > 0:
index_page += """
<h3>Classification results</h3>
<div class="contentdiv">
<p>Classification accuracy: {:.2%}<br>
The accuracy is computed only for images with exactly one classification label.
The accuracy of an image is computed as 1/(number of unique detected top-1 classes),
i.e. if the model detects multiple boxes with different top-1 classes, then the accuracy
decreases and the image is put into 'TPI'.</p>
<p>Confusion matrix:</p>
<p><img src="{}"></p>
<div style='font-family:monospace;display:block;'>{}</div>
</div>
""".format(
np.mean(classifier_accuracies),
cm_figure_relative_filename,
"<br>".join(cm_str_lines).replace(' ', ' ')
)
# Show links to each GT class
#
# We could do this without classification results; currently we don't.
if len(classname_to_idx) > 0:
index_page += '<h3>Images of specific classes</h3><br/><div class="contentdiv">'
# Add links to all available classes
for cname in sorted(classname_to_idx.keys()):
cname_print_string = cname
if len(cname_print_string) == 0:
cname_print_string = '[no name available]'
index_page += '<a href="class_{0}.html">{0}</a> ({1})<br>'.format(
cname_print_string,
len(images_html['class_{}'.format(cname)]))
index_page += '</div>'
# Write custom footer if it was provided
if (options.footer_text is not None) and (len(options.footer_text) > 0):
index_page += '{}\n'.format(options.footer_text)
# Close open html tags
index_page += '\n</body></html>\n'
output_html_file = os.path.join(output_dir, 'index.html')
with open(output_html_file, 'w',
encoding=options.output_html_encoding) as f:
f.write(index_page)
print('Finished writing html to {}'.format(output_html_file))
# ...if we have ground truth
##%% Otherwise, if we don't have ground truth...
else:
##%% Sample detections/non-detections
# Accumulate html image structs (in the format expected by write_html_image_list)
# for each category
images_html = collections.defaultdict(list)
# Add default entries by accessing them for the first time
# Maps sorted tuples of detection category IDs (string ints) - e.g. ("1"), ("1", "4", "7") - to
# result set names, e.g. "detections_human", "detections_cat_truck".
detection_categories_to_results_name = {}
# Keep track of which categories are single-class (e.g. "animal") and which are
# combinations (e.g. "animal_vehicle")
detection_categories_to_category_count = {}
# For the creation of a "non-detections" category
images_html['non_detections']
detection_categories_to_category_count['non_detections'] = 0
if not options.separate_detections_by_category:
# For the creation of a "detections" category
images_html['detections']
detection_categories_to_category_count['detections'] = 0
else:
# Add a set of results for each category and combination of categories, e.g.
# "detections_animal_vehicle". When we're using this script for non-MegaDetector
# results, this can generate lots of categories, e.g. detections_bear_bird_cat_dog_pig.
# We'll keep that huge set of combinations in this map, but we'll only write
# out links for the ones that are non-empty.
used_combinations = set()
# row = images_to_visualize.iloc[0]
for i_row, row in images_to_visualize.iterrows():
detections_this_row = row['detections']
above_threshold_category_ids_this_row = set()
for detection in detections_this_row:
threshold = _get_threshold_for_category_id(detection['category'], options, detection_categories)
if detection['conf'] >= threshold:
above_threshold_category_ids_this_row.add(detection['category'])
if len(above_threshold_category_ids_this_row) == 0:
continue
sorted_categories_this_row = tuple(sorted(above_threshold_category_ids_this_row))
used_combinations.add(sorted_categories_this_row)
for sorted_subset in used_combinations:
assert len(sorted_subset) > 0
results_name = 'detections'
for category_id in sorted_subset:
results_name = results_name + '_' + detection_categories[category_id]
images_html[results_name]
detection_categories_to_results_name[sorted_subset] = results_name
detection_categories_to_category_count[results_name] = len(sorted_subset)
if options.include_almost_detections:
images_html['almost_detections']
detection_categories_to_category_count['almost_detections'] = 0
# Create output directories
for res in images_html.keys():
os.makedirs(os.path.join(output_dir, res), exist_ok=True)
image_count = len(images_to_visualize)
# Each element will be a list of 2-tuples, with elements [collection name,html info struct]
rendering_results = []
# list of 3-tuples with elements (file, max_conf, detections)
files_to_render = []
# Assemble the information we need for rendering, so we can parallelize without
# dealing with Pandas
# i_row = 0; row = images_to_visualize.iloc[0]
for _, row in images_to_visualize.iterrows():
assert isinstance(row['detections'],list)
# Filenames should already have been normalized to either '/' or '\'
files_to_render.append(row.to_dict())
start_time = time.time()
if options.parallelize_rendering:
pool = None
try:
if options.parallelize_rendering_n_cores is None:
if options.parallelize_rendering_with_threads:
pool = ThreadPool()
else:
pool = Pool()
else:
if options.parallelize_rendering_with_threads:
pool = ThreadPool(options.parallelize_rendering_n_cores)
worker_string = 'threads'
else:
pool = Pool(options.parallelize_rendering_n_cores)
worker_string = 'processes'
print('Rendering images with {} {}'.format(options.parallelize_rendering_n_cores,
worker_string))
# _render_image_no_gt(file_info,detection_categories_to_results_name,
# detection_categories,classification_categories)
rendering_results = list(tqdm(pool.imap(
partial(_render_image_no_gt,
detection_categories_to_results_name=detection_categories_to_results_name,
detection_categories=detection_categories,
classification_categories=classification_categories,
options=options),
files_to_render), total=len(files_to_render)))
finally:
if pool is not None:
pool.close()
pool.join()
print('Pool closed and joined for non-GT rendering')
else:
for file_info in tqdm(files_to_render):
rendering_result = _render_image_no_gt(file_info,
detection_categories_to_results_name,
detection_categories,
classification_categories,
options=options)
rendering_results.append(rendering_result)
elapsed = time.time() - start_time
# Do we have classification results in addition to detection results?
has_classification_info = False
# Map all the rendering results in the list rendering_results into the
# dictionary images_html
image_rendered_count = 0
for rendering_result in rendering_results:
if rendering_result is None:
continue
image_rendered_count += 1
for assignment in rendering_result:
if 'class' in assignment[0]:
has_classification_info = True
images_html[assignment[0]].append(assignment[1])
# Prepare the individual html image files
image_counts = _prepare_html_subpages(images_html, output_dir, options)
if image_rendered_count == 0:
seconds_per_image = 0.0
else:
seconds_per_image = elapsed/image_rendered_count
print('Rendered {} images (of {}) in {} ({} per image)'.format(image_rendered_count,
image_count,humanfriendly.format_timespan(elapsed),
humanfriendly.format_timespan(seconds_per_image)))
# Write index.html
# We can't just sum these, because image_counts includes images in both their
# detection and classification classes
total_images = 0
for k in image_counts.keys():
v = image_counts[k]
if has_classification_info and k.startswith('class_'):
continue
total_images += v
if total_images != image_count:
print('Warning, missing images: image_count is {}, total_images is {}'.format(total_images,image_count))
almost_detection_string = ''
if options.include_almost_detections:
almost_detection_string = ' (“almost detection” threshold at {:.1%})'.format(
options.almost_detection_confidence_threshold)
confidence_threshold_string = ''
if isinstance(options.confidence_threshold,float):
confidence_threshold_string = '{:.2%}'.format(options.confidence_threshold)
else:
confidence_threshold_string = str(options.confidence_threshold)
index_page = """<html>\n{}\n<body>\n
<h2>Visualization of results for {}</h2>\n
<p>A sample of {} images (of {} total)FAILURE_PLACEHOLDER, annotated with detections above confidence {}{}.</p>\n
<div class="contentdiv">
<p>Model version: {}</p>
</div>
<h3>Detection results</h3>\n
<div class="contentdiv">\n""".format(
style_header, job_name_string, image_count, len(detections_df), confidence_threshold_string,
almost_detection_string, model_version_string)
failure_string = ''
if n_failures is not None:
failure_string = ' ({} failures)'.format(n_failures)
index_page = index_page.replace('FAILURE_PLACEHOLDER',failure_string)
def result_set_name_to_friendly_name(result_set_name):
friendly_name = ''
friendly_name = result_set_name.replace('_','-')
if friendly_name.startswith('detections-'):
friendly_name = friendly_name.replace('detections-', 'detections: ')
friendly_name = friendly_name.capitalize()
return friendly_name
sorted_result_set_names = sorted(list(images_html.keys()))
result_set_name_to_count = {}
for result_set_name in sorted_result_set_names:
image_count = image_counts[result_set_name]
result_set_name_to_count[result_set_name] = image_count
sorted_result_set_names = sorted(sorted_result_set_names,
key=lambda x: result_set_name_to_count[x],
reverse=True)
for result_set_name in sorted_result_set_names:
# Don't print classification classes here; we'll do that later with a slightly
# different structure
if has_classification_info and result_set_name.lower().startswith('class_'):
continue
filename = result_set_name + '.html'
label = result_set_name_to_friendly_name(result_set_name)
image_count = image_counts[result_set_name]
# Don't include line items for empty multi-category pages
if image_count == 0 and \
detection_categories_to_category_count[result_set_name] > 1:
continue
if total_images == 0:
image_fraction = -1
else:
image_fraction = image_count / total_images
# Write the line item for this category, including a link only if the
# category is non-empty
if image_count == 0:
index_page += '{} ({}, {:.1%})<br/>\n'.format(
label,image_count,image_fraction)
else:
index_page += '<a href="{}">{}</a> ({}, {:.1%})<br/>\n'.format(
filename,label,image_count,image_fraction)
# ...for each result set
index_page += '</div>\n'
# If classification information is present and we're supposed to create
# a summary of classifications, we'll put it here
category_count_footer = None
if has_classification_info:
index_page += '<h3>Species classification results</h3>'
index_page += '<p>The same image might appear under multiple classes ' + \
'if multiple species were detected.</p>\n'
index_page += '<p>Classifications with confidence less than {:.1%} confidence are considered "unreliable".</p>\n'.format(
options.classification_confidence_threshold)
index_page += '<div class="contentdiv">\n'
# Add links to all available classes
class_names = sorted(classification_categories.values())
if 'class_unreliable' in images_html.keys():
class_names.append('unreliable')
if options.sort_classification_results_by_count:
class_name_to_count = {}
for cname in class_names:
ccount = len(images_html['class_{}'.format(cname)])
class_name_to_count[cname] = ccount
class_names = sorted(class_names,key=lambda x: class_name_to_count[x],reverse=True)
if options.category_name_to_sort_weight is not None:
category_name_to_sort_weight = {}
else:
category_name_to_sort_weight = options.category_name_to_sort_weight
for category_name in category_name_to_sort_weight:
assert isinstance(category_name_to_sort_weight[category_name],int), \
'Illegal sort weight {} for category {}'.format(
category_name_to_sort_weight[category_name],category_name)
# Figure out whether we need to do any grouping of categories
# while we print results
sort_weight_to_class_names = defaultdict(list)
# Loop over class names in the already-sorted order, which will be
# preserved within each weight group
for class_name in class_names:
if class_name in options.category_name_to_sort_weight:
weight = options.category_name_to_sort_weight[class_name]
sort_weight_to_class_names[weight].append(class_name)
else:
# The default weight is zero
sort_weight_to_class_names[0].append(class_name)
sort_weight_to_class_names = sort_dictionary_by_key(sort_weight_to_class_names)
category_names_printed = set()
for i_weight,sort_weight in enumerate(sort_weight_to_class_names):
class_names_this_weight = sort_weight_to_class_names[sort_weight]
# cname = class_names_this_weight[0]
for cname in class_names_this_weight:
# Don't print multiple links when multiple category IDs have the same name
if cname in category_names_printed:
continue
category_names_printed.add(cname)
ccount = len(images_html['class_{}'.format(cname)])
if ccount > 0:
html_filename_base = clean_filename('class_{}'.format(cname),
replace_whitespace='_')
cname_print_string = cname.lower()
if len(cname_print_string) == 0:
cname_print_string = '[no name available]'
index_page += '<a href="{}.html">{}</a> ({})<br/>\n'.format(
html_filename_base, cname_print_string, ccount)
# ...for every category in this sort weight group
# Print a line break between sort weight groups
if i_weight != (len(sort_weight_to_class_names)-1):
index_page += '<br/>\n'
# ...for every sort weight group
index_page += '</div>\n'
if options.include_classification_category_report:
# TODO: it's only for silly historical reasons that we re-read
# the input file in this case; because this module has used Pandas
# forever, we're not currently carrying the json representation around,
# only the Pandas representation.
print('Generating classification category report')
d = load_md_or_speciesnet_file(options.md_results_file)
category_name_to_count = {}
include_category_descriptions_with_global_counts = \
options.include_category_descriptions_with_global_counts
if 'classification_category_descriptions' not in d:
include_category_descriptions_with_global_counts = False
else:
category_id_to_name = d['classification_categories']
category_name_to_id = invert_dictionary(category_id_to_name)
category_id_to_description = d['classification_category_descriptions']
# im = d['images'][0]
for im in d['images']:
if 'detections' in im and im['detections'] is not None:
for det in im['detections']:
if ('classifications' in det) and (len(det['classifications']) > 0):
class_id = det['classifications'][0][0]
class_conf = det['classifications'][0][1]
if class_conf < options.classification_confidence_threshold:
continue
category_name = d['classification_categories'][class_id]
if category_name not in category_name_to_count:
category_name_to_count[category_name] = 1
else:
category_name_to_count[category_name] = \
category_name_to_count[category_name] + 1
# ...for each detection
# ...if this image has detections
# ...for each image
category_name_to_count = sort_dictionary_by_value(
category_name_to_count,reverse=True)
category_count_footer = ''
category_count_footer += '<br/>\n'
category_count_footer += \
'<h3>Category counts (for the whole dataset, not just the sample used for this page)</h3>\n'
category_count_footer += '<div class="contentdiv">\n'
for category_name in category_name_to_count.keys():
count = category_name_to_count[category_name]
category_count_html = '{}: {}'.format(category_name,count)
if include_category_descriptions_with_global_counts:
category_id = category_name_to_id[category_name]
category_description = category_id_to_description[category_id]
category_count_html += ' ({})'.format(category_description)
category_count_html += '<br/>\n'
category_count_footer += category_count_html
category_count_footer += '</div>\n'
# ...if we're generating a classification category report
# ...if classification info is present
if category_count_footer is not None:
index_page += category_count_footer + '\n'
# Write custom footer if it was provided
if (options.footer_text is not None) and (len(options.footer_text) > 0):
index_page += options.footer_text + '\n'
# Close open html tags
index_page += '\n</body></html>\n'
output_html_file = os.path.join(output_dir, 'index.html')
with open(output_html_file, 'w',
encoding=options.output_html_encoding) as f:
f.write(index_page)
print('Finished writing html to {}'.format(output_html_file))
# ...if we do/don't have ground truth
ppresults.output_html_file = output_html_file
return ppresults
# ...process_batch_results
#%% Interactive driver(s)
if False:
#%%
base_dir = r'g:\temp'
options = PostProcessingOptions()
options.image_base_dir = base_dir
options.output_dir = os.path.join(base_dir, 'preview')
options.md_results_file = os.path.join(base_dir, 'results.json')
options.confidence_threshold = {'person':0.5,'animal':0.5,'vehicle':0.01}
options.include_almost_detections = True
options.almost_detection_confidence_threshold = 0.001
ppresults = process_batch_results(options)
# from megadetector.utils.path_utils import open_file; open_file(ppresults.output_html_file)
#%% Command-line driver
def main(): # noqa
options = PostProcessingOptions()
parser = argparse.ArgumentParser()
parser.add_argument(
'md_results_file',
help='path to .json file containing MegaDetector results')
parser.add_argument(
'output_dir',
help='base directory for output')
parser.add_argument(
'--image_base_dir', default=options.image_base_dir,
help='base directory for images (optional, can compute statistics '
'without images)')
parser.add_argument(
'--ground_truth_json_file', default=options.ground_truth_json_file,
help='ground truth labels (optional, can render detections without '
'ground truth), in the COCO Camera Traps format')
parser.add_argument(
'--confidence_threshold', type=float,
default=options.confidence_threshold,
help='Confidence threshold for statistics and visualization')
parser.add_argument(
'--almost_detection_confidence_threshold', type=float,
default=options.almost_detection_confidence_threshold,
help='Almost-detection confidence threshold for statistics and visualization')
parser.add_argument(
'--target_recall', type=float, default=options.target_recall,
help='Target recall (for statistics only)')
parser.add_argument(
'--num_images_to_sample', type=int,
default=options.num_images_to_sample,
help='number of images to visualize, -1 for all images (default: 500)')
parser.add_argument(
'--viz_target_width', type=int, default=options.viz_target_width,
help='Output image width')
parser.add_argument(
'--include_almost_detections', action='store_true',
help='Include a separate category for images just above a second confidence threshold')
parser.add_argument(
'--html_sort_order', type=str, default='filename',
help='Sort order for output pages, should be one of [filename,confidence,random] (defaults to filename)')
parser.add_argument(
'--sort_by_confidence', action='store_true',
help='Sort output in decreasing order by confidence (defaults to sorting by filename)')
parser.add_argument(
'--n_cores', type=int, default=1,
help='Number of threads to use for rendering (default: 1)')
parser.add_argument(
'--parallelize_rendering_with_processes',
action='store_true',
help='Should we use processes (instead of threads) for parallelization?')
parser.add_argument(
'--no_separate_detections_by_category',
action='store_true',
help='Collapse all categories into just "detections" and "non-detections"')
parser.add_argument(
'--open_output_file',
action='store_true',
help='Open the HTML output file when finished')
parser.add_argument(
'--max_figures_per_html_file',
type=int, default=None,
help='Maximum number of images to put on a single HTML page')
if len(sys.argv[1:]) == 0:
parser.print_help()
parser.exit()
args = parser.parse_args()
if args.n_cores != 1:
assert (args.n_cores > 1), 'Illegal number of cores: {}'.format(args.n_cores)
if args.parallelize_rendering_with_processes:
args.parallelize_rendering_with_threads = False
args.parallelize_rendering = True
args.parallelize_rendering_n_cores = args.n_cores
args_to_object(args, options)
if args.no_separate_detections_by_category:
options.separate_detections_by_category = False
ppresults = process_batch_results(options)
if options.open_output_file:
path_utils.open_file(ppresults.output_html_file)
if __name__ == '__main__':
main()