"""
validate_batch_results.py
Given a .json file containing MD results, validate that it's compliant with the format spec:
https://lila.science/megadetector-output-format
"""
#%% Constants and imports
import os
import sys
import json
import argparse
from tqdm import tqdm
from megadetector.detection.video_utils import is_video_file
from megadetector.utils.ct_utils import args_to_object, is_list_sorted # noqa
# Fields that commonly appear in the "info" dict of an MD results file
typical_info_fields = [
    'detector',
    'detection_completion_time',
    'classifier',
    'classification_completion_time',
    'detection_metadata',
    'classifier_metadata',
]

# Top-level keys that every MD results file is expected to contain
required_keys = [
    'info',
    'images',
    'detection_categories',
]

# Top-level keys that are optional, but standard; any top-level key that is in
# neither this list nor required_keys is reported as a warning during validation.
typical_keys = [
    'classification_categories',
    'classification_category_descriptions',
]
#%% Classes
class ValidateBatchResultsOptions:
    """
    Options controlling the behavior of validate_batch_results()
    """

    def __init__(self):

        #: Should we verify that images exist?  If this is True, and the .json
        #: file contains relative paths, relative_path_base needs to be specified.
        self.check_image_existence = False

        #: If check_image_existence is True, where do the images live?
        #:
        #: If None, assumes absolute paths.
        self.relative_path_base = None

        #: Should we return the loaded data, or just the validation results?
        self.return_data = False

        #: Enable additional debug output
        self.verbose = False

        #: Should we raise errors immediately (vs. just catching and reporting)?
        self.raise_errors = False

# ...class ValidateBatchResultsOptions
#%% Main function
def _validate_category_map(categories, kind, format_version):
    """
    Validate one category dictionary from an MD results file.

    Args:
        categories (dict): the value of the "detection_categories" or
            "classification_categories" field
        kind (str): "detection" or "classification"; only used to build error messages
        format_version (float): the file's format version; blank category names are
            only rejected for versions >= 1.6

    Raises:
        ValueError: if [categories] is not a dict, if an ID is not a string-formatted
            int, or if a name is not a string (or is blank, for format 1.6+)
    """

    if not isinstance(categories, dict):
        raise ValueError('Invalid {}_categories field'.format(kind))

    for category_id, category_name in categories.items():

        # Category IDs should be string-formatted ints
        if not isinstance(category_id, str):
            raise ValueError('Invalid {} category ID: {}'.format(kind, category_id))
        try:
            int(category_id)
        except ValueError:
            raise ValueError(
                'Invalid {} category ID: {}'.format(kind, category_id)) from None

        if not isinstance(category_name, str):
            raise ValueError('Invalid {} category name: {}'.format(kind, category_name))

        if (format_version >= 1.6) and (len(category_name.strip()) == 0):
            raise ValueError('{} category name is blank for ID {}'.format(
                kind.capitalize(), category_id))

    # ...for each category

# ...def _validate_category_map(...)


def _validate_detections(file, detections, detection_categories, classification_categories):
    """
    Validate the list of detections attached to a single image/video entry.

    These checks are explicit raises rather than assert statements, because asserts
    are removed when Python runs with -O, which would silently disable validation.

    Args:
        file (str): the "file" value of the entry, only used in error messages
        detections (list): the entry's non-null "detections" list
        detection_categories (dict): the file-level detection category map
        classification_categories (dict): the file-level classification category map
            (empty if the file doesn't have one)

    Raises:
        ValueError: on the first malformed detection or classification
    """

    for det in detections:

        if not isinstance(det, dict):
            raise ValueError('Image {} has an invalid detection'.format(file))
        if 'category' not in det:
            raise ValueError('Image {} has a detection with no category'.format(file))
        if 'conf' not in det:
            raise ValueError('Image {} has a detection with no confidence'.format(file))
        if not isinstance(det['conf'], float):
            raise ValueError('Image {} has an illegal confidence value'.format(file))
        if 'bbox' not in det:
            raise ValueError('Image {} has a detection with no box'.format(file))
        if det['category'] not in detection_categories:
            raise ValueError(
                'Image {} has a detection with an unmapped category {}'.format(
                    file, det['category']))

        classifications = det.get('classifications')
        if classifications is None:
            continue

        for c in classifications:

            # Each classification is a [category, confidence] pair
            if (not isinstance(c, (list, tuple))) or (len(c) < 2):
                raise ValueError('Image {} has an illegal classification: {}'.format(
                    file, str(c)))
            if not isinstance(c[0], str):
                raise ValueError(
                    'Image {} has an illegal classification category: {}'.format(file, c[0]))
            if c[0] not in classification_categories:
                raise ValueError(
                    'Classification category {} appears in an image, but not in the category list'.format(
                        c[0]))
            try:
                int(c[0])
            except Exception:
                raise ValueError('Image {} has an illegal classification category: {}'.format(
                    file, c[0])) from None
            if not isinstance(c[1], (float, int)):
                raise ValueError(
                    'Image {} has an illegal classification confidence value: {}'.format(
                        file, str(c[1])))

        # ...for each classification

    # ...for each detection

