Source code for megadetector.postprocessing.convert_output_format

"""

convert_output_format.py

Converts between file .json and .csv representations of MD output.  The .csv format is
largely obsolete, don't use it unless you're super-duper sure you need it.

"""

#%% Constants and imports

import argparse
import json
import sys
import os

from tqdm import tqdm
from collections import defaultdict

import pandas as pd

from megadetector.postprocessing.load_api_results import load_api_results_csv
from megadetector.utils.wi_taxonomy_utils import load_md_or_speciesnet_file
from megadetector.data_management.annotations import annotation_constants
from megadetector.utils.ct_utils import get_max_conf
from megadetector.utils.ct_utils import write_json

CONF_DIGITS = 3


#%% Conversion functions

[docs] def convert_json_to_csv(input_path, output_path=None, min_confidence=None, omit_bounding_boxes=False, output_encoding=None, overwrite=True, verbose=False): """ Converts a MD results .json file to a totally non-standard .csv format. If [output_path] is None, will convert x.json to x.csv. Args: input_path (str): the input .json file to convert output_path (str, optional): the output .csv file to generate; if this is None, uses [input_path].csv min_confidence (float, optional): the minimum-confidence detection we should include in the "detections" column; has no impact on the other columns omit_bounding_boxes (bool, optional): whether to leave out the json-formatted bounding boxes that make up the "detections" column, which are not generally useful for someone who wants to consume this data as a .csv file output_encoding (str, optional): encoding to use for the .csv file overwrite (bool, optional): whether to overwrite an existing .csv file; if this is False and the output file exists, no-ops and returns verbose (bool, optional): enable additional debug output """ if output_path is None: output_path = os.path.splitext(input_path)[0]+'.csv' if os.path.isfile(output_path) and (not overwrite): print('File {} exists, skipping json --> csv conversion'.format(output_path)) return print('Loading json results from {}...'.format(input_path)) json_output = load_md_or_speciesnet_file(input_path, verbose=verbose) def clean_category_name(s): return s.replace(',','_').replace(' ','_').lower() # Create column names for max detection confidences detection_category_id_to_max_conf_column_name = {} for category_id in json_output['detection_categories'].keys(): category_name = clean_category_name(json_output['detection_categories'][category_id]) detection_category_id_to_max_conf_column_name[category_id] = \ 'max_conf_' + category_name classification_category_id_to_max_conf_column_name = {} # Create column names for max classification confidences (if necessary) if 'classification_categories' in json_output.keys(): for category_id in json_output['classification_categories'].keys(): category_name = clean_category_name(json_output['classification_categories'][category_id]) classification_category_id_to_max_conf_column_name[category_id] = \ 'max_classification_conf_' + category_name # There are several .json fields for which we add .csv columns; other random bespoke fields # will be ignored. optional_fields = ['width','height','datetime','exif_metadata'] optional_fields_present = set() # Iterate once over the data to check for optional fields print('Looking for optional fields...') for im in tqdm(json_output['images']): # Which optional fields are present for this image? for k in im.keys(): if k in optional_fields: optional_fields_present.add(k) optional_fields_present = sorted(list(optional_fields_present)) if len(optional_fields_present) > 0: print('Found {} optional fields'.format(len(optional_fields_present))) print('Formatting results...') output_records = [] # i_image = 0; im = json_output['images'][i_image] for im in tqdm(json_output['images']): output_record = {} output_records.append(output_record) output_record['image_path'] = im['file'] output_record['max_confidence'] = '' output_record['detections'] = '' for field_name in optional_fields_present: output_record[field_name] = '' if field_name in im: output_record[field_name] = im[field_name] for detection_category_id in detection_category_id_to_max_conf_column_name: column_name = detection_category_id_to_max_conf_column_name[detection_category_id] output_record[column_name] = 0 for classification_category_id in classification_category_id_to_max_conf_column_name: column_name = classification_category_id_to_max_conf_column_name[classification_category_id] output_record[column_name] = 0 if 'failure' in im and im['failure'] is not None: output_record['max_confidence'] = 'failure' output_record['detections'] = im['failure'] # print('Skipping failed image {} ({})'.format(im['file'],im['failure'])) continue max_conf = get_max_conf(im) detection_category_id_to_max_conf = defaultdict(float) classification_category_id_to_max_conf = defaultdict(float) detections = [] # d = im['detections'][0] for d in im['detections']: # Skip sub-threshold detections if (min_confidence is not None) and (d['conf'] < min_confidence): continue input_bbox = d['bbox'] # Our .json format is xmin/ymin/w/h # # Our .csv format was ymin/xmin/ymax/xmax xmin = input_bbox[0] ymin = input_bbox[1] xmax = input_bbox[0] + input_bbox[2] ymax = input_bbox[1] + input_bbox[3] output_detection = [ymin, xmin, ymax, xmax] output_detection.append(d['conf']) output_detection.append(int(d['category'])) detections.append(output_detection) detection_category_id = d['category'] detection_category_max = detection_category_id_to_max_conf[detection_category_id] if d['conf'] > detection_category_max: detection_category_id_to_max_conf[detection_category_id] = d['conf'] if 'classifications' in d: for c in d['classifications']: classification_category_id = c[0] classification_conf = c[1] classification_category_max = \ classification_category_id_to_max_conf[classification_category_id] if classification_conf > classification_category_max: classification_category_id_to_max_conf[classification_category_id] = \ classification_conf # ...for each classification # ...if we have classification results for this detection # ...for each detection detection_string = '' if not omit_bounding_boxes: detection_string = json.dumps(detections) output_record['detections'] = detection_string output_record['max_confidence'] = max_conf for detection_category_id in detection_category_id_to_max_conf_column_name: column_name = detection_category_id_to_max_conf_column_name[detection_category_id] output_record[column_name] = \ detection_category_id_to_max_conf[detection_category_id] for classification_category_id in classification_category_id_to_max_conf_column_name: column_name = classification_category_id_to_max_conf_column_name[classification_category_id] output_record[column_name] = \ classification_category_id_to_max_conf[classification_category_id] # ...for each image print('Writing to csv...') df = pd.DataFrame(output_records) if omit_bounding_boxes: df = df.drop('detections',axis=1) df.to_csv(output_path,index=False,header=True,encoding=output_encoding)
# ...def convert_json_to_csv(...)
[docs] def convert_csv_to_json(input_path,output_path=None,overwrite=True): """ Convert .csv to .json. If output_path is None, will convert x.csv to x.json. This supports a largely obsolete .csv format, there's almost no reason you want to do this. Args: input_path (str): .csv filename to convert to .json output_path (str, optional): the output .json file to generate; if this is None, uses [input_path].json overwrite (bool, optional): whether to overwrite an existing .json file; if this is False and the output file exists, no-ops and returns """ if output_path is None: output_path = os.path.splitext(input_path)[0]+'.json' if os.path.isfile(output_path) and (not overwrite): print('File {} exists, skipping csv --> json conversion'.format(output_path)) return # Format spec: # # https://github.com/agentmorris/MegaDetector/tree/main/megadetector/api/batch_processing print('Loading csv results...') df = load_api_results_csv(input_path) info = { "format_version":"1.2", "detector": "unknown", "detection_completion_time" : "unknown", "classifier": "unknown", "classification_completion_time": "unknown" } classification_categories = {} detection_categories = annotation_constants.detector_bbox_categories images = [] # i_file = 0; row = df.iloc[i_file] for i_file,row in df.iterrows(): image = {} image['file'] = row['image_path'] image['max_detection_conf'] = round(row['max_confidence'], CONF_DIGITS) src_detections = row['detections'] out_detections = [] for i_detection,detection in enumerate(src_detections): # Our .csv format was ymin/xmin/ymax/xmax # # Our .json format is xmin/ymin/w/h ymin = detection[0] xmin = detection[1] ymax = detection[2] xmax = detection[3] bbox = [xmin, ymin, xmax-xmin, ymax-ymin] conf = detection[4] i_class = detection[5] out_detection = {} out_detection['category'] = str(i_class) out_detection['conf'] = conf out_detection['bbox'] = bbox out_detections.append(out_detection) # ...for each detection image['detections'] = out_detections images.append(image) # ...for each image json_out = {} json_out['info'] = info json_out['detection_categories'] = detection_categories json_out['classification_categories'] = classification_categories json_out['images'] = images write_json(output_path,json_out)
# ...def convert_csv_to_json(...) #%% Interactive driver if False: #%% input_path = r'c:\temp\test.json' min_confidence = None output_path = input_path + '.csv' convert_json_to_csv(input_path,output_path,min_confidence=min_confidence, omit_bounding_boxes=False) #%% base_path = r'c:\temp\json' input_paths = os.listdir(base_path) input_paths = [os.path.join(base_path,s) for s in input_paths] min_confidence = None for input_path in input_paths: output_path = input_path + '.csv' convert_json_to_csv(input_path,output_path,min_confidence=min_confidence, omit_bounding_boxes=True) #%% Concatenate .csv files from a folder import glob csv_files = glob.glob(os.path.join(base_path,'*.json.csv' )) master_csv = os.path.join(base_path,'all.csv') print('Concatenating {} files to {}'.format(len(csv_files),master_csv)) header = None with open(master_csv, 'w') as fout: for filename in tqdm(csv_files): with open(filename) as fin: lines = fin.readlines() if header is not None: assert lines[0] == header else: header = lines[0] fout.write(header) for line in lines[1:]: if len(line.strip()) == 0: continue fout.write(line) # ...for each .csv file # with open(master_csv) #%% Command-line driver def main(): """ Command-line driver for convert_output_format(), which converts json <--> csv. """ parser = argparse.ArgumentParser() parser.add_argument('input_path',type=str, help='Input filename ending in .json or .csv') parser.add_argument('--output_path',type=str,default=None, help='Output filename ending in .json or .csv (defaults to ' + \ 'input file, with .json/.csv replaced by .csv/.json)') parser.add_argument('--omit_bounding_boxes',action='store_true', help='Omit bounding box text from .csv output (large and usually not useful)') if len(sys.argv[1:]) == 0: parser.print_help() parser.exit() args = parser.parse_args() if args.output_path is None: if args.input_path.endswith('.csv'): args.output_path = args.input_path[:-4] + '.json' elif args.input_path.endswith('.json'): args.output_path = args.input_path[:-5] + '.csv' else: raise ValueError('Illegal input file extension') if args.input_path.endswith('.csv') and args.output_path.endswith('.json'): assert not args.omit_bounding_boxes, \ '--omit_bounding_boxes does not apply to csv --> json conversion' convert_csv_to_json(args.input_path,args.output_path) elif args.input_path.endswith('.json') and args.output_path.endswith('.csv'): convert_json_to_csv(args.input_path,args.output_path,omit_bounding_boxes=args.omit_bounding_boxes) else: raise ValueError('Illegal format combination') if __name__ == '__main__': main()