"""
zamba_to_md.py
Convert a labels.csv file produced by Zamba Cloud to an MD (MegaDetector) results
file suitable for import into Timelapse.
Columns are expected to be, in this order:

* video_uuid (not used)
* original_filename (assumed to be a relative path name)
* top_k_label,top_k_probability, for k = 1..N
* [category name 1],[category name 2],...
* corrected_label
Because the MD results file fundamentally stores detections, what we'll
actually do is create bogus detections that fill the entire image.
There is no special handling of empty/blank categories; because these results are
based on a classifier, rather than a detector (where "blank" would be the absence of
all other categories), "blank" can be queried in Timelapse just like any other class.
"""
#%% Imports and constants
import sys
import argparse
import pandas as pd
from megadetector.utils.ct_utils import write_json
#%% Main function
def zamba_results_to_md_results(input_file,output_file=None):
    """
    Converts the Zamba Cloud .csv file [input_file] to the MD-formatted .json file
    [output_file].  If [output_file] is None, '.json' will be appended to the input file.

    Each (top_k_label, top_k_probability) pair in a row becomes one "detection" for
    that row's file, with a bounding box covering the whole image ([0,0,1.0,1.0]).
    Category IDs are assigned in the order in which the category columns appear in
    the .csv file.

    Args:
        input_file (str): the .csv file to convert
        output_file (str, optional): the output .json file (defaults to
            [input_file].json)

    Raises:
        AssertionError: if a required column is missing, or if a top_k label column
            is present without its probability column (or vice-versa)
        KeyError: if a top_k label in some row is not one of the category columns
    """

    if output_file is None:
        output_file = input_file + '.json'

    df = pd.read_csv(input_file)

    expected_columns = ('video_uuid','corrected_label','original_filename')
    for s in expected_columns:
        assert s in df.columns,\
            'Expected column {} not found, are you sure this is a Zamba results .csv file?'.format(
                s)

    # How many results are included per file?
    #
    # top_k is the largest k for which both top_k_label and top_k_probability exist,
    # counting up from 1 without gaps.
    assert 'top_1_probability' in df.columns and 'top_1_label' in df.columns
    top_k = 1
    while True:
        p_string = 'top_' + str(top_k + 1) + '_probability'
        label_string = 'top_' + str(top_k + 1) + '_label'
        if p_string in df.columns:
            assert label_string in df.columns,\
                'Oops, {} is a column but {} is not'.format(
                    p_string,label_string)
            top_k += 1
        else:
            assert label_string not in df.columns,\
                'Oops, {} is a column but {} is not'.format(
                    label_string,p_string)
            break

    print('Found {} probability column pairs'.format(top_k))

    # Category names start after the fixed columns and the probability columns.
    #
    # The scan is bounded by the number of columns, so a file with no category columns
    # yields an empty category list rather than an IndexError, and prediction columns
    # are recognized by their exact naming pattern, so a category whose name merely
    # contains "top_" is not skipped.
    column_names = list(df.columns)
    first_category_name_index = 0
    while (first_category_name_index < len(column_names)) and \
          (_is_top_k_column(column_names[first_category_name_index]) or \
           column_names[first_category_name_index] in expected_columns):
        first_category_name_index += 1

    # Categories run from there up to (but not including) 'corrected_label', or to the
    # last column if 'corrected_label' appears earlier in the file.
    category_names = []
    for column_name in column_names[first_category_name_index:]:
        if column_name == 'corrected_label':
            break
        category_names.append(column_name)

    print('Found {} categories:\n'.format(len(category_names)))

    for s in category_names:
        print(s)

    info = {}
    info['format_version'] = '1.3'
    info['detector'] = 'Zamba Cloud'
    info['classifier'] = 'Zamba Cloud'

    # Category IDs are the string form of each category's position in the column list
    detection_category_id_to_name = \
        {str(category_id):category_name for category_id,category_name in enumerate(category_names)}
    detection_category_name_to_id = {v: k for k, v in detection_category_id_to_name.items()}

    images = []

    # i_row = 0; row = df.iloc[i_row]
    for i_row,row in df.iterrows():

        im = {}
        images.append(im)
        im['file'] = row['original_filename']
        detections = []

        # k = 1
        for k in range(1,top_k+1):

            label = row['top_{}_label'.format(k)]
            confidence = row['top_{}_probability'.format(k)]

            # Fail with an actionable message (rather than a bare "KeyError: nan") when
            # a prediction is missing or names a category that has no column.
            if label not in detection_category_name_to_id:
                raise KeyError(
                    'Label {} (row {}, file {}, column top_{}_label) is not one of the category columns'.format(
                        label,i_row,row['original_filename'],k))

            det = {}
            det['category'] = detection_category_name_to_id[label]
            # Store a native Python float, regardless of how pandas typed the column
            det['conf'] = float(confidence)
            # Bogus whole-image box; these are classification results, not detections
            det['bbox'] = [0,0,1.0,1.0]
            detections.append(det)

        # ...for each of the top-k predictions

        im['detections'] = detections

    # ...for each row

    results = {}
    results['info'] = info
    results['detection_categories'] = detection_category_id_to_name
    results['images'] = images

    write_json(output_file,results)

# ...zamba_results_to_md_results(...)


def _is_top_k_column(column_name):
    """
    Returns True if [column_name] has the form top_<digits>_label or
    top_<digits>_probability, i.e. if it is one of the per-file prediction columns.
    """

    prefix = 'top_'
    if not column_name.startswith(prefix):
        return False
    for suffix in ('_label','_probability'):
        if column_name.endswith(suffix) and \
           column_name[len(prefix):-len(suffix)].isdigit():
            return True
    return False
#%% Interactive driver

# The cells below are meant to be run by hand from an IDE; the "if False" keeps them
# from executing when this module is imported or run as a script.
if False:

    pass

    #%% Convert a sample Zamba Cloud results file

    # None means "write to [input_file].json"
    output_file = None
    input_file = r"G:\temp\labels-job-b95a4b76-e332-4e17-ab40-03469392d36a-2023-11-04_16-28-50.060130.csv"

    zamba_results_to_md_results(input_file,output_file)
#%% Command-line driver
def main():
    """
    Command-line entry point: parses the input/output file arguments and runs the
    Zamba-to-MD conversion.  Prints usage and exits if no arguments are supplied.
    """

    arg_parser = argparse.ArgumentParser(
        description='Convert a Zamba-formatted .csv results file to a MD-formatted .json results file')
    arg_parser.add_argument('input_file',type=str,help='input .csv file')
    arg_parser.add_argument('--output_file',type=str,default=None,
        help='output .json file (defaults to input file appended with ".json")')

    # With nothing on the command line, show the help text instead of letting
    # argparse complain about the missing positional argument.
    no_arguments_supplied = (len(sys.argv) <= 1)
    if no_arguments_supplied:
        arg_parser.print_help()
        arg_parser.exit()

    parsed = arg_parser.parse_args()

    zamba_results_to_md_results(parsed.input_file,parsed.output_file)


if __name__ == '__main__':
    main()