# ...def _validate_detections(...)


def validate_batch_results(json_filename,options=None):
    """
    Verify that [json_filename] is a valid MD output file.

    By default, validation problems do not raise; the first problem found is reported
    in the returned "errors" list.  If options.raise_errors is True, the underlying
    exception (typically a ValueError) is raised instead.  Failures to open or parse
    [json_filename] always raise.

    Args:
        json_filename (str or dict): the filename to validate, or an already loaded results dict
        options (ValidateBatchResultsOptions, optional): all the parameters used to control this
            process, see ValidateBatchResultsOptions for details

    Returns:
        dict: a dict with a field called "validation_results", which is itself a dict.  The reason
        it's a dict inside a dict is that if return_data is True, the outer dict also contains all
        the loaded data (in that case the outer dict *is* the loaded/supplied dict, with
        "validation_results" added to it).  The "validation_results" dict contains fields called
        "errors", "warnings", and "filename".  "errors" and "warnings" are lists of strings,
        although "errors" will never be longer than N=1, since validation fails at the first error.
    """

    validation_results = {}

    if options is None:
        options = ValidateBatchResultsOptions()

    if isinstance(json_filename,str):
        if options.verbose:
            print('Loading results from {}'.format(json_filename))
        validation_results['filename'] = json_filename
        with open(json_filename,'r') as f:
            d = json.load(f)
    else:
        validation_results['filename'] = None
        # This guards against API misuse (not against bad file content), so it stays an assert
        assert isinstance(json_filename,dict), \
            'json_filename must be a filename or dict'
        d = json_filename

    validation_results['warnings'] = []
    validation_results['errors'] = []

    # A .json file can legally contain a non-dict at the top level (e.g. a list)
    if not isinstance(d,dict):
        validation_results['errors'].append('Input data is not a dict')
        return {'validation_results': validation_results}

    try:

        ## Info validation

        if 'info' not in d:
            raise ValueError('Input does not contain info field')

        info = d['info']

        if not isinstance(info,dict):
            raise ValueError('Input contains invalid info field')

        if 'format_version' not in info:
            raise ValueError('Input does not specify format version')

        format_version = float(info['format_version'])
        if format_version < 1.3:
            raise ValueError('This validator can only be used with format version 1.3 or later')

        # We were ambiguous on string vs float version numbers prior to the 1.6 format,
        # but *usually* wrote string-formatted floats.  Now we require them to be
        # string-formatted floats.
        if (format_version >= 1.6) and (not isinstance(info['format_version'],str)):
            raise ValueError('format_version is not a string')


        ## Category validation

        if 'detection_categories' not in d:
            raise ValueError('Input does not contain detection_categories field')

        _validate_category_map(d['detection_categories'], 'detection', format_version)

        if 'classification_categories' in d:
            _validate_category_map(
                d['classification_categories'], 'classification', format_version)
            classification_categories = d['classification_categories']
        else:
            # With no category list, any classification found in an image is unmapped
            classification_categories = {}


        ## Image validation

        if 'images' not in d:
            raise ValueError('images field not present')
        if not isinstance(d['images'],list):
            raise ValueError('Invalid images field')

        if options.verbose:
            print('Validating images')

        for i_im,im in tqdm(enumerate(d['images']),
                            total=len(d['images']),
                            disable=(not options.verbose)):

            if not isinstance(im,dict):
                raise ValueError('Invalid image at index {}'.format(i_im))
            if 'file' not in im:
                raise ValueError('Image without filename at index {}'.format(i_im))

            file = im['file']

            # None if the field is absent or explicitly null
            detections = im.get('detections')

            if detections is not None:
                # Check the container type before iterating over it
                if not isinstance(detections,list):
                    raise ValueError('Invalid detections list for image {}'.format(file))
                _validate_detections(file,
                                     detections,
                                     d['detection_categories'],
                                     classification_categories)

            if options.check_image_existence:
                if options.relative_path_base is None:
                    file_abs = file
                else:
                    file_abs = os.path.join(options.relative_path_base,file)
                if not os.path.isfile(file_abs):
                    raise ValueError('Cannot find file {}'.format(file_abs))

            if 'failure' in im:
                if im['failure'] is not None:
                    if not isinstance(im['failure'],str):
                        raise ValueError('Image {} has an illegal [failure] value: {}'.format(
                            file,str(im['failure'])))
                    if 'detections' not in im:
                        s = 'Image {} has a failure value, should also have a null detections array'.format(
                            file)
                        validation_results['warnings'].append(s)
                    elif im['detections'] is not None:
                        raise ValueError('Image {} has a failure value but a non-null detections array'.format(
                            file))
            else:
                if 'detections' not in im:
                    raise ValueError(
                        'Image {} has neither a detections array nor a failure value'.format(file))
                # Without a failure value, a null detections array is not allowed
                if not isinstance(im['detections'],list):
                    raise ValueError('Invalid detections list for image {}'.format(file))

            if is_video_file(file):

                if (format_version >= 1.5) and ('frames_processed' not in im):
                    raise ValueError('Video without frames_processed field: {}'.format(file))

                if format_version >= 1.4:

                    if 'frame_rate' not in im:
                        raise ValueError('Video without frame rate: {}'.format(file))

                    # A negative frame rate is only tolerated for entries with a failure field
                    if (im['frame_rate'] < 0) and ('failure' not in im):
                        raise ValueError('Video with illegal frame rate {}: {}'.format(
                            str(im['frame_rate']),file))

                    # Frame numbers are required, but their order is not currently enforced
                    if detections is not None:
                        for det in detections:
                            if 'frame_number' not in det:
                                raise ValueError('Frame without frame number in video {}'.format(
                                    file))

            # ...if this is a video

        # ...for each image


        ## Validation of other keys

        for k in d:
            if (k not in typical_keys) and (k not in required_keys):
                validation_results['warnings'].append(
                    'Warning: non-standard key {} present at file level'.format(k))

    except Exception as e:

        if options.raise_errors:
            raise
        else:
            validation_results['errors'].append(str(e))

    # ...try/except

    if options.return_data:
        to_return = d
    else:
        to_return = {}

    to_return['validation_results'] = validation_results

    return to_return

