Source code for megadetector.postprocessing.create_crop_folder

"""

create_crop_folder.py

Given a MegaDetector .json file and a folder of images, creates a new folder
of images representing all above-threshold crops from the original folder.

"""

#%% Constants and imports

import os
import json
import argparse

from tqdm import tqdm

from multiprocessing.pool import Pool, ThreadPool
from collections import defaultdict
from functools import partial

from megadetector.utils.path_utils import insert_before_extension
from megadetector.utils.ct_utils import invert_dictionary
from megadetector.utils.ct_utils import is_list_sorted
from megadetector.visualization.visualization_utils import crop_image
from megadetector.visualization.visualization_utils import exif_preserving_save


#%% Support classes

[docs] class CreateCropFolderOptions: """ Options used to parameterize create_crop_folder(). """ def __init__(self): #: Confidence threshold determining which detections get written self.confidence_threshold = 0.1 #: Number of pixels to expand each crop self.expansion = 0 #: JPEG quality to use for saving crops (None for default) self.quality = 95 #: Whether to overwrite existing images self.overwrite = True #: Number of concurrent workers self.n_workers = 8 #: Whether to use processes ('process') or threads ('thread') for parallelization self.pool_type = 'thread' #: Include only these categories, or None to include all #: #: options.category_names_to_include = ['animal'] self.category_names_to_include = None
#%% Support functions def _get_crop_filename(image_fn,crop_id): """ Generate crop filenames in a consistent way. """ if isinstance(crop_id,int): crop_id = str(crop_id).zfill(3) assert isinstance(crop_id,str) return insert_before_extension(image_fn,'crop_' + crop_id) def _generate_crops_for_single_image(crops_this_image, input_folder, output_folder, options): """ Generate all the crops required for a single image. Args: crops_this_image (list of dict): list of dicts with at least keys 'image_fn_relative', 'crop_id' input_folder (str): input folder (whole images) output_folder (crops): output folder (crops) options (CreateCropFolderOptions): cropping options """ if len(crops_this_image) == 0: return image_fn_relative = crops_this_image[0]['image_fn_relative'] input_fn_abs = os.path.join(input_folder,image_fn_relative) assert os.path.isfile(input_fn_abs) detections_to_crop = [c['detection'] for c in crops_this_image] # We use a confidence threshold of 0 here, because below-threshold # detections don't result in calls to _generate_crops_for_single_image() cropped_images = crop_image(detections_to_crop, input_fn_abs, confidence_threshold=0, expansion=options.expansion) assert len(cropped_images) == len(crops_this_image) # i_crop = 0; crop_info = crops_this_image[0] for i_crop,crop_info in enumerate(crops_this_image): assert crop_info['image_fn_relative'] == image_fn_relative crop_filename_relative = _get_crop_filename(image_fn_relative, crop_info['crop_id']) crop_filename_abs = os.path.join(output_folder,crop_filename_relative).replace('\\','/') if os.path.isfile(crop_filename_abs) and not options.overwrite: continue cropped_image = cropped_images[i_crop] os.makedirs(os.path.dirname(crop_filename_abs),exist_ok=True) exif_preserving_save(cropped_image,crop_filename_abs,quality=options.quality) # ...for each crop #%% Main function
[docs] def crop_results_to_image_results(image_results_file_with_crop_ids, crop_results_file, output_file, delete_crop_information=True, require_identical_detection_categories=True, restrict_to_top_n=-1, crop_results_prefix=None, detections_without_classification_handling='error'): """ This function is intended to be run after you have: 1. Run MegaDetector on a folder 2. Generated a crop folder using create_crop_folder 3. Run a species classifier on those crops This function will take the crop-level results and transform them back to the original images. Classification categories, if available, are taken from [crop_results_file]. Args: image_results_file_with_crop_ids (str): results file for the original images, containing crop IDs, likely generated via create_crop_folder. All non-standard fields in this file will be passed along to [output_file]. crop_results_file (str): results file for the crop folder output_file (str): output .json file, containing crop-level classifications mapped back to the image level. delete_crop_information (bool, optional): whether to delete the "crop_id" and "crop_filename_relative" fields from each detection, if present. require_identical_detection_categories (bool, optional): if True, error if the image-level and crop-level detection categories are different. If False, ignore the crop-level detection categories. restrict_to_top_n (int, optional): If >0, removes all but the top N classification results for each detection. crop_results_prefix (str, optional): if not None, removes this prefix from crop results filenames. Intended to support the case where the crop results use absolute paths. detections_without_classification_handling (str, optional): what to do when we encounter a crop that doesn't appear in classification results: 'error', or 'include' ("include" means "leave the detection alone, without classifications" """ ##%% Validate inputs assert os.path.isfile(image_results_file_with_crop_ids), \ 'Could not find image-level input file {}'.format(image_results_file_with_crop_ids) assert os.path.isfile(crop_results_file), \ 'Could not find crop results file {}'.format(crop_results_file) output_dir = os.path.dirname(output_file) if len(output_dir) > 0: os.makedirs(output_dir,exist_ok=True) ##%% Read input files print('Reading input...') with open(image_results_file_with_crop_ids,'r') as f: image_results_with_crop_ids = json.load(f) with open(crop_results_file,'r') as f: crop_results = json.load(f) # Find all the detection categories that need to be consistent used_detection_category_ids = set() for im in tqdm(image_results_with_crop_ids['images']): if 'detections' not in im or im['detections'] is None: continue for det in im['detections']: if 'crop_id' in det: used_detection_category_ids.add(det['category']) # Make sure the detection categories that matter are consistent across the two files if require_identical_detection_categories: for category_id in used_detection_category_ids: category_name = image_results_with_crop_ids['detection_categories'][category_id] assert category_id in crop_results['detection_categories'] and \ category_name == crop_results['detection_categories'][category_id], \ 'Crop results and detection results use incompatible categories' crop_filename_to_results = {} # im = crop_results['images'][0] for im in crop_results['images']: fn = im['file'] # Possibly remove a prefix from each filename if (crop_results_prefix is not None) and (crop_results_prefix in fn): if fn.startswith(crop_results_prefix): fn = fn.replace(crop_results_prefix,'',1) im['file'] = fn crop_filename_to_results[fn] = im if 'classification_categories' in crop_results: image_results_with_crop_ids['classification_categories'] = \ crop_results['classification_categories'] if 'classification_category_descriptions' in crop_results: image_results_with_crop_ids['classification_category_descriptions'] = \ crop_results['classification_category_descriptions'] ##%% Read classifications from crop results, merge into image-level results print('Reading classification results...') n_skipped_detections = 0 # Loop over the original image-level detections # # im = image_results_with_crop_ids['images'][0] for i_image,im in tqdm(enumerate(image_results_with_crop_ids['images']), total=len(image_results_with_crop_ids['images'])): if 'detections' not in im or im['detections'] is None: continue # i_det = 0; det = im['detections'][i_det] for det in im['detections']: if 'classifications' in det: del det['classifications'] if 'crop_id' in det: # We may be skipping detections with no classification results skip_detection = False # Find the corresponding crop in the classification results crop_filename_relative = det['crop_filename_relative'] if crop_filename_relative not in crop_filename_to_results: if detections_without_classification_handling == 'error': raise ValueError('Crop lookup error: {}'.format(crop_filename_relative)) elif detections_without_classification_handling == 'include': # Leave this detection unclassified skip_detection = True else: raise ValueError( 'Illegal value for detections_without_classification_handling: {}'.format( detections_without_classification_handling )) if skip_detection: n_skipped_detections += 1 else: crop_results_this_detection = crop_filename_to_results[crop_filename_relative] # Consistency checking assert crop_results_this_detection['file'] == crop_filename_relative, \ 'Crop filename mismatch' assert len(crop_results_this_detection['detections']) == 1, \ 'Multiple crop results for a single detection' assert crop_results_this_detection['detections'][0]['bbox'] == [0,0,1,1], \ 'Invalid crop bounding box' # This check was helpful for the case where crop-level results had already # taken detection confidence values from detector output by construct, but this isn't # really meaningful for most cases. # assert abs(crop_results_this_detection['detections'][0]['conf'] - det['conf']) < 0.01 if require_identical_detection_categories: assert crop_results_this_detection['detections'][0]['category'] == det['category'] # Copy the crop-level classifications det['classifications'] = crop_results_this_detection['detections'][0]['classifications'] confidence_values = [x[1] for x in det['classifications']] assert is_list_sorted(confidence_values,reverse=True) if restrict_to_top_n > 0: det['classifications'] = det['classifications'][0:restrict_to_top_n] if delete_crop_information: if 'crop_id' in det: del det['crop_id'] if 'crop_filename_relative' in det: del det['crop_filename_relative'] # ...for each detection # ...for each image if n_skipped_detections > 0: print('Skipped {} detections'.format(n_skipped_detections)) ##%% Write output file print('Writing output file...') with open(output_file,'w') as f: json.dump(image_results_with_crop_ids,f,indent=1)
# ...def crop_results_to_image_results(...)
[docs] def create_crop_folder(input_file, input_folder, output_folder, output_file=None, crops_output_file=None, options=None): """ Given a MegaDetector .json file and a folder of images, creates a new folder of images representing all above-threshold crops from the original folder. Optionally writes a new .json file that attaches unique IDs to each detection. Args: input_file (str): MD-formatted .json file to process input_folder (str): Input image folder output_folder (str): Output (cropped) image folder output_file (str, optional): new .json file that attaches unique IDs to each detection. crops_output_file (str, optional): new .json file that includes whole-image detections for each of the crops, using confidence values from the original results options (CreateCropFolderOptions, optional): crop parameters """ ## Validate options, prepare output folders if options is None: options = CreateCropFolderOptions() assert os.path.isfile(input_file), 'Input file {} not found'.format(input_file) assert os.path.isdir(input_folder), 'Input folder {} not found'.format(input_folder) os.makedirs(output_folder,exist_ok=True) if output_file is not None: output_dir = os.path.dirname(output_file) if len(output_dir) > 0: os.makedirs(output_dir,exist_ok=True) ##%% Read input print('Reading MD results file...') with open(input_file,'r') as f: detection_results = json.load(f) category_ids_to_include = None if options.category_names_to_include is not None: category_id_to_name = detection_results['detection_categories'] category_name_to_id = invert_dictionary(category_id_to_name) category_ids_to_include = set() for category_name in options.category_names_to_include: assert category_name in category_name_to_id, \ 'Unrecognized category name {}'.format(category_name) category_ids_to_include.add(category_name_to_id[category_name]) ##%% Make a list of crops that we need to create # Maps input images to list of dicts, with keys 'crop_id','detection' image_fn_relative_to_crops = defaultdict(list) n_crops = 0 n_detections_excluded_by_category = 0 # im = detection_results['images'][0] for i_image,im in enumerate(detection_results['images']): if 'detections' not in im or im['detections'] is None or len(im['detections']) == 0: continue detections_this_image = im['detections'] image_fn_relative = im['file'] for i_detection,det in enumerate(detections_this_image): if det['conf'] < options.confidence_threshold: continue if (category_ids_to_include is not None) and \ (det['category'] not in category_ids_to_include): n_detections_excluded_by_category += 1 continue # Use existing crop IDs if they're available (in rare scenarios, we # prepopulate the "crop_id" field in a .json file). if 'crop_id' not in det: det['crop_id'] = i_detection crop_info = {'image_fn_relative':image_fn_relative, 'crop_id':det['crop_id'], 'detection':det} crop_filename_relative = _get_crop_filename(image_fn_relative, crop_info['crop_id']) det['crop_filename_relative'] = crop_filename_relative image_fn_relative_to_crops[image_fn_relative].append(crop_info) n_crops += 1 # ...for each input image print('Prepared a list of {} crops from {} of {} input images'.format( n_crops,len(image_fn_relative_to_crops),len(detection_results['images']))) if n_detections_excluded_by_category > 0: print('Excluded {} detections by category'.format(n_detections_excluded_by_category)) ##%% Generate crops if options.n_workers <= 1: # image_fn_relative = next(iter(image_fn_relative_to_crops)) for image_fn_relative in tqdm(image_fn_relative_to_crops.keys()): crops_this_image = image_fn_relative_to_crops[image_fn_relative] _generate_crops_for_single_image(crops_this_image=crops_this_image, input_folder=input_folder, output_folder=output_folder, options=options) else: print('Creating a {} pool with {} workers'.format(options.pool_type,options.n_workers)) pool = None try: if options.pool_type == 'thread': pool = ThreadPool(options.n_workers) else: assert options.pool_type == 'process' pool = Pool(options.n_workers) # Each element in this list is the list of crops for a single image crop_lists = list(image_fn_relative_to_crops.values()) with tqdm(total=len(image_fn_relative_to_crops)) as pbar: for i,_ in enumerate(pool.imap_unordered(partial( _generate_crops_for_single_image, input_folder=input_folder, output_folder=output_folder, options=options), crop_lists)): pbar.update() finally: if pool is not None: pool.close() pool.join() print('Pool closed and joined for crop folder creation') # ...if we're using parallel processing ##%% Write output file if output_file is not None: with open(output_file,'w') as f: json.dump(detection_results,f,indent=1) if crops_output_file is not None: original_images = detection_results['images'] detection_results_cropped = detection_results detection_results_cropped['images'] = [] # im = original_images[0] for im in original_images: if 'detections' not in im or im['detections'] is None or len(im['detections']) == 0: continue detections_this_image = im['detections'] image_fn_relative = im['file'] for i_detection,det in enumerate(detections_this_image): if 'crop_id' in det: im_out = {} im_out['file'] = det['crop_filename_relative'] det_out = {} det_out['category'] = det['category'] det_out['conf'] = det['conf'] det_out['bbox'] = [0, 0, 1, 1] det_out['crop_id'] = det['crop_id'] im_out['detections'] = [det_out] detection_results_cropped['images'].append(im_out) # ...if we need to include this crop in the new .json file # ...for each crop # ...for each original image with open(crops_output_file,'w') as f: json.dump(detection_results_cropped,f,indent=1)
# ...if we need to write an output file # ...def create_crop_folder() #%% Command-line driver def main(): """ Command-line interface for creating a crop folder from MegaDetector results. """ parser = argparse.ArgumentParser( description='Create a folder of crops from MegaDetector results' ) parser.add_argument( 'input_file', type=str, help='Path to the MegaDetector .json results file' ) parser.add_argument( 'input_folder', type=str, help='Path to the folder containing the original images' ) parser.add_argument( 'output_folder', type=str, help='Path to the folder where cropped images will be saved' ) parser.add_argument( '--output_file', type=str, default=None, help='Path to save the modified MegaDetector .json file (with crop IDs and filenames)' ) parser.add_argument( '--crops_output_file', type=str, default=None, help='Path to save a new .json file for the crops themselves (with full-image detections for each crop)' ) parser.add_argument( '--confidence_threshold', type=float, default=0.1, help='Confidence threshold for detections to be cropped (default: 0.1)' ) parser.add_argument( '--expansion', type=int, default=0, help='Number of pixels to expand each crop (default: 0)' ) parser.add_argument( '--quality', type=int, default=95, help='JPEG quality for saving crops (default: 95)' ) parser.add_argument( '--overwrite', type=str, default='true', choices=['true', 'false'], help="Overwrite existing crop images (default: 'true')" ) parser.add_argument( '--n_workers', type=int, default=8, help='Number of concurrent workers (default: 8)' ) parser.add_argument( '--pool_type', type=str, default='thread', choices=['thread', 'process'], help="Type of parallelism to use ('thread' or 'process', default: 'thread')" ) parser.add_argument( '--category_names', type=str, default=None, help="Comma-separated list of category names to include " + \ "(e.g., 'animal,person'). If None (default), all categories are included." ) args = parser.parse_args() options = CreateCropFolderOptions() options.confidence_threshold = args.confidence_threshold options.expansion = args.expansion options.quality = args.quality options.overwrite = (args.overwrite.lower() == 'true') options.n_workers = args.n_workers options.pool_type = args.pool_type if args.category_names: options.category_names_to_include = [name.strip() for name in args.category_names.split(',')] else: options.category_names_to_include = None print('Starting crop folder creation...') print('Input MD results: {}'.format(args.input_file)) print('Input image folder: {}'.format(args.input_folder)) print('Output crop folder: {}'.format(args.output_folder)) if args.output_file: print('Modified MD results will be saved to {}'.format(args.output_file)) if args.crops_output_file: print('Crops .json output will be saved to {}'.format(args.crops_output_file)) create_crop_folder( input_file=args.input_file, input_folder=args.input_folder, output_folder=args.output_folder, output_file=args.output_file, crops_output_file=args.crops_output_file, options=options ) if __name__ == '__main__': main()