"""
classification_postprocessing.py
Functions for postprocessing species classification results, particularly:
* Smoothing results within an image (an image with 700 cows and one deer is really just 701
cows)
* Smoothing results within a sequence (a sequence that looks like deer/deer/deer/elk/deer/deer
is really just a deer)
"""
#%% Constants and imports
import os
import json
import copy
import pandas as pd
from collections import defaultdict
from tqdm import tqdm
from megadetector.utils.ct_utils import is_list_sorted
from megadetector.utils.ct_utils import is_empty
from megadetector.utils.ct_utils import sort_dictionary_by_value
from megadetector.utils.ct_utils import sort_dictionary_by_key
from megadetector.utils.ct_utils import invert_dictionary
from megadetector.utils.ct_utils import write_json
from megadetector.postprocessing.validate_batch_results import \
validate_batch_results, ValidateBatchResultsOptions
from megadetector.utils.wi_taxonomy_utils import clean_taxonomy_string
from megadetector.utils.wi_taxonomy_utils import taxonomy_level_index
from megadetector.utils.wi_taxonomy_utils import taxonomy_level_string_to_index
from megadetector.utils.wi_taxonomy_utils import human_prediction_string
from megadetector.utils.wi_taxonomy_utils import animal_prediction_string
from megadetector.utils.wi_taxonomy_utils import is_taxonomic_prediction_string
from megadetector.utils.wi_taxonomy_utils import blank_prediction_string # noqa
#%% Options classes
[docs]
class ClassificationSmoothingOptions:
"""
Options used to parameterize smooth_classification_results_image_level()
and smooth_classification_results_sequence_level()
"""
def __init__(self):
#: How many detections do we need in a dominant category to overwrite
#: non-dominant classifications? This is irrelevant if
#: max_detections_nondominant_class <= 1.
self.min_detections_to_overwrite_secondary = 4
#: Even if we have a dominant class, if a non-dominant class has at least
#: this many classifications in an image, leave them alone.
#:
#: If this is <= 1, we won't replace non-dominant, non-other classes
#: with the dominant class, even if there are 900 cows and 1 deer.
self.max_detections_nondominant_class = 1
#: How many detections do we need in a dominant category to overwrite
#: non-dominant classifications in the same family? If this is <= 0,
#: we'll skip this step. This option doesn't mean anything if
#: max_detections_nondominant_class_same_family <= 1.
self.min_detections_to_overwrite_secondary_same_family = 2
#: If we have this many classifications of a nondominant category,
#: we won't do same-family overwrites. <= 1 means "even if there are
#: a million deer, if there are two million moose, call all the deer
#: moose". This option doesn't mean anything if
#: min_detections_to_overwrite_secondary_same_family <= 0.
self.max_detections_nondominant_class_same_family = -1
#: If the dominant class has at least this many classifications, overwrite
#: "other" classifications with the dominant class
self.min_detections_to_overwrite_other = 2
#: Names to treat as "other" categories; can't be None, but can be empty
#:
#: "Other" classifications will be changed to the dominant category, regardless
#: of confidence, as long as there are at least min_detections_to_overwrite_other
#: examples of the dominant class. For example, cow/other will remain unchanged,
#: but cow/cow/other will become cow/cow/cow.
self.other_category_names = ['other','unknown','no cv result','animal','blank','mammal']
#: We're not even going to mess around with classifications below this threshold.
#:
#: We won't count them, we won't over-write them, they don't exist during the
#: within-image smoothing step.
self.classification_confidence_threshold = 0.5
#: We're not even going to mess around with detections below this threshold.
#:
#: We won't count them, we won't over-write them, they don't exist during the
#: within-image smoothing step.
self.detection_confidence_threshold = 0.15
#: If classification descriptions are present and appear to represent taxonomic
#: information, should we propagate classifications when lower-level taxa are more
#: common in an image? For example, if we see "carnivore/fox/fox/deer", should
#: we make that "fox/fox/fox/deer"?
self.propagate_classifications_through_taxonomy = True
#: When propagating classifications down through taxonomy levels, we have to
#: decide whether we prefer more frequent categories or more specific categories.
#: taxonomy_propagation_level_weight and taxonomy_propagation_count_weight
#: balance levels against counts in this process.
self.taxonomy_propagation_level_weight = 1.0
#: When propagating classifications down through taxonomy levels, we have to
#: decide whether we prefer more frequent categories or more specific categories.
#: taxonomy_propagation_level_weight and taxonomy_propagation_count_weight
#: balance levels against counts in this process.
#:
#: With a very low default value, this just breaks ties.
self.taxonomy_propagation_count_weight = 0.01
#: Should we record information about the state of labels prior to smoothing?
self.add_pre_smoothing_description = True
#: When a dict (rather than a file) is passed to either smoothing function,
#: if this is True, we'll make a copy of the input dict before modifying.
self.modify_in_place = False
#: Only include these categories in the smoothing process (None to use all categories)
self.detection_category_names_to_smooth = ['animal']
## Debug options
#: Enable additional debug output for a particular image
self.break_at_image = None
## Populated internally
#: Only include these categories in the smoothing process (None to use all categories)
self._detection_category_ids_to_smooth = None
#%% Utility functions
def _results_for_sequence(images_this_sequence,filename_to_results):
"""
Fetch MD results for every image in this sequence, based on the 'file_name' field
"""
results_this_sequence = []
for im in images_this_sequence:
fn = im['file_name']
results_this_image = filename_to_results[fn]
assert isinstance(results_this_image,dict)
results_this_sequence.append(results_this_image)
return results_this_sequence
def _sort_images_by_time(images):
"""
Returns a copy of [images], sorted by the 'datetime' field (ascending).
"""
return sorted(images, key = lambda im: im['datetime'])
def _detection_is_relevant_for_smoothing(det,options):
"""
Determine whether [det] has classifications that might be meaningful for smoothing.
"""
if ('classifications' not in det) or \
(det['conf'] < options.detection_confidence_threshold):
return False
# Ignore non-smoothed categories
if (options._detection_category_ids_to_smooth is not None) and \
(det['category'] not in options._detection_category_ids_to_smooth):
return False
return True
[docs]
def count_detections_by_classification_category(detections,options=None):
"""
Count the number of instances of each classification category in the detections list
[detections] that have an above-threshold detection. Sort results in descending
order by count. Returns a dict mapping category ID --> count. If no detections
are above threshold, returns an empty dict.
Only processes the top classification for each detection.
Args:
detections (list of dict): detections list
options (ClassificationSmoothingOptions, optional): see ClassificationSmoothingOptions
Returns:
dict mapping above-threshold category IDs to counts
"""
if detections is None or len(detections) == 0:
return {}
if options is None:
options = ClassificationSmoothingOptions()
category_to_count = defaultdict(int)
for det in detections:
if not _detection_is_relevant_for_smoothing(det,options):
continue
c = det['classifications'][0]
if c[1] >= options.classification_confidence_threshold:
category_to_count[c[0]] += 1
category_to_count = {k: v for k, v in sorted(category_to_count.items(),
key=lambda item: item[1],
reverse=True)}
return category_to_count
[docs]
def get_classification_description_string(category_to_count,classification_descriptions):
"""
Return a string summarizing the image content according to [category_to_count].
Args:
category_to_count (dict): a dict mapping category IDs to counts
classification_descriptions (dict): a dict mapping category IDs to description strings
Returns:
string: a description of this image's content, e.g. "rabbit (4), human (1)"
"""
category_strings = []
# category_id = next(iter(category_to_count))
for category_id in category_to_count:
category_description = classification_descriptions[category_id]
tokens = category_description.split(';')
assert len(tokens) == 7
category_name = tokens[-1]
if len(category_name) == 0:
category_name = 'undefined category'
count = category_to_count[category_id]
category_string = '{} ({})'.format(category_name,count)
category_strings.append(category_string)
return ', '.join(category_strings)
def _print_counts_with_names(category_to_count,classification_descriptions):
"""
Print a list of classification categories with counts, based in the name --> count
dict [category_to_count]
"""
for category_id in category_to_count:
category_name = classification_descriptions[category_id]
count = category_to_count[category_id]
print('{}: {} ({})'.format(category_id,category_name,count))
def _prepare_results_for_smoothing(input_file,options):
"""
Load results from [input_file] if necessary, prepare category descriptions
for smoothing. Adds pre-smoothing descriptions to every image if the options
say we're supposed to do that.
May modify some fields in [options].
"""
if isinstance(input_file,str):
with open(input_file,'r') as f:
print('Loading results from:\n{}'.format(input_file))
d = json.load(f)
else:
assert isinstance(input_file,dict)
if options.modify_in_place:
d = input_file
else:
print('modify_in_place is False, copying the input before modifying')
d = copy.deepcopy(input_file)
## Category processing
category_name_to_id = {d['classification_categories'][k]:k for k in d['classification_categories']}
other_category_ids = []
for s in options.other_category_names:
if s in category_name_to_id:
other_category_ids.append(category_name_to_id[s])
# Possibly update the list of category IDs we should smooth
if options.detection_category_names_to_smooth is None:
options._detection_category_ids_to_smooth = None
else:
detection_category_id_to_name = d['detection_categories']
detection_category_name_to_id = invert_dictionary(detection_category_id_to_name)
options._detection_category_ids_to_smooth = []
for category_name in options.detection_category_names_to_smooth:
options._detection_category_ids_to_smooth.append(detection_category_name_to_id[category_name])
# Before we do anything else, get rid of everything but the top classification
# for each detection, and remove the 'classifications' field from detections with
# no classifications.
for im in tqdm(d['images']):
if 'detections' not in im or im['detections'] is None or len(im['detections']) == 0:
continue
detections = im['detections']
for det in detections:
if 'classifications' not in det:
continue
if len(det['classifications']) == 0:
del det['classifications']
continue
classification_confidence_values = [c[1] for c in det['classifications']]
assert is_list_sorted(classification_confidence_values,reverse=True)
det['classifications'] = [det['classifications'][0]]
# ...for each detection in this image
# ...for each image
## Clean up classification descriptions...
# ...so we can test taxonomic relationships by substring testing.
classification_descriptions_clean = None
classification_descriptions = None
if 'classification_category_descriptions' in d:
classification_descriptions = d['classification_category_descriptions']
classification_descriptions_single = {}
# We use "|" to delimit multiple descriptions, just use the first
# for smoothing. This isn't perfect or "correct", but it's reasonable.
for k in classification_descriptions.keys():
v = classification_descriptions[k]
v = v.split('|')[0]
classification_descriptions_single[k] = v
classification_descriptions = classification_descriptions_single
classification_descriptions_clean = {}
# category_id = next(iter(classification_descriptions))
for category_id in classification_descriptions:
classification_descriptions_clean[category_id] = \
clean_taxonomy_string(classification_descriptions[category_id]).strip(';').lower()
## Optionally add pre-smoothing descriptions to every image
if options.add_pre_smoothing_description and (classification_descriptions is not None):
for im in tqdm(d['images']):
if 'detections' not in im or im['detections'] is None or len(im['detections']) == 0:
continue
detections = im['detections']
category_to_count = count_detections_by_classification_category(detections, options)
im['pre_smoothing_description'] = \
get_classification_description_string(category_to_count, classification_descriptions)
return {
'd':d,
'other_category_ids':other_category_ids,
'classification_descriptions_clean':classification_descriptions_clean,
'classification_descriptions':classification_descriptions
}
# ...def _prepare_results_for_smoothing(...)
def _smooth_classifications_for_list_of_detections(detections,
options,
other_category_ids,
classification_descriptions,
classification_descriptions_clean):
"""
Smooth classifications for a list of detections, which may have come from a single
image, or may represent an entire sequence.
Returns None if no changes are made, else a dict.
classification_descriptions_clean should be semicolon-delimited taxonomic strings
from which common names and GUIDs have already been removed.
Assumes there is only one classification per detection, i.e. that non-top classifications
have already been remoevd.
"""
## Count the number of instances of each category in this image
category_to_count = count_detections_by_classification_category(detections, options)
# _print_counts_with_names(category_to_count,classification_descriptions)
# get_classification_description_string(category_to_count, classification_descriptions)
if len(category_to_count) <= 1:
return None
keys = list(category_to_count.keys())
# Handle a quirky special case: if the most common category is "other" and
# it's "tied" with the second-most-common category, swap them
if (len(keys) > 1) and \
(keys[0] in other_category_ids) and \
(keys[1] not in other_category_ids) and \
(category_to_count[keys[0]] == category_to_count[keys[1]]):
keys[1], keys[0] = keys[0], keys[1]
max_count = category_to_count[keys[0]]
most_common_category = keys[0]
del keys
## Debug tools
verbose_debug_enabled = False
if options.break_at_image is not None:
for det in detections:
if 'image_filename' in det and \
det['image_filename'] == options.break_at_image:
verbose_debug_enabled = True
break
if verbose_debug_enabled:
_print_counts_with_names(category_to_count,classification_descriptions)
# from IPython import embed; embed()
## Possibly change "other" classifications to the most common category
# ...if the dominant category is not an "other" category.
n_other_classifications_changed_this_image = 0
# If we have at least *min_detections_to_overwrite_other* in a category that isn't
# "other", change all "other" classifications to that category
if (max_count >= options.min_detections_to_overwrite_other) and \
(most_common_category not in other_category_ids):
for det in detections:
if not _detection_is_relevant_for_smoothing(det,options):
continue
assert len(det['classifications']) == 1
c = det['classifications'][0]
if (c[1] >= options.classification_confidence_threshold) and \
(c[0] in other_category_ids):
if verbose_debug_enabled:
print('Replacing {} with {}'.format(
classification_descriptions[c[0]],
most_common_category))
n_other_classifications_changed_this_image += 1
c[0] = most_common_category
# ...if there are classifications for this detection
# ...for each detection
# ...if we should overwrite all "other" classifications
if verbose_debug_enabled:
print('Made {} other changes'.format(n_other_classifications_changed_this_image))
## Re-count
category_to_count = count_detections_by_classification_category(detections, options)
# _print_counts_with_names(category_to_count,classification_descriptions)
keys = list(category_to_count.keys())
max_count = category_to_count[keys[0]]
most_common_category = keys[0]
del keys
## Possibly change some non-dominant classifications to the dominant category
process_taxonomic_rules = \
(classification_descriptions_clean is not None) and \
(len(classification_descriptions_clean) > 0) and \
(len(category_to_count) > 1)
n_detections_flipped_this_image = 0
# Don't do this if the most common category is an "other" category, or
# if we don't have enough of the most common category
if (most_common_category not in other_category_ids) and \
(max_count >= options.min_detections_to_overwrite_secondary):
# i_det = 0; det = detections[i_det]
for i_det,det in enumerate(detections):
if not _detection_is_relevant_for_smoothing(det,options):
continue
assert len(det['classifications']) == 1
c = det['classifications'][0]
# Don't over-write the most common category with itself
if c[0] == most_common_category:
continue
# Don't bother with below-threshold classifications
if c[1] < options.classification_confidence_threshold:
continue
# If we're doing taxonomic processing, at this stage, don't turn children
# into parents; we'll likely turn parents into children in the next stage.
if process_taxonomic_rules:
most_common_category_description = \
classification_descriptions_clean[most_common_category]
category_id_this_classification = c[0]
assert category_id_this_classification in category_to_count
category_description_this_classification = \
classification_descriptions_clean[category_id_this_classification]
# An empty description corresponds to the "animal" category. We don't handle
# "animal" here as a parent category, that would be handled in the "other smoothing"
# step above.
if len(category_description_this_classification) == 0:
continue
most_common_category_is_parent_of_this_category = \
most_common_category_description in category_description_this_classification
if most_common_category_is_parent_of_this_category:
continue
# If we have fewer of this category than the most common category,
# but not *too* many, flip it to the most common category.
if (max_count > category_to_count[c[0]]) and \
(category_to_count[c[0]] <= options.max_detections_nondominant_class):
c[0] = most_common_category
n_detections_flipped_this_image += 1
# ...for each detection
# ...if the dominant category is legit
if verbose_debug_enabled:
print('Made {} non-dominant --> dominant changes'.format(
n_detections_flipped_this_image))
## Re-count
category_to_count = count_detections_by_classification_category(detections, options)
# _print_counts_with_names(category_to_count,classification_descriptions)
keys = list(category_to_count.keys())
max_count = category_to_count[keys[0]]
most_common_category = keys[0]
del keys
## Possibly collapse higher-level taxonomic predictions down to lower levels
n_taxonomic_changes_this_image = 0
process_taxonomic_rules = \
(classification_descriptions_clean is not None) and \
(len(classification_descriptions_clean) > 0) and \
(len(category_to_count) > 1)
if process_taxonomic_rules and options.propagate_classifications_through_taxonomy:
# det = detections[3]
for det in detections:
if not _detection_is_relevant_for_smoothing(det,options):
continue
assert len(det['classifications']) == 1
c = det['classifications'][0]
# Don't bother with any classifications below the confidence threshold
if c[1] < options.classification_confidence_threshold:
continue
category_id_this_classification = c[0]
assert category_id_this_classification in category_to_count
category_description_this_classification = \
classification_descriptions_clean[category_id_this_classification]
# An empty description corresponds to the "animal" category. We don't handle
# "animal" here as a parent category, that would be handled in the "other smoothing"
# step above.
if len(category_description_this_classification) == 0:
continue
# We may have multiple child categories to choose from; this keeps track of
# the "best" we've seen so far. "Best" is based on the level (species is better
# than genus) and number.
child_category_to_score = defaultdict(float)
for category_id_of_candidate_child in category_to_count.keys():
# A category is never its own child
if category_id_of_candidate_child == category_id_this_classification:
continue
# Is this candidate a child of the current classification?
category_description_candidate_child = \
classification_descriptions_clean[category_id_of_candidate_child]
# An empty description corresponds to "animal", which can never
# be a child of another category.
if len(category_description_candidate_child) == 0:
continue
# This handles a case that doesn't come up with "pure" SpeciesNet results;
# if two categories have different IDs but the same "clean" description, this
# means they're different common names for the same species, which we use
# for things like "white-tailed deer buck" and "white-tailed deer fawn".
#
# Currently we don't support smoothing those predictions, because it's not a
# parent/child relationship.
if category_description_candidate_child == \
category_description_this_classification:
continue
# As long as we're using "clean" descriptions, parent/child taxonomic
# relationships are defined by a substring relationship
is_child = category_description_this_classification in \
category_description_candidate_child
if not is_child:
continue
# How many instances of this child category are there?
child_category_count = category_to_count[category_id_of_candidate_child]
# What taxonomy level is this child category defined at?
child_category_level = taxonomy_level_index(
classification_descriptions[category_id_of_candidate_child])
child_category_to_score[category_id_of_candidate_child] = \
child_category_level * options.taxonomy_propagation_level_weight + \
child_category_count * options.taxonomy_propagation_count_weight
# ...for each category we are considering reducing this classification to
# Did we find a category we want to change this classification to?
if len(child_category_to_score) > 0:
# Find the child category with the highest score
child_category_to_score = sort_dictionary_by_value(
child_category_to_score,reverse=True)
best_child_category = next(iter(child_category_to_score.keys()))
if verbose_debug_enabled:
old_category_name = \
classification_descriptions_clean[c[0]]
new_category_name = \
classification_descriptions_clean[best_child_category]
print('Replacing {} with {}'.format(
old_category_name,new_category_name))
c[0] = best_child_category
n_taxonomic_changes_this_image += 1
# ...for each detection
# ...if we have taxonomic information available
## Re-count
category_to_count = count_detections_by_classification_category(detections, options)
# _print_counts_with_names(category_to_count,classification_descriptions)
keys = list(category_to_count.keys())
max_count = category_to_count[keys[0]]
most_common_category = keys[0]
del keys
## Possibly do within-family smoothing
n_within_family_smoothing_changes = 0
# min_detections_to_overwrite_secondary_same_family = -1
# max_detections_nondominant_class_same_family = 1
family_level = taxonomy_level_string_to_index('family')
if process_taxonomic_rules:
category_description_most_common_category = \
classification_descriptions[most_common_category]
most_common_category_taxonomic_level = \
taxonomy_level_index(category_description_most_common_category)
n_most_common_category = category_to_count[most_common_category]
tokens = category_description_most_common_category.split(';')
assert len(tokens) == 7
most_common_category_family = tokens[3]
most_common_category_genus = tokens[4]
# Only consider remapping to genus or species level, and only when we have
# a high enough count in the most common category
if process_taxonomic_rules and \
(options.min_detections_to_overwrite_secondary_same_family > 0) and \
(most_common_category not in other_category_ids) and \
(most_common_category_taxonomic_level > family_level) and \
(n_most_common_category >= options.min_detections_to_overwrite_secondary_same_family):
# det = detections[0]
for det in detections:
if not _detection_is_relevant_for_smoothing(det,options):
continue
assert len(det['classifications']) == 1
c = det['classifications'][0]
# Don't over-write the most common category with itself
if c[0] == most_common_category:
continue
# Don't bother with below-threshold classifications
if c[1] < options.classification_confidence_threshold:
continue
n_candidate_flip_category = category_to_count[c[0]]
# Do we have too many of the non-dominant category to do this kind of swap?
if n_candidate_flip_category > \
options.max_detections_nondominant_class_same_family:
continue
# Don't flip classes when it's a tie
if n_candidate_flip_category == n_most_common_category:
continue
category_description_candidate_flip = \
classification_descriptions[c[0]]
tokens = category_description_candidate_flip.split(';')
assert len(tokens) == 7
candidate_flip_category_family = tokens[3]
candidate_flip_category_genus = tokens[4]
candidate_flip_category_taxonomic_level = \
taxonomy_level_index(category_description_candidate_flip)
# Only proceed if we have valid family strings
if (len(candidate_flip_category_family) == 0) or \
(len(most_common_category_family) == 0):
continue
# Only proceed if the candidate and the most common category are in the same family
if candidate_flip_category_family != most_common_category_family:
continue
# Don't flip from a species to the genus level in the same genus
if (candidate_flip_category_genus == most_common_category_genus) and \
(candidate_flip_category_taxonomic_level > \
most_common_category_taxonomic_level):
continue
old_category_name = classification_descriptions_clean[c[0]]
new_category_name = classification_descriptions_clean[most_common_category]
c[0] = most_common_category
n_within_family_smoothing_changes += 1
# ...for each detection
# ...if the dominant category is legit and we have taxonomic information available
if verbose_debug_enabled:
print('Made {} same-family changes'.format(n_within_family_smoothing_changes))
return {'n_other_classifications_changed_this_image':n_other_classifications_changed_this_image,
'n_detections_flipped_this_image':n_detections_flipped_this_image,
'n_taxonomic_changes_this_image':n_taxonomic_changes_this_image,
'n_within_family_smoothing_changes':n_within_family_smoothing_changes}
# ...def _smooth_classifications_for_list_of_detections(...)
def _smooth_single_image(im,
options,
other_category_ids,
classification_descriptions,
classification_descriptions_clean):
"""
Smooth classifications for a single image. Returns None if no changes are made,
else a dict.
classification_descriptions_clean should be semicolon-delimited taxonomic strings
from which common names and GUIDs have already been removed.
Assumes there is only one classification per detection, i.e. that non-top classifications
have already been remoevd.
"""
if 'detections' not in im or im['detections'] is None or len(im['detections']) == 0:
return
detections = im['detections']
# Simplify debugging
for det in detections:
det['image_filename'] = im['file']
to_return = _smooth_classifications_for_list_of_detections(detections,
options=options,
other_category_ids=other_category_ids,
classification_descriptions=classification_descriptions,
classification_descriptions_clean=classification_descriptions_clean)
# Clean out debug information
for det in detections:
del det['image_filename']
return to_return
# ...def smooth_single_image
#%% Image-level smoothing
[docs]
def smooth_classification_results_image_level(input_file,output_file=None,options=None):
"""
Smooth classifications at the image level for all results in the MD-formatted results
file [input_file], optionally writing a new set of results to [output_file].
This function generally expresses the notion that an image with 700 cows and one deer
is really just 701 cows.
Only count detections with a classification confidence threshold above
[options.classification_confidence_threshold], which in practice means we're only
looking at one category per detection.
If an image has at least [options.min_detections_to_overwrite_secondary] such detections
in the most common category, and no more than [options.max_detections_nondominant_class]
in the second-most-common category, flip all detections to the most common
category.
Optionally treat some classes as particularly unreliable, typically used to overwrite an
"other" class.
This function also removes everything but the non-dominant classification for each detection.
Args:
input_file (str): MegaDetector-formatted classification results file to smooth. Can
also be an already-loaded results dict.
output_file (str, optional): .json file to write smoothed results
options (ClassificationSmoothingOptions, optional): see
ClassificationSmoothingOptions for details.
Returns:
dict: MegaDetector-results-formatted dict, identical to what's written to
[output_file] if [output_file] is not None.
"""
## Input validation
if options is None:
options = ClassificationSmoothingOptions()
r = _prepare_results_for_smoothing(input_file, options)
d = r['d']
other_category_ids = r['other_category_ids']
classification_descriptions_clean = r['classification_descriptions_clean']
classification_descriptions = r['classification_descriptions']
## Smoothing
n_other_classifications_changed = 0
n_other_images_changed = 0
n_taxonomic_images_changed = 0
n_detections_flipped = 0
n_images_changed = 0
n_taxonomic_classification_changes = 0
# im = d['images'][0]
for im in tqdm(d['images']):
r = _smooth_single_image(im,
options,
other_category_ids,
classification_descriptions=classification_descriptions,
classification_descriptions_clean=classification_descriptions_clean)
if r is None:
continue
n_detections_flipped_this_image = r['n_detections_flipped_this_image']
n_other_classifications_changed_this_image = \
r['n_other_classifications_changed_this_image']
n_taxonomic_changes_this_image = r['n_taxonomic_changes_this_image']
n_detections_flipped += n_detections_flipped_this_image
n_other_classifications_changed += n_other_classifications_changed_this_image
n_taxonomic_classification_changes += n_taxonomic_changes_this_image
if n_detections_flipped_this_image > 0:
n_images_changed += 1
if n_other_classifications_changed_this_image > 0:
n_other_images_changed += 1
if n_taxonomic_changes_this_image > 0:
n_taxonomic_images_changed += 1
# ...for each image
print('Classification smoothing: changed {} detections on {} images'.format(
n_detections_flipped,n_images_changed))
print('"Other" smoothing: changed {} detections on {} images'.format(
n_other_classifications_changed,n_other_images_changed))
print('Taxonomic smoothing: changed {} detections on {} images'.format(
n_taxonomic_classification_changes,n_taxonomic_images_changed))
## Write output
if output_file is not None:
print('Writing results after image-level smoothing to:\n{}'.format(output_file))
write_json(output_file,d)
return d
# ...def smooth_classification_results_image_level(...)
#%% Sequence-level smoothing
[docs]
def smooth_classification_results_sequence_level(input_file,
cct_sequence_information,
output_file=None,
options=None):
"""
Smooth classifications at the sequence level for all results in the MD-formatted results
file [md_results_file], optionally writing a new set of results to [output_file].
This function generally expresses the notion that a sequence that looks like
deer/deer/deer/elk/deer/deer/deer/deer is really just a deer.
Args:
input_file (str or dict): MegaDetector-formatted classification results file to smooth
(or already-loaded results). If you supply a dict, it's copied by default, but
in-place modification is supported via options.modify_in_place.
cct_sequence_information (str, dict, or list): COCO Camera Traps file containing sequence IDs for
each image (or an already-loaded CCT-formatted dict, or just the 'images' list from a CCT dict).
output_file (str, optional): .json file to write smoothed results
options (ClassificationSmoothingOptions, optional): see
ClassificationSmoothingOptions for details.
Returns:
dict: MegaDetector-results-formatted dict, identical to what's written to
[output_file] if [output_file] is not None.
"""
## Input validation
if options is None:
options = ClassificationSmoothingOptions()
r = _prepare_results_for_smoothing(input_file, options)
d = r['d']
other_category_ids = r['other_category_ids']
classification_descriptions_clean = r['classification_descriptions_clean']
classification_descriptions = r['classification_descriptions']
## Make a list of images appearing in each sequence
if isinstance(cct_sequence_information,list):
image_info = cct_sequence_information
elif isinstance(cct_sequence_information,str):
print('Loading sequence information from {}'.format(cct_sequence_information))
with open(cct_sequence_information,'r') as f:
cct_sequence_information = json.load(f)
image_info = cct_sequence_information['images']
else:
assert isinstance(cct_sequence_information,dict)
image_info = cct_sequence_information['images']
sequence_to_image_filenames = defaultdict(list)
# im = image_info[0]
for im in tqdm(image_info):
sequence_to_image_filenames[im['seq_id']].append(im['file_name'])
del image_info
image_fn_to_classification_results = {}
for im in d['images']:
fn = im['file']
assert fn not in image_fn_to_classification_results
image_fn_to_classification_results[fn] = im
## Smoothing
n_other_classifications_changed = 0
n_other_sequences_changed = 0
n_taxonomic_sequences_changed = 0
n_within_family_sequences_changed = 0
n_detections_flipped = 0
n_sequences_changed = 0
n_taxonomic_classification_changes = 0
n_within_family_changes = 0
# sequence_id = list(sequence_to_image_filenames.keys())[0]
for sequence_id in sequence_to_image_filenames.keys():
image_filenames_this_sequence = sequence_to_image_filenames[sequence_id]
# if 'file' in image_filenames_this_sequence:
# from IPython import embed; embed()
detections_this_sequence = []
for image_filename in image_filenames_this_sequence:
if image_filename not in image_fn_to_classification_results:
print('Warning: {} in sequence list but not in results'.format(
image_filename))
continue
im = image_fn_to_classification_results[image_filename]
if 'detections' not in im or im['detections'] is None:
continue
detections_this_sequence.extend(im['detections'])
# Temporarily add image filenames to every detection,
# for debugging
for det in im['detections']:
det['image_filename'] = im['file']
if len(detections_this_sequence) == 0:
continue
r = _smooth_classifications_for_list_of_detections(
detections=detections_this_sequence,
options=options,
other_category_ids=other_category_ids,
classification_descriptions=classification_descriptions,
classification_descriptions_clean=classification_descriptions_clean)
if r is None:
continue
n_detections_flipped_this_sequence = r['n_detections_flipped_this_image']
n_other_classifications_changed_this_sequence = \
r['n_other_classifications_changed_this_image']
n_taxonomic_changes_this_sequence = r['n_taxonomic_changes_this_image']
n_within_family_changes_this_sequence = r['n_within_family_smoothing_changes']
n_detections_flipped += n_detections_flipped_this_sequence
n_other_classifications_changed += n_other_classifications_changed_this_sequence
n_taxonomic_classification_changes += n_taxonomic_changes_this_sequence
n_within_family_changes += n_within_family_changes_this_sequence
if n_detections_flipped_this_sequence > 0:
n_sequences_changed += 1
if n_other_classifications_changed_this_sequence > 0:
n_other_sequences_changed += 1
if n_taxonomic_changes_this_sequence > 0:
n_taxonomic_sequences_changed += 1
if n_within_family_changes_this_sequence > 0:
n_within_family_sequences_changed += 1
# ...for each sequence
print('Classification smoothing: changed {} detections in {} sequences'.format(
n_detections_flipped,n_sequences_changed))
print('"Other" smoothing: changed {} detections in {} sequences'.format(
n_other_classifications_changed,n_other_sequences_changed))
print('Taxonomic smoothing: changed {} detections in {} sequences'.format(
n_taxonomic_classification_changes,n_taxonomic_sequences_changed))
print('Within-family smoothing: changed {} detections in {} sequences'.format(
n_within_family_changes,n_within_family_sequences_changed))
## Clean up debug information
for im in d['images']:
if 'detections' not in im or im['detections'] is None:
continue
for det in im['detections']:
if 'image_filename' in det:
del det['image_filename']
## Write output
if output_file is not None:
print('Writing sequence-smoothed classification results to {}'.format(
output_file))
write_json(output_file,d)
return d
# ...smooth_classification_results_sequence_level(...)
[docs]
def remove_classifications_from_non_animal_detections(input_file,
output_file,
animal_category_names=None):
"""
Remove classifications from non-animal detections in a MD .json file,
optionally writing the results to a new .json file
Args:
input_file (str): the MD-formatted .json file to process
output_file (str, optional): the output file to write the modified results
animal_category_names (list, optional): the detection category names that should
be treated as animals (defaults to just 'animal').
Returns:
dict: the modified results
"""
if animal_category_names is None:
animal_category_names = ['animal']
animal_category_names = set(animal_category_names)
with open(input_file,'r') as f:
d = json.load(f)
category_id_to_name = d['detection_categories']
n_classifications_removed = 0
n_detections = 0
# im = d['images'][0]
for im in d['images']:
if ('detections' not in im) or (im['detections'] is None):
continue
n_detections += len(im['detections'])
for det in im['detections']:
if 'classifications' not in det:
continue
category_id = det['category']
category_name = category_id_to_name[category_id]
if category_name not in animal_category_names:
del det['classifications']
n_classifications_removed += 1
continue
# ...for each detection
# ...for each image
print('Removed classifications from {} of {} detections'.format(
n_classifications_removed,n_detections))
if output_file is not None:
write_json(output_file,d)
return d
# ...def remove_classifications_from_non_animal_detections(...)
[docs]
def restrict_to_taxa_list(taxa_list,
speciesnet_taxonomy_file,
input_file,
output_file,
allow_walk_down=False,
add_pre_filtering_description=True,
add_post_filtering_description=True,
allow_redundant_latin_names=True,
protected_common_names=None,
use_original_common_names_if_available=True,
verbose=True,
classification_threshold=None,
combine_redundant_categories=True):
"""
Given a prediction file in MD .json format, likely without having had
a geofence applied, apply a custom taxa list.
Args:
taxa_list (str): .csv file with at least the columns "latin" and "common"
speciesnet_taxonomy_file (str): taxonomy filename, in the same format used for
model release (with 7-token taxonomy entries)
input_file (str): .json file to read, in MD format. This can be None, in which
case this function just validates [taxa_list].
output_file (str): .json file to write, in MD format
allow_walk_down (bool, optional): should we walk down the taxonomy tree
when making mappings if a parent has only a single allowable child?
For example, if only a single felid species is allowed, should other
felid predictions be mapped to that species, as opposed to being mapped
to the family?
add_pre_filtering_description (bool, optional): should we add a new metadata
field that summarizes each image's classifications prior to taxonomic
restriction?
add_post_filtering_description (bool, optional): should we add a new metadata
field that summarizes each image's classifications after taxonomic
restriction?
allow_redundant_latin_names (bool, optional): if False, we'll raise an Exception
if the same latin name appears twice in the taxonomy list; if True, we'll
just print a warning and ignore all entries other than the first for this
latin name
protected_common_names (list, optional): these categories should be
unmodified, even if they aren't used, or have the same taxonomic
description as other categories
use_original_common_names_if_available (bool, optional): if an "original_common"
column is present in [taxa_list], use those common names instead of the ones
in the taxonomy file
verbose (bool, optional): enable additional debug output
classification_threshold (float, optional): only relevant for the pre/post filtering
descriptions
combine_redundant_categories (bool, optional): whether to combine categories with the
same common name
"""
##%% Read target taxa list
taxa_list_df = pd.read_csv(taxa_list)
# Convert NaN values to empty strings
taxa_list_df = taxa_list_df.fillna('')
# Strip string values
string_cols = taxa_list_df.select_dtypes(include=['object']).columns
taxa_list_df[string_cols] = taxa_list_df[string_cols].apply(lambda col: col.str.strip())
required_columns = ('latin','common')
for s in required_columns:
assert s in taxa_list_df.columns, \
'Required column {} missing from taxonomy list file {}'.format(
s,taxa_list)
# Convert the "latin" and "common" columns in taxa_list_df to lowercase
taxa_list_df['latin'] = taxa_list_df['latin'].str.lower()
taxa_list_df['common'] = taxa_list_df['common'].str.lower()
# Remove rows from taxa_list_df where the "latin" column is nan,
# printing a warning for each row (with a string representation of the whole row)
for i_row,row in taxa_list_df.iterrows():
if len(row['latin']) == 0:
if verbose:
print('Warning: Skipping row with empty "latin" column in {}:\n{}\n'.format(
taxa_list,str(row.to_dict())))
taxa_list_df.drop(index=i_row, inplace=True)
# Create a dictionary mapping source Latin names to target common names
target_latin_to_common = {}
for i_row,row in taxa_list_df.iterrows():
latin = row['latin']
common = row['common']
if use_original_common_names_if_available and \
('original_common' in row) and \
(not is_empty(row['original_common'])):
common = row['original_common'].strip().lower()
# Valid latin names have either one token (e.g. "canidae"),
# two tokens (e.g. "bos taurus"), or three tokens (e.g. "canis lupus familiaris")
assert len(latin.split(' ')) in (1,2,3), \
'Illegal binomial name {} in taxaonomy list {}'.format(
latin,taxa_list)
if latin in target_latin_to_common:
error_string = \
'scientific name {} appears multiple times in the taxonomy list'.format(
latin)
if allow_redundant_latin_names:
if verbose:
print('Warning: {}'.format(error_string))
else:
raise ValueError(error_string)
target_latin_to_common[latin] = common
# ...for each row in the custom taxonomy list
##%% Read taxonomy file
with open(speciesnet_taxonomy_file,'r') as f:
speciesnet_taxonomy_list = f.readlines()
speciesnet_taxonomy_list = [s.strip() for s in \
speciesnet_taxonomy_list if len(s.strip()) > 0]
# Maps the latin name of every taxon to the corresponding full taxon string,
# excluding the GUID.
#
# For species, the key is a binomial name
speciesnet_latin_name_to_taxon_string = {}
speciesnet_common_name_to_taxon_string = {}
def _insert_taxonomy_string(s):
"""
Insert the latin name represented by [s] into
speciesnet_latin_name_to_taxon_string. The latin name is
the lowest-level non-empty token (or the genus+species if all
tokens are populated).
Also inserts into speciesnet_common_name_to_taxon_string.
Sets GUIDs to a uniform value to simplify matching later.
"""
tokens = s.split(';')
assert len(tokens) == 7, 'Illegal taxonomy string {}'.format(s)
# guid = tokens[0] # noqa
tokens[0] = 'guid'
class_name = tokens[1]
order = tokens[2]
family = tokens[3]
genus = tokens[4]
species = tokens[5]
common_name = tokens[6]
s = ';'.join(tokens)
if len(class_name) == 0:
assert common_name in ('animal','vehicle','blank'), \
'Illegal common name {}'.format(common_name)
return
if len(species) > 0:
assert all([len(s) > 0 for s in [genus,family,order]]), \
'Higher-level taxa missing for {}: {},{},{}'.format(s,genus,family,order)
binomial_name = genus + ' ' + species
if binomial_name not in speciesnet_latin_name_to_taxon_string:
speciesnet_latin_name_to_taxon_string[binomial_name] = s
elif len(genus) > 0:
assert all([len(s) > 0 for s in [family,order]]), \
'Higher-level taxa missing for {}: {},{}'.format(s,family,order)
if genus not in speciesnet_latin_name_to_taxon_string:
speciesnet_latin_name_to_taxon_string[genus] = s
elif len(family) > 0:
assert len(order) > 0, \
'Higher-level taxa missing for {}: {}'.format(s,order)
if family not in speciesnet_latin_name_to_taxon_string:
speciesnet_latin_name_to_taxon_string[family] = s
elif len(order) > 0:
if order not in speciesnet_latin_name_to_taxon_string:
speciesnet_latin_name_to_taxon_string[order] = s
else:
if class_name not in speciesnet_latin_name_to_taxon_string:
speciesnet_latin_name_to_taxon_string[class_name] = s
if len(common_name) > 0:
if common_name not in speciesnet_common_name_to_taxon_string:
speciesnet_common_name_to_taxon_string[common_name] = s
# Insert everything from the taxonomy .txt file into our taxonomy dicts
for s in speciesnet_taxonomy_list:
_insert_taxonomy_string(s)
##%% Make sure all parent taxa are represented in the taxonomy
# In theory any taxon that appears as the parent of another taxon should
# also be in the taxonomy, but this isn't always true, so we fix it here.
new_taxon_string_to_missing_tokens = defaultdict(list)
# latin_name = next(iter(speciesnet_latin_name_to_taxon_string.keys()))
for latin_name in speciesnet_latin_name_to_taxon_string.keys():
if 'no cv result' in latin_name:
continue
taxon_string = speciesnet_latin_name_to_taxon_string[latin_name]
tokens = taxon_string.split(';')
# Don't process GUID, species, or common name
for i_token in range(1,len(tokens)-2):
test_token = tokens[i_token]
if len(test_token) == 0:
continue
# Do we need to make up a taxon for this token?
if test_token not in speciesnet_latin_name_to_taxon_string:
new_tokens = [''] * 7
new_tokens[0] = 'guid'
for i_copy_token in range(1,i_token+1):
new_tokens[i_copy_token] = tokens[i_copy_token]
new_tokens[-1] = test_token + ' species'
assert new_tokens[-2] == '', \
'Illegal taxonomy string {}'.format(taxon_string)
new_taxon_string = ';'.join(new_tokens)
# assert new_taxon_string not in new_taxon_strings
new_taxon_string_to_missing_tokens[new_taxon_string].append(test_token)
# ...for each token
# ...for each taxon
new_taxon_string_to_missing_tokens = \
sort_dictionary_by_key(new_taxon_string_to_missing_tokens)
if verbose:
print(f'Found {len(new_taxon_string_to_missing_tokens)} taxa that need to be inserted to ' + \
'make the taxonomy valid, showing only mammals and birds here:\n')
for taxon_string in new_taxon_string_to_missing_tokens:
if 'mammalia' not in taxon_string and 'aves' not in taxon_string:
continue
missing_taxa = ','.join(new_taxon_string_to_missing_tokens[taxon_string])
print('{} ({})'.format(taxon_string,missing_taxa))
print('')
for new_taxon_string in new_taxon_string_to_missing_tokens:
_insert_taxonomy_string(new_taxon_string)
del new_taxon_string_to_missing_tokens
##%% Store custom common name mappings based on the taxonomy list
speciesnet_latin_name_to_output_common_name = {}
# latin_name = next(iter(speciesnet_latin_name_to_taxon_string.keys()))
for latin_name in speciesnet_latin_name_to_taxon_string.keys():
if latin_name in target_latin_to_common:
speciesnet_latin_name_to_output_common_name[latin_name] = \
target_latin_to_common[latin_name]
##%% Make sure all taxa on the allow-list are in the taxonomy
n_failed_mappings = 0
for target_taxon_latin_name in target_latin_to_common.keys():
if target_taxon_latin_name not in speciesnet_latin_name_to_taxon_string:
common_name = target_latin_to_common[target_taxon_latin_name]
s = '{} ({}) not in speciesnet taxonomy'.format(
target_taxon_latin_name,common_name)
if common_name in speciesnet_common_name_to_taxon_string:
s += ' (common name maps to {})'.format(
speciesnet_common_name_to_taxon_string[common_name])
print(s)
n_failed_mappings += 1
if n_failed_mappings > 0:
raise ValueError('Cannot continue with taxonomic restriction')
##%% For the allow-list, map each parent taxon to a set of allowable child taxa
# Maps parent names to all allowed child names, or None if this is the
# lowest-level allowable taxon on this path
allowed_parent_taxon_to_child_taxa = defaultdict(set)
# latin_name = next(iter(target_latin_to_common.keys()))
for latin_name in target_latin_to_common:
taxon_string = speciesnet_latin_name_to_taxon_string[latin_name]
tokens = taxon_string.split(';')
assert len(tokens) == 7, \
'Illegal taxonomy string {}'.format(taxon_string)
# Remove GUID and common mame
#
# This is now always class/order/family/genus/species
tokens = tokens[1:-1]
child_taxon = None
# If this is a species
if len(tokens[-1]) > 0:
binomial_name = tokens[-2] + ' ' + tokens[-1]
assert binomial_name == latin_name, \
'Binomial/latin mismatch: {} vs {}'.format(binomial_name,latin_name)
# If this already exists, it should only allow "None"
if binomial_name in allowed_parent_taxon_to_child_taxa:
assert len(allowed_parent_taxon_to_child_taxa[binomial_name]) == 1, \
'Species-level entry {} has multiple children'.format(binomial_name)
assert None in allowed_parent_taxon_to_child_taxa[binomial_name], \
'Species-level entry {} has non-None children'.format(binomial_name)
allowed_parent_taxon_to_child_taxa[binomial_name].add(None)
child_taxon = binomial_name
# The first level that can ever be a parent taxon is the genus level
parent_token_index = len(tokens) - 2
# Walk up from genus to family
while(parent_token_index >= 0):
# "None" is our leaf node marker, we should never have ''
if child_taxon is not None:
assert len(child_taxon) > 0
parent_taxon = tokens[parent_token_index]
# Don't create entries for blank taxa
if (len(parent_taxon) > 0):
create_child = True
# This is the lowest-level taxon in this entry
if (child_taxon is None):
# ...but we don't want to remove existing children from any parents
if (parent_taxon in allowed_parent_taxon_to_child_taxa) and \
(len(allowed_parent_taxon_to_child_taxa[parent_taxon]) > 0):
if verbose:
existing_children_string = str(allowed_parent_taxon_to_child_taxa[parent_taxon])
print('Not creating empty child for parent {} (already has children {})'.format(
parent_taxon,existing_children_string))
create_child = False
# If we're adding a new child entry, clear out any leaf node markers
else:
if (parent_taxon in allowed_parent_taxon_to_child_taxa) and \
(None in allowed_parent_taxon_to_child_taxa[parent_taxon]):
assert len(allowed_parent_taxon_to_child_taxa[parent_taxon]) == 1, \
'Illlegal parent/child configuration'
if verbose:
print('Un-marking parent {} as a leaf node because of child {}'.format(
parent_taxon,child_taxon))
allowed_parent_taxon_to_child_taxa[parent_taxon] = set()
if create_child:
allowed_parent_taxon_to_child_taxa[parent_taxon].add(child_taxon)
# If we haven't hit a non-empty taxon yet, don't update "child_taxon"
assert len(parent_taxon) > 0
child_taxon = parent_taxon
# ...if we have a non-empty taxon
parent_token_index -= 1
# ...for each taxonomic level
# ...for each allowed latin name
allowed_parent_taxon_to_child_taxa = \
sort_dictionary_by_key(allowed_parent_taxon_to_child_taxa)
for parent_taxon in allowed_parent_taxon_to_child_taxa:
# "None" should only ever appear alone; this marks a leaf node with no children
if None in allowed_parent_taxon_to_child_taxa[parent_taxon]:
assert len(allowed_parent_taxon_to_child_taxa[parent_taxon]) == 1, \
'"None" should only appear alone in a child taxon list'
##%% If we were just validating the custom taxa file, we're done
if input_file is None:
print('Finished validating custom taxonomy list')
return
##%% Map all predictions that exist in this dataset...
# ...to the prediction we should generate.
with open(input_file,'r') as f:
input_data = json.load(f)
input_category_id_to_common_name = input_data['classification_categories'] #noqa
input_category_id_to_taxonomy_string = \
input_data['classification_category_descriptions']
input_category_id_to_output_taxon_string = {}
# input_category_id = next(iter(input_category_id_to_taxonomy_string.keys()))
for input_category_id in input_category_id_to_taxonomy_string.keys():
input_taxon_string = input_category_id_to_taxonomy_string[input_category_id]
input_taxon_tokens = input_taxon_string.split(';')
assert len(input_taxon_tokens) == 7, \
'Illegal taxonomy string: {}'.format(input_taxon_string)
# Don't mess with blank/no-cv-result/human (or "animal", which is really "unknown")
if (not is_taxonomic_prediction_string(input_taxon_string)) or \
(input_taxon_string == human_prediction_string):
if verbose:
print('Not messing with non-taxonomic category {}'.format(input_taxon_string))
input_category_id_to_output_taxon_string[input_category_id] = \
input_taxon_string
continue
# Don't mess with protected categories
common_name = input_taxon_tokens[-1]
if (protected_common_names is not None) and \
(common_name in protected_common_names):
if verbose:
print('Not messing with protected category {}:\n{}'.format(
common_name,input_taxon_string))
input_category_id_to_output_taxon_string[input_category_id] = \
input_taxon_string
continue
# Remove GUID and common mame
# This is always class/order/family/genus/species
input_taxon_tokens = input_taxon_tokens[1:-1]
assert len(input_taxon_tokens) == 5
# Start at the species level (the last element in input_taxon_tokens),
# and see whether each taxon is allowed
test_index = len(input_taxon_tokens) - 1
target_taxon = None
while((test_index >= 0) and (target_taxon is None)):
# Species are represented as binomial names, i.e. when test_index is 4,
# test_taxon_name will have two tokens (e.g. "canis lupus"), otherwise
# test_taxon_name will have one token (e.g. "canis", or "aves")
if (test_index == (len(input_taxon_tokens) - 1)) and \
(len(input_taxon_tokens[-1]) > 0):
test_taxon_name = \
input_taxon_tokens[-2] + ' ' + input_taxon_tokens[-1]
else:
test_taxon_name = input_taxon_tokens[test_index]
# If we haven't yet found the level at which this taxon is non-empty,
# keep going up
if len(test_taxon_name) == 0:
test_index -= 1
continue
assert test_taxon_name in speciesnet_latin_name_to_taxon_string, \
'{} not found in taxonomy table'.format(test_taxon_name)
# Is this taxon allowed according to the custom species list?
if test_taxon_name in allowed_parent_taxon_to_child_taxa:
allowed_child_taxa = allowed_parent_taxon_to_child_taxa[test_taxon_name]
assert allowed_child_taxa is not None, \
'allowed_child_taxa should not be None: {}'.format(test_taxon_name)
# If this is the lowest-level allowable token or there is not a
# unique child, don't walk any further, even if walking down
# is enabled.
if None in allowed_child_taxa:
assert len(allowed_child_taxa) == 1, \
'"None" should not be listed as a child taxa with other child taxa'
if (None in allowed_child_taxa) or (len(allowed_child_taxa) > 1):
target_taxon = test_taxon_name
elif not allow_walk_down:
target_taxon = test_taxon_name
else:
candidate_taxon = None
# If there's a unique child, walk back *down* the allowable
# taxa until we run out of unique children
while ((next(iter(allowed_child_taxa)) is not None) and \
(len(allowed_child_taxa) == 1)):
candidate_taxon = next(iter(allowed_child_taxa))
assert candidate_taxon in allowed_parent_taxon_to_child_taxa, \
'{} should be a subset of {}'.format(
candidate_taxon,allowed_parent_taxon_to_child_taxa)
assert candidate_taxon in speciesnet_latin_name_to_taxon_string, \
'{} should be a subset of {}'.format(
candidate_taxon,speciesnet_latin_name_to_taxon_string)
allowed_child_taxa = \
allowed_parent_taxon_to_child_taxa[candidate_taxon]
assert candidate_taxon is not None
target_taxon = candidate_taxon
# ...if this is an allowed taxon
test_index -= 1
# ...for each token
if target_taxon is None:
output_taxon_string = animal_prediction_string
else:
output_taxon_string = speciesnet_latin_name_to_taxon_string[target_taxon]
input_category_id_to_output_taxon_string[input_category_id] = output_taxon_string
# ...for each category (mapping input category IDs to output taxon strings)
if verbose:
for input_category_id in input_category_id_to_output_taxon_string:
common_name = input_category_id_to_common_name[input_category_id]
output_taxon_string = input_category_id_to_output_taxon_string[input_category_id]
print('Mapping {} ({}) to\n{}\n'.format(
input_category_id,
common_name,
output_taxon_string))
##%% Map input category IDs to output category IDs
speciesnet_taxon_string_to_latin_name = \
invert_dictionary(speciesnet_latin_name_to_taxon_string)
input_category_id_to_output_category_id = {}
output_taxon_string_to_category_id = {}
output_category_id_to_common_name = {}
substitutions_printed = set()
for input_category_id in input_category_id_to_output_taxon_string:
output_taxon_string = \
input_category_id_to_output_taxon_string[input_category_id]
output_common_name = output_taxon_string.split(';')[-1]
# Possibly substitute a custom common name
if output_taxon_string in speciesnet_taxon_string_to_latin_name:
speciesnet_latin_name = speciesnet_taxon_string_to_latin_name[output_taxon_string]
if speciesnet_latin_name in speciesnet_latin_name_to_output_common_name:
custom_common_name = speciesnet_latin_name_to_output_common_name[speciesnet_latin_name]
if custom_common_name != output_common_name:
if verbose:
substitution_name = custom_common_name + ' --> ' + output_common_name
if substitution_name not in substitutions_printed:
substitutions_printed.add(substitution_name)
print('Substituting common name {} for {}'.format(
custom_common_name,output_common_name))
output_common_name = custom_common_name
# Do we need to create a new output category?
if output_taxon_string not in output_taxon_string_to_category_id:
output_category_id = str(len(output_taxon_string_to_category_id))
output_taxon_string_to_category_id[output_taxon_string] = \
output_category_id
output_category_id_to_common_name[output_category_id] = \
output_common_name
else:
output_category_id = \
output_taxon_string_to_category_id[output_taxon_string]
input_category_id_to_output_category_id[input_category_id] = \
output_category_id
# Sometimes-useful debug printouts
if False:
original_common_name = \
input_category_id_to_common_name[input_category_id]
original_taxon_string = \
input_category_id_to_taxonomy_string[input_category_id]
print('Mapping {} ({}) to:\n{} ({})\n'.format(
original_common_name,original_taxon_string,
output_common_name,output_taxon_string))
# ...for each category (mapping input category IDs to output category IDs)
##%% Remap all category labels
assert len(set(output_taxon_string_to_category_id.keys())) == \
len(set(output_taxon_string_to_category_id.values())), \
'Category ID/value non-uniqueness error'
output_category_id_to_taxon_string = \
invert_dictionary(output_taxon_string_to_category_id)
with open(input_file,'r') as f:
output_data = json.load(f)
classification_descriptions = None
if 'classification_category_descriptions' in output_data:
classification_descriptions = output_data['classification_category_descriptions']
for im in tqdm(output_data['images']):
if 'detections' not in im or im['detections'] is None:
continue
description_options = ClassificationSmoothingOptions()
if classification_threshold is not None:
description_options.classification_confidence_threshold = classification_threshold
# Possibly prepare a pre-filtering description
pre_filtering_description = None
if classification_descriptions is not None and add_pre_filtering_description:
category_to_count = \
count_detections_by_classification_category(im['detections'],
options=description_options)
pre_filtering_description = \
get_classification_description_string(category_to_count,classification_descriptions)
im['pre_filtering_description'] = pre_filtering_description
for det in im['detections']:
if 'classifications' in det:
for classification in det['classifications']:
classification[0] = \
input_category_id_to_output_category_id[classification[0]]
if classification_descriptions is not None and add_post_filtering_description:
category_to_count = \
count_detections_by_classification_category(im['detections'],
options=description_options)
post_filtering_description = \
get_classification_description_string(category_to_count,output_category_id_to_taxon_string)
im['post_filtering_description'] = post_filtering_description
# ...for each image
output_data['classification_categories'] = output_category_id_to_common_name
output_data['classification_category_descriptions'] = \
output_category_id_to_taxon_string
##%% Write output
write_json(output_file,output_data)
if combine_redundant_categories:
_ = combine_redundant_classification_categories(input_file=output_file,
output_file=output_file)
# ...def restrict_to_taxa_list(...)
[docs]
def merge_classification_categories(target_file,
source_file,
output_file=None):
"""
Modify the classification categories in [source file] to be compatible
with the categories in [target_file], optionally writing a new file.
Uses only category names, does not use category descriptions to decide
whether to merge categories. Behavior is undefined if multiple source
categories have the same name. Does not look at detection categories at
all. If neither file has classification categories, just re-writes
the source file. Errors if exactly one file has classification categories.
Args:
target_file (str or dict): target .json file, in MD format (or an
already-loaded dict)
source_file (str): .json file to modify, in MD format (or an
already-loaded dict)
output_file (str, optional): .json file to which we should write a
modified version of [source_file]
Returns:
dict: remapped MD-formatted dict
"""
##%% Read input files
if isinstance(target_file,dict):
target_d = copy.deepcopy(target_file)
else:
assert os.path.isfile(target_file), \
'Could not find target file {}'.format(target_file)
with open(target_file,'r') as f:
target_d = json.load(f)
if isinstance(source_file,dict):
source_d = copy.deepcopy(source_file)
else:
assert os.path.isfile(source_file), \
'Could not find source file {}'.format(source_file)
with open (source_file,'r') as f:
source_d = json.load(f)
##%% No-op is neither file has classification categories
source_has_classifications = ('classification_categories' in source_d)
target_has_classifications = ('classification_categories' in target_d)
if not (source_has_classifications or target_has_classifications):
print('Neither source nor target has classification categories, bypassing category merge')
if output_file is not None:
write_json(output_file,source_d)
return source_d
##%% Error if only one file has classification categories
if source_has_classifications != target_has_classifications:
raise ValueError('Source and target disagree on whether classifications are present')
##%% Map categories
target_category_id_to_name = {}
target_category_name_to_id = {}
input_category_id_to_output_category_id = {}
if 'classification_categories' in target_d:
target_category_id_to_name = target_d['classification_categories']
target_category_name_to_id = invert_dictionary(target_category_id_to_name)
if 'classification_categories' not in source_d:
print('Warning: merge_classification_categories called with no source classifications.')
print('This is legal, but this is a no-op.')
else:
for source_category_id in source_d['classification_categories']:
assert source_category_id not in input_category_id_to_output_category_id
category_name = source_d['classification_categories'][source_category_id]
# Does the target file have a category with the same name?
if category_name in target_category_name_to_id:
target_category_id = target_category_name_to_id[category_name]
input_category_id_to_output_category_id[source_category_id] = \
target_category_id
print('Mapping category ID for existing category {} ({} to {})'.format(
category_name,source_category_id,target_category_id))
# Print a warning if the descriptions don't match
if 'classification_category_descriptions' in target_d and \
'classification_category_descriptions' in source_d:
target_description = \
target_d['classification_category_descriptions'][target_category_id]
source_description = \
source_d['classification_category_descriptions'][source_category_id]
# If both descriptions look like SpeciesNet taxon strings, omit the GUID from
# the descriptions for comparison
if len(target_description.split(';')) == 7 and \
len(source_description.split(';')) == 7:
target_description = ';'.join((target_description.split(';'))[1:])
source_description = ';'.join((source_description.split(';'))[1:])
if target_description != source_description:
print('Warning: merging categories for {} with different descriptions:'.format(
category_name))
print('Source {}: {}'.format(source_category_id,source_description))
print('Target {}: {}'.format(target_category_id,target_description))
else:
# Create a new category
target_category_ids = list(target_category_id_to_name.keys())
if len(target_category_ids) == 0:
new_category_id = '0'
else:
target_category_ids = [int(x) for x in target_category_ids]
new_category_id = str(max(target_category_ids)+1)
print('Creating a new category for {} ({})'.format(category_name,new_category_id))
target_category_id_to_name[new_category_id] = category_name
target_category_name_to_id = invert_dictionary(target_category_id_to_name)
input_category_id_to_output_category_id[source_category_id] = new_category_id
# Add the description only if the target file has descriptions
if 'classification_category_descriptions' in target_d and \
'classification_category_descriptions' in source_d:
target_d['classification_category_descriptions'][new_category_id] = \
source_d['classification_category_descriptions'][source_category_id]
# ...if we do/don't have a matching target category name
# ...for each source category
# ...if the source file has classifications
##%% Modify images
for im in source_d['images']:
if 'detections' not in im or im['detections'] is None:
continue
for det in im['detections']:
if 'classifications' not in det:
continue
for classification in det['classifications']:
original_category = classification[0]
new_category = input_category_id_to_output_category_id[original_category]
classification[0] = new_category
# ...for each classification
# ...for each detection
# ...for each image
##%% Copy output categories
source_d['classification_categories'] = target_d['classification_categories']
if 'classification_category_descriptions' in target_d:
source_d['classification_category_descriptions'] = \
target_d['classification_category_descriptions']
else:
if 'classification_category_descriptions' in source_d:
del source_d['classification_category_descriptions']
##%% Validate output
validation_options = ValidateBatchResultsOptions()
validation_options.check_image_existence = False
validation_options.raise_errors = True
_ = validate_batch_results(json_filename=source_d,options=validation_options)
##%% Write output if necessary
if output_file is not None:
write_json(output_file,source_d)
return source_d
# ...def merge_classification_categories(...)
[docs]
def combine_redundant_classification_categories(input_file,
output_file=None,
classification_threshold=0.5):
"""
If [input_file] contains multiple categories with the same category name,
merges each equivalence class into a single category, optionally writing the results
to [output_file].
Args:
input_file (str or dict): .json file to read, in MD format (or an
already-loaded dict)
output_file (str): .json file to write, in MD format
classification_threshold (float, optional): only used when sorting
descriptions by count
Returns:
dict: remapped MD-formatted dict
"""
##%% Read input file and list categories
if isinstance(input_file,dict):
d = input_file
else:
assert os.path.isfile(input_file), \
'Input file {} not found'.format(input_file)
with open(input_file,'r') as f:
d = json.load(f)
input_category_name_to_ids = defaultdict(list)
for category_id in d['classification_categories']:
category_name = d['classification_categories'][category_id]
input_category_name_to_ids[category_name].append(category_id)
##%% Return early if there are no redundant categories
# What's the largest number of IDs associated with a single category name?
max_count = 0
for category_name in input_category_name_to_ids:
c = len(input_category_name_to_ids[category_name])
if c > max_count:
max_count = c
if max_count == 1:
if output_file is not None:
print('No redundant categories, writing data unmodified to {}'.format(
output_file))
write_json(output_file,d)
return d
##%% Map input category IDs to output category IDs
input_category_id_to_output_category_id = {}
for i_category,category_name in enumerate(input_category_name_to_ids):
output_category_id = str(i_category)
for input_category_id in input_category_name_to_ids[category_name]:
input_category_id_to_output_category_id[input_category_id] = \
output_category_id
n_input_categories = len(d['classification_categories'])
n_output_categories = len(input_category_name_to_ids)
assert n_output_categories < n_input_categories
print('Removing {} redundant categories'.format(
n_input_categories - n_output_categories))
##%% Create a new category dict
output_category_name_to_id = {}
for input_category_id in input_category_id_to_output_category_id:
category_name = d['classification_categories'][input_category_id]
output_category_id = input_category_id_to_output_category_id[input_category_id]
if category_name in output_category_name_to_id:
assert output_category_name_to_id[category_name] == output_category_id
else:
output_category_name_to_id[category_name] = output_category_id
##%% Create new classification category descriptions
if 'classification_category_descriptions' in d:
assert len(d['classification_category_descriptions']) == \
len(d['classification_categories'])
# Sort descriptions by count overall, so we can sort by description within categories later
description_to_count = {}
for description in d['classification_category_descriptions'].values():
description_to_count[description] = 0
for im in d['images']:
if 'detections' not in im or im['detections'] is None:
continue
for det in im['detections']:
if 'classifications' not in det or det['classifications'] is None:
continue
conf = det['classifications'][0][1]
if conf < classification_threshold:
continue
input_category_id = det['classifications'][0][0]
input_category_description = d['classification_category_descriptions'][input_category_id]
description_to_count[input_category_description] += 1
# ...for each detection
# ...for each image
# Sorting here is just a debug convenience
description_to_count = sort_dictionary_by_value(description_to_count,
reverse=True)
# Create descriptions for the output categories
output_category_id_to_descriptions = defaultdict(list)
for input_category_id in input_category_id_to_output_category_id:
output_category_id = input_category_id_to_output_category_id[input_category_id]
description = d['classification_category_descriptions'][input_category_id]
output_category_id_to_descriptions[output_category_id].append(description)
output_classification_category_descriptions = {}
for output_category_id in output_category_id_to_descriptions:
descriptions = output_category_id_to_descriptions[output_category_id]
# If this output category combines multiple input categories
if len(descriptions) > 1:
# Sort "descriptions" in descending order by the corresponding values
# in description_to_count
descriptions.sort(key=lambda x: description_to_count[x], reverse=True)
output_classification_category_descriptions[output_category_id] = \
'|'.join(descriptions)
# ...for each category
d['classification_category_descriptions'] = output_classification_category_descriptions
# ...if we have to manage descriptions
d['classification_categories'] = invert_dictionary(output_category_name_to_id)
# Remap classifications
for im in d['images']:
if 'detections' not in im or im['detections'] is None:
continue
for det in im['detections']:
if 'classifications' not in det or det['classifications'] is None:
continue
for i_class in range(0,len(det['classifications'])):
input_category_id = det['classifications'][i_class][0]
output_category_id = \
input_category_id_to_output_category_id[input_category_id]
det['classifications'][i_class][0] = output_category_id
# ...for each classification
# ...for each detection
# ...for each image
if output_file is not None:
write_json(output_file,d)
return d
# ...def combine_redundant_classification_categories(...)