# ...def validate_batch_results(...)
#%% Interactive driver(s)
if False:

    #%% Validate all .json files in the MD test suite

    from megadetector.utils.path_utils import recursive_file_list

    # Gather every results file under the test folder, skipping detection-index files
    test_folder = os.path.expanduser('~/AppData/Local/Temp/md-tests')
    filenames = [fn for fn in recursive_file_list(test_folder)
                 if fn.endswith('.json') and ('detectionIndex' not in fn)]

    options = ValidateBatchResultsOptions()
    options.check_image_existence = False
    options.relative_path_base = None # r'g:\temp\test-videos'

    for json_filename in filenames:

        results = validate_batch_results(json_filename,options)
        file_results = results['validation_results']

        # Print any warnings for this file, followed by a blank separator line
        if len(file_results['warnings']) > 0:
            print('Warnings in file {}:'.format(json_filename))
            for s in file_results['warnings']:
                print(s)
            print('')

        # Every file in the test suite is expected to validate without errors
        assert len(file_results['errors']) == 0
#%% Command-line driver
def main(): # noqa
    """
    Command-line driver: validate one MD results file and report the outcome.

    Prints any warnings and errors found by validate_batch_results().  Exits with
    status 1 if the file is invalid, so the result can be used from shell scripts;
    returns normally if the file is valid.  With no arguments, prints usage
    and exits.
    """

    options = ValidateBatchResultsOptions()

    parser = argparse.ArgumentParser()
    parser.add_argument(
        'json_filename',
        help='path to .json file containing MegaDetector results')
    parser.add_argument(
        '--check_image_existence', action='store_true',
        help='check that all images referred to in the results file exist')
    parser.add_argument(
        '--relative_path_base', default=None,
        help='if --check_image_existence is specified and paths are relative, use this as the base folder')

    if len(sys.argv[1:]) == 0:
        parser.print_help()
        parser.exit()

    args = parser.parse_args()

    args_to_object(args, options)

    results = validate_batch_results(args.json_filename,options)

    # Validation errors are collected rather than raised by default, so they have to
    # be reported here; otherwise an invalid file would look like a success.
    validation_results = results['validation_results']
    warnings = validation_results['warnings']
    errors = validation_results['errors']

    if len(warnings) > 0:
        print('Warnings in file {}:'.format(args.json_filename))
        for s in warnings:
            print(s)
        print('')

    if len(errors) > 0:
        print('Errors in file {}:'.format(args.json_filename))
        for s in errors:
            print(s)
        sys.exit(1)

    print('No errors found in {} ({} warnings)'.format(args.json_filename,len(warnings)))
# Run the command-line driver only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()