Source code for megadetector.detection.video_utils

"""

video_utils.py

Utilities for splitting, rendering, and assembling videos.

"""

#%% Constants, imports, environment

import os
import re
import cv2
import glob
import json

from collections import defaultdict
from multiprocessing.pool import ThreadPool
from multiprocessing.pool import Pool
from tqdm import tqdm
from functools import partial
from inspect import signature

from megadetector.utils import path_utils
from megadetector.utils.path_utils import clean_path
from megadetector.utils.ct_utils import sort_list_of_dicts_by_key
from megadetector.visualization import visualization_utils as vis_utils

default_fourcc = 'h264'

video_progress_bar_description = 'Processing video'

#%% Path utilities

VIDEO_EXTENSIONS = ('.mp4','.avi','.mpeg','.mpg','.mov','.mkv','.flv')

[docs] def is_video_file(s,video_extensions=VIDEO_EXTENSIONS): """ Checks a file's extension against a set of known video file extensions to determine whether it's a video file. Performs a case-insensitive comparison. Args: s (str): filename to check for probable video-ness video_extensions (list, optional): list of video file extensions Returns: bool: True if this looks like a video file, else False """ ext = os.path.splitext(s)[1] return ext.lower() in video_extensions
[docs] def find_video_strings(strings): """ Given a list of strings that are potentially video file names, looks for strings that actually look like video file names (based on extension). Args: strings (list): list of strings to check for video-ness Returns: list: a subset of [strings] that looks like they are video filenames """ return [s for s in strings if is_video_file(s.lower())]
[docs] def find_videos(dirname, recursive=False, convert_slashes=True, return_relative_paths=False): """ Finds all files in a directory that look like video file names. Args: dirname (str): folder to search for video files recursive (bool, optional): whether to search [dirname] recursively convert_slashes (bool, optional): forces forward slashes in the returned files, otherwise uses the native path separator return_relative_paths (bool, optional): forces the returned filenames to be relative to [dirname], otherwise returns absolute paths Returns: A list of filenames within [dirname] that appear to be videos """ if recursive: files = glob.glob(os.path.join(dirname, '**', '*.*'), recursive=True) else: files = glob.glob(os.path.join(dirname, '*.*')) files = [fn for fn in files if os.path.isfile(fn)] if return_relative_paths: files = [os.path.relpath(fn,dirname) for fn in files] if convert_slashes: files = [fn.replace('\\', '/') for fn in files] return find_video_strings(files)
#%% Shared function for opening videos DEFAULT_BACKEND = -1 # This is the order in which we'll try to open backends. # # In general, the defaults are as follows, though they vary depending # on what's installed: # # Windows: CAP_DSHOW or CAP_MSMF # Linux: CAP_FFMPEG # macOS: CAP_AVFOUNDATION # # Technically if the default fails, we may try the same backend again, but this # is rare, and it's not worth the complexity of figuring out what the system # default is. backend_id_to_name = { DEFAULT_BACKEND:'default', cv2.CAP_FFMPEG: 'CAP_FFMPEG', cv2.CAP_DSHOW: 'CAP_DSHOW', cv2.CAP_MSMF: 'CAP_MSMF', cv2.CAP_AVFOUNDATION: 'CAP_AVFOUNDATION', cv2.CAP_GSTREAMER: 'CAP_GSTREAMER' }
[docs] def open_video(video_path,verbose=False): """ Open the video at [video_path], trying multiple OpenCV backends if necessary. Args: video_path (str): the file to open verbose (bool, optional): enable additional debug output Returns: (cv2.VideoCapture,image): a tuple containing (a) the open video capture device (or None if no backends succeeded) and (b) the first frame of the video (or None) """ if not os.path.isfile(video_path): print('Video file {} not found'.format(video_path)) return None,None backend_ids = backend_id_to_name.keys() for backend_id in backend_ids: backend_name = backend_id_to_name[backend_id] if verbose: print('Trying backend {}'.format(backend_name)) try: if backend_id == DEFAULT_BACKEND: vidcap = cv2.VideoCapture(video_path) else: vidcap = cv2.VideoCapture(video_path, backend_id) except Exception as e: if verbose: print('Warning: error opening {} with backend {}: {}'.format( video_path,backend_name,str(e))) continue if not vidcap.isOpened(): if verbose: print('Warning: isOpened() is False for {} with backend {}'.format( video_path,backend_name)) try: vidcap.release() except Exception: pass continue success, image = vidcap.read() if success and (image is not None): if verbose: print('Successfully opened {} with backend: {}'.format( video_path,backend_name)) return vidcap,image print('Warning: failed to open {} with backend {}'.format( video_path,backend_name)) try: vidcap.release() except Exception: pass # ...for each backend print('Error: failed to open {} with any backend'.format(video_path)) return None,None
# ...def open_video(...) #%% Functions for rendering frames to video and vice-versa # http://tsaith.github.io/combine-images-into-a-video-with-python-3-and-opencv-3.html
[docs] def frames_to_video(images, fs, output_file_name, codec_spec=default_fourcc): """ Given a list of image files and a sample rate, concatenates those images into a video and writes to a new video file. Args: images (list): a list of frame file names to concatenate into a video fs (float): the frame rate in fps output_file_name (str): the output video file, no checking is performed to make sure the extension is compatible with the codec codec_spec (str, optional): codec to use for encoding; h264 is a sensible default and generally works on Windows, but when this fails (which is around 50% of the time on Linux), mp4v is a good second choice """ if codec_spec is None: codec_spec = 'h264' if len(images) == 0: print('Warning: no frames to render') return output_dir = os.path.dirname(output_file_name) if len(output_dir) > 0: os.makedirs(output_dir, exist_ok=True) # Determine the width and height from the first image frame = cv2.imread(images[0]) # cv2.imshow('video',frame) height, width, channels = frame.shape # Define the codec and create VideoWriter object fourcc = cv2.VideoWriter_fourcc(*codec_spec) out = cv2.VideoWriter(output_file_name, fourcc, fs, (width, height)) for image in images: frame = cv2.imread(image) out.write(frame) out.release() cv2.destroyAllWindows()
[docs] def get_video_fs(input_video_file,verbose=False): """ Retrieves the frame rate of [input_video_file]. Args: input_video_file (str): video file for which we want the frame rate verbose (bool, optional): enable additional debug output Returns: float: the frame rate of [input_video_file], or None if no frame rate could be extracted """ assert os.path.isfile(input_video_file), \ 'File {} not found'.format(input_video_file) vidcap,_ = open_video(input_video_file,verbose=verbose) if vidcap is None: if verbose: print('Failed to get frame rate for {}'.format(input_video_file)) return None fs = vidcap.get(cv2.CAP_PROP_FPS) try: vidcap.release() except Exception as e: print('Warning: error closing video handle for {}: {}'.format( input_video_file,str(e))) return fs
def _frame_number_to_filename(frame_number): """ Ensures that frame images are given consistent filenames. """ return 'frame{:06d}.jpg'.format(frame_number) def _filename_to_frame_number(filename): """ Extract the frame number from a filename that was created using _frame_number_to_filename. Args: filename (str): a filename created with _frame_number_to_filename. Returns: int: the frame number extracted from [filename] """ filename = os.path.basename(filename) match = re.search(r'frame(\d+)\.jpg', filename) if match is None: raise ValueError('{} does not appear to be a frame file'.format(filename)) frame_number = match.group(1) try: frame_number = int(frame_number) except Exception: raise ValueError('Filename {} does not contain a valid frame number'.format(filename)) return frame_number def _add_frame_numbers_to_results(results): """ Given the 'images' list from a set of MD results that was generated on video frames, add a 'frame_number' field to each image, and return the list, sorted by frame number. Also modifies "results" in place. Args: results (list): list of image dicts """ # This indicate that this was a failure for a single video if isinstance(results,dict): assert 'failure' in results return results # Add video-specific fields to the results for im in results: fn = im['file'] frame_number = _filename_to_frame_number(fn) im['frame_number'] = frame_number results = sort_list_of_dicts_by_key(results,'frame_number') return results
[docs] def run_callback_on_frames(input_video_file, frame_callback, every_n_frames=None, verbose=False, frames_to_process=None, allow_empty_videos=False): """ Calls the function frame_callback(np.array,image_id) on all (or selected) frames in [input_video_file]. Args: input_video_file (str): video file to process frame_callback (function): callback to run on frames, should take an np.array and a string and return a single value. callback should expect two arguments: (1) a numpy array with image data, in the typical PIL image orientation/channel order, and (2) a string identifier for the frame, typically something like "frame0006.jpg" (even though it's not a JPEG image, this is just an identifier for the frame). every_n_frames (int or float, optional): sample every Nth frame starting from the first frame; if this is None or 1, every frame is processed. If this is a negative value, it's interpreted as a sampling rate in seconds, which is rounded to the nearest frame sampling rate. Mutually exclusive with frames_to_process. verbose (bool, optional): enable additional debug console output frames_to_process (list of int, optional): process this specific set of frames; mutually exclusive with every_n_frames. If all values are beyond the length of the video, no frames are extracted. Can also be a single int, specifying a single frame number. allow_empty_videos (bool, optional): Just print a warning if a video appears to have no frames (by default, this raises an Exception). Returns: dict: dict with keys 'frame_filenames' (list), 'frame_rate' (float), 'results' (list). 'frame_filenames' are synthetic filenames (e.g. frame000000.jpg). Elements in 'results' are whatever is returned by the callback, typically dicts in the same format used in the 'images' array in the MD results format. [frame_filenames] and [results] both have one element per processed frame. """ assert os.path.isfile(input_video_file), 'File {} not found'.format(input_video_file) if isinstance(frames_to_process,int): frames_to_process = [frames_to_process] if (frames_to_process is not None) and (every_n_frames is not None): raise ValueError('frames_to_process and every_n_frames are mutually exclusive') vidcap = None try: vidcap,image = open_video(input_video_file,verbose=verbose) n_frames = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT)) frame_rate = vidcap.get(cv2.CAP_PROP_FPS) if verbose: print('Video {} contains {} frames at {} Hz'.format(input_video_file,n_frames,frame_rate)) frame_filenames = [] results = [] if (every_n_frames is not None): if (every_n_frames < 0): every_n_seconds = abs(every_n_frames) every_n_frames = int(every_n_seconds * frame_rate) if verbose: print('Interpreting a time sampling rate of {} hz as a frame interval of {}'.format( every_n_seconds,every_n_frames)) # 0 and 1 both mean "process every frame" elif every_n_frames == 0: every_n_frames = 1 elif every_n_frames > 0: every_n_frames = int(every_n_frames) # ...if every_n_frames was supplied # frame_number = 0 for frame_number in range(0,n_frames): # We've already read the first frame, when we opened the video if frame_number != 0: success,image = vidcap.read() else: success = True if not success: assert image is None if verbose: print('Read terminating at frame {} of {}'.format(frame_number,n_frames)) break if every_n_frames is not None: if (frame_number % every_n_frames) != 0: continue if frames_to_process is not None: if frame_number > max(frames_to_process): break if frame_number not in frames_to_process: continue frame_filename_relative = _frame_number_to_filename(frame_number) frame_filenames.append(frame_filename_relative) # Convert from OpenCV conventions to PIL conventions image_np = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) # Run the callback frame_results = frame_callback(image_np,frame_filename_relative) results.append(frame_results) # ...for each frame if len(frame_filenames) == 0: if allow_empty_videos: print('Warning: found no frames in file {}'.format(input_video_file)) else: raise Exception('Error: found no frames in file {}'.format(input_video_file)) if verbose: print('\nProcessed {} of {} frames for {}'.format( len(frame_filenames),n_frames,input_video_file)) finally: if vidcap is not None: try: vidcap.release() except Exception: pass to_return = {} to_return['frame_filenames'] = frame_filenames to_return['frame_rate'] = frame_rate to_return['results'] = results return to_return
# ...def run_callback_on_frames(...)
[docs] def run_callback_on_frames_for_folder(input_video_folder, frame_callback, every_n_frames=None, verbose=False, recursive=True, files_to_process_relative=None, error_on_empty_video=False): """ Calls the function frame_callback(np.array,image_id) on all (or selected) frames in all videos in [input_video_folder]. Args: input_video_folder (str): video folder to process frame_callback (function): callback to run on frames, should take an np.array and a string and return a single value. callback should expect two arguments: (1) a numpy array with image data, in the typical PIL image orientation/channel order, and (2) a string identifier for the frame, typically something like "frame0006.jpg" (even though it's not a JPEG image, this is just an identifier for the frame). every_n_frames (int or float, optional): sample every Nth frame starting from the first frame; if this is None or 1, every frame is processed. If this is a negative value, it's interpreted as a sampling rate in seconds, which is rounded to the nearest frame sampling rate. verbose (bool, optional): enable additional debug console output recursive (bool, optional): recurse into [input_video_folder] files_to_process_relative (list, optional): only process specific relative paths error_on_empty_video (bool, optional): by default, videos with errors or no valid frames are silently stored as failures; this turns them into exceptions Returns: dict: dict with keys 'video_filenames' (list of str), 'frame_rates' (list of floats), 'results' (list of list of dicts). 'video_filenames' will contain *relative* filenames. 'results' is a list (one element per video) of lists (one element per frame) of whatever the callback returns, typically (but not necessarily) dicts in the MD results format. For failed videos, the frame rate will be represented by -1, and "results" will be a dict with at least the key "failure". """ to_return = {'video_filenames':[],'frame_rates':[],'results':[]} if files_to_process_relative is not None: input_files_full_paths = \ [os.path.join(input_video_folder,fn) for fn in files_to_process_relative] input_files_full_paths = [fn.replace('\\','/') for fn in input_files_full_paths] else: # Recursively enumerate video files input_files_full_paths = find_videos(input_video_folder, recursive=recursive, convert_slashes=True, return_relative_paths=False) print('Processing {} videos from folder {}'.format(len(input_files_full_paths),input_video_folder)) if len(input_files_full_paths) == 0: print('No videos to process') return to_return # Process each video # video_fn_abs = input_files_full_paths[0] for video_fn_abs in tqdm(input_files_full_paths,desc=video_progress_bar_description): video_filename_relative = os.path.relpath(video_fn_abs,input_video_folder) video_filename_relative = video_filename_relative.replace('\\','/') to_return['video_filenames'].append(video_filename_relative) try: # video_results is a dict with fields: # # frame_rate # # results (list of objects returned by the callback, typically dicts in the MD # per-image format) # # frame_filenames (list of frame IDs, i.e. synthetic filenames) video_results = run_callback_on_frames(input_video_file=video_fn_abs, frame_callback=frame_callback, every_n_frames=every_n_frames, verbose=verbose, frames_to_process=None, allow_empty_videos=False) except Exception as e: if (not error_on_empty_video): print('Warning: error processing video {}: {}'.format( video_fn_abs,str(e) )) to_return['frame_rates'].append(-1.0) failure_result = {} failure_result['failure'] = 'Failure processing video: {}'.format(str(e)) to_return['results'].append(failure_result) continue else: raise # ...try/except to_return['frame_rates'].append(video_results['frame_rate']) for r in video_results['results']: assert r['file'].startswith('frame') r['file'] = video_filename_relative + '/' + r['file'] to_return['results'].append(video_results['results']) # ...for each video n_videos = len(input_files_full_paths) assert len(to_return['video_filenames']) == n_videos assert len(to_return['frame_rates']) == n_videos assert len(to_return['results']) == n_videos return to_return
# ...def run_callback_on_frames_for_folder(...)
[docs] def video_to_frames(input_video_file, output_folder, overwrite=True, every_n_frames=None, verbose=False, quality=None, max_width=None, frames_to_extract=None, allow_empty_videos=True): """ Renders frames from [input_video_file] to .jpg files in [output_folder]. With help from: https://stackoverflow.com/questions/33311153/python-extracting-and-saving-video-frames Args: input_video_file (str): video file to split into frames output_folder (str): folder to put frame images in overwrite (bool, optional): whether to overwrite existing frame images every_n_frames (int, optional): sample every Nth frame starting from the first frame; if this is None or 1, every frame is extracted. If this is a negative value, it's interpreted as a sampling rate in seconds, which is rounded to the nearest frame sampling rate. Mutually exclusive with frames_to_extract. verbose (bool, optional): enable additional debug console output quality (int, optional): JPEG quality for frame output, from 0-100. Defaults to the opencv default (typically 95). max_width (int, optional): resize frames to be no wider than [max_width] frames_to_extract (list of int, optional): extract this specific set of frames; mutually exclusive with every_n_frames. If all values are beyond the length of the video, no frames are extracted. Can also be a single int, specifying a single frame number. In the special case where frames_to_extract is [], this function still reads video frame rates and verifies that videos are readable, but no frames are extracted. allow_empty_videos (bool, optional): Just print a warning if a video appears to have no frames (by default, this is an error). Returns: tuple: length-2 tuple containing (list of frame filenames,frame rate) """ assert os.path.isfile(input_video_file), 'File {} not found'.format(input_video_file) if quality is not None and quality < 0: quality = None if isinstance(frames_to_extract,int): frames_to_extract = [frames_to_extract] if (frames_to_extract is not None) and (every_n_frames is not None): raise ValueError('frames_to_extract and every_n_frames are mutually exclusive') bypass_extraction = ((frames_to_extract is not None) and (len(frames_to_extract) == 0)) if not bypass_extraction: os.makedirs(output_folder,exist_ok=True) vidcap = cv2.VideoCapture(input_video_file) n_frames = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT)) fs = vidcap.get(cv2.CAP_PROP_FPS) if (every_n_frames is not None) and (every_n_frames < 0): every_n_seconds = abs(every_n_frames) every_n_frames = int(every_n_seconds * fs) if verbose: print('Interpreting a time sampling rate of {} hz as a frame interval of {}'.format( every_n_seconds,every_n_frames)) # If we're not over-writing, check whether all frame images already exist if (not overwrite) and (not bypass_extraction): missing_frame_number = None missing_frame_filename = None frame_filenames = [] found_existing_frame = False for frame_number in range(0,n_frames): if every_n_frames is not None: assert frames_to_extract is None, \ 'Internal error: frames_to_extract and every_n_frames are exclusive' if (frame_number % every_n_frames) != 0: continue if frames_to_extract is not None: assert every_n_frames is None, \ 'Internal error: frames_to_extract and every_n_frames are exclusive' if frame_number not in frames_to_extract: continue frame_filename = _frame_number_to_filename(frame_number) frame_filename = os.path.join(output_folder,frame_filename) frame_filenames.append(frame_filename) if os.path.isfile(frame_filename): found_existing_frame = True continue else: missing_frame_number = frame_number missing_frame_filename = frame_filename break if verbose and missing_frame_number is not None: print('Missing frame {} ({}) for video {}'.format( missing_frame_number, missing_frame_filename, input_video_file)) # OpenCV seems to over-report the number of frames by 1 in some cases, or fails # to read the last frame; either way, I'm allowing one missing frame. allow_last_frame_missing = True # This doesn't have to mean literally the last frame number, it just means that if # we find this frame or later, we consider the video done last_expected_frame_number = n_frames-1 if every_n_frames is not None: last_expected_frame_number -= (every_n_frames*2) # When specific frames are requested, if anything is missing, reprocess the video if (frames_to_extract is not None) and (missing_frame_number is not None): pass # If no frames are missing, or only frames very close to the end of the video are "missing", # skip this video elif (missing_frame_number is None) or \ (allow_last_frame_missing and (missing_frame_number >= last_expected_frame_number)): if verbose: print('Skipping video {}, all output frames exist'.format(input_video_file)) return frame_filenames,fs else: # If we found some frames, but not all, print a message if verbose and found_existing_frame: print("Rendering video {}, couldn't find frame {} ({}) of {}".format( input_video_file, missing_frame_number, missing_frame_filename, last_expected_frame_number)) # ...if we need to check whether to skip this video entirely if verbose: print('Video {} contains {} frames at {} Hz'.format(input_video_file,n_frames,fs)) frame_filenames = [] # YOLOv5 does some totally bananas monkey-patching of opencv, which causes # problems if we try to supply a third parameter to imwrite (to specify JPEG # quality). Detect this case, and ignore the quality parameter if it looks # like imwrite has been messed with. # # See: # # https://github.com/ultralytics/yolov5/issues/7285 imwrite_patched = False n_imwrite_parameters = None try: # calling signature() on the native cv2.imwrite function will # fail, so an exception here is a good thing. In fact I don't think # there's a case where this *succeeds* and the number of parameters # is wrong. sig = signature(cv2.imwrite) n_imwrite_parameters = len(sig.parameters) except Exception: pass if (n_imwrite_parameters is not None) and (n_imwrite_parameters < 3): imwrite_patched = True if verbose and (quality is not None): print('Warning: quality value supplied, but YOLOv5 has mucked with cv2.imwrite, ignoring quality') # for frame_number in tqdm(range(0,n_frames)): for frame_number in range(0,n_frames): # Special handling for the case where we're just doing dummy reads if bypass_extraction: break success,image = vidcap.read() if not success: assert image is None if verbose: print('Read terminating at frame {} of {}'.format(frame_number,n_frames)) break if every_n_frames is not None: if (frame_number % every_n_frames) != 0: continue if frames_to_extract is not None: if frame_number > max(frames_to_extract): break if frame_number not in frames_to_extract: continue # Has resizing been requested? if max_width is not None: # image.shape is h/w/dims input_shape = image.shape assert input_shape[2] == 3 input_width = input_shape[1] # Is resizing necessary? if input_width > max_width: scale = max_width / input_width assert scale <= 1.0 # INTER_AREA is recommended for size reduction image = cv2.resize(image, (0,0), fx=scale, fy=scale, interpolation=cv2.INTER_AREA) # ...if we need to deal with resizing frame_filename_relative = _frame_number_to_filename(frame_number) frame_filename = os.path.join(output_folder,frame_filename_relative) frame_filenames.append(frame_filename) if (not overwrite) and (os.path.isfile(frame_filename)): # print('Skipping frame {}'.format(frame_filename)) pass else: try: if frame_filename.isascii(): if quality is None or imwrite_patched: cv2.imwrite(os.path.normpath(frame_filename),image) else: cv2.imwrite(os.path.normpath(frame_filename),image, [int(cv2.IMWRITE_JPEG_QUALITY), quality]) else: if quality is None: is_success, im_buf_arr = cv2.imencode('.jpg', image) else: encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), quality] is_success, im_buf_arr = cv2.imencode('.jpg', image, encode_param) im_buf_arr.tofile(frame_filename) assert os.path.isfile(frame_filename), \ 'Output frame {} unavailable'.format(frame_filename) except KeyboardInterrupt: vidcap.release() raise except Exception as e: print('Error on frame {} of {}: {}'.format(frame_number,n_frames,str(e))) # ...for each frame if len(frame_filenames) == 0: if allow_empty_videos: print('Warning: no frames extracted from file {}'.format(input_video_file)) else: raise Exception('Error: no frames extracted from file {}'.format(input_video_file)) if verbose: print('\nExtracted {} of {} frames for {}'.format( len(frame_filenames),n_frames,input_video_file)) vidcap.release() return frame_filenames,fs
# ...def video_to_frames(...) def _video_to_frames_with_per_video_frames(args): """ Wrapper function to handle extracting a different list of frames for each video in a multiprocessing context. Takes a tuple of (relative_fn, frames_for_this_video, other_args), where (other_args) contains the arguments that are the same for each iteration. """ relative_fn, frames_for_this_video, other_args = args (input_folder, output_folder_base, every_n_frames, overwrite, verbose, quality, max_width, allow_empty_videos) = other_args return _video_to_frames_for_folder(relative_fn, input_folder, output_folder_base, every_n_frames, overwrite, verbose, quality, max_width, frames_for_this_video, allow_empty_videos) def _video_to_frames_for_folder(relative_fn,input_folder,output_folder_base, every_n_frames,overwrite,verbose,quality,max_width, frames_to_extract,allow_empty_videos): """ Internal function to call video_to_frames for a single video in the context of video_folder_to_frames; makes sure the right output folder exists, then calls video_to_frames. """ input_fn_absolute = os.path.join(input_folder,relative_fn) assert os.path.isfile(input_fn_absolute),\ 'Could not find file {}'.format(input_fn_absolute) # Create the target output folder output_folder_video = os.path.join(output_folder_base,relative_fn) try: os.makedirs(output_folder_video,exist_ok=True) except Exception: output_folder_clean = clean_path(output_folder_video) print('Warning: failed to create folder {}, trying {}'.format( output_folder_video,output_folder_clean)) output_folder_video = output_folder_clean os.makedirs(output_folder_video,exist_ok=True) # Render frames # input_video_file = input_fn_absolute; output_folder = output_folder_video frame_filenames,fs = video_to_frames(input_fn_absolute, output_folder_video, overwrite=overwrite, every_n_frames=every_n_frames, verbose=verbose, quality=quality, max_width=max_width, frames_to_extract=frames_to_extract, allow_empty_videos=allow_empty_videos) return frame_filenames,fs
[docs] def video_folder_to_frames(input_folder, output_folder_base, recursive=True, overwrite=True, n_threads=1, every_n_frames=None, verbose=False, parallelization_uses_threads=True, quality=None, max_width=None, frames_to_extract=None, allow_empty_videos=False, relative_paths_to_process=None): """ For every video file in input_folder, creates a folder within output_folder_base, and renders frame of that video to images in that folder. Args: input_folder (str): folder to process output_folder_base (str): root folder for output images; subfolders will be created for each input video recursive (bool, optional): whether to recursively process videos in [input_folder] overwrite (bool, optional): whether to overwrite existing frame images n_threads (int, optional): number of concurrent workers to use; set to <= 1 to disable parallelism every_n_frames (int or float, optional): sample every Nth frame starting from the first frame; if this is None or 1, every frame is extracted. If this is a negative value, it's interpreted as a sampling rate in seconds, which is rounded to the nearest frame sampling rate. Mutually exclusive with frames_to_extract. verbose (bool, optional): enable additional debug console output parallelization_uses_threads (bool, optional): whether to use threads (True) or processes (False) for parallelization; ignored if n_threads <= 1 quality (int, optional): JPEG quality for frame output, from 0-100. Defaults to the opencv default (typically 95). max_width (int, optional): resize frames to be no wider than [max_width] frames_to_extract (int, list of int, or dict, optional): extract this specific set of frames from each video; mutually exclusive with every_n_frames. If all values are beyond the length of a video, no frames are extracted. Can also be a single int, specifying a single frame number. In the special case where frames_to_extract is [], this function still reads video frame rates and verifies that videos are readable, but no frames are extracted. Can be a dict mapping relative paths to lists of frame numbers to extract different frames from each video. allow_empty_videos (bool, optional): just print a warning if a video appears to have no frames (by default, this is an error). relative_paths_to_process (list, optional): only process the relative paths on this list Returns: tuple: a length-3 tuple containing: - list of lists of frame filenames; the Nth list of frame filenames corresponds to the Nth video - list of video frame rates; the Nth value corresponds to the Nth video - list of video filenames """ # Enumerate video files if necessary if relative_paths_to_process is None: if verbose: print('Enumerating videos in {}'.format(input_folder)) input_files_full_paths = find_videos(input_folder,recursive=recursive) if verbose: print('Found {} videos in folder {}'.format(len(input_files_full_paths),input_folder)) if len(input_files_full_paths) == 0: return [],[],[] input_files_relative_paths = [os.path.relpath(s,input_folder) for s in input_files_full_paths] else: input_files_relative_paths = relative_paths_to_process input_files_full_paths = [os.path.join(input_folder,fn) for fn in input_files_relative_paths] input_files_relative_paths = [s.replace('\\','/') for s in input_files_relative_paths] os.makedirs(output_folder_base,exist_ok=True) frame_filenames_by_video = [] fs_by_video = [] if n_threads == 1: # For each video # # input_fn_relative = input_files_relative_paths[0] for input_fn_relative in tqdm(input_files_relative_paths,desc='Video to frames'): # If frames_to_extract is a dict, get the specific frames for this video if isinstance(frames_to_extract, dict): frames_for_this_video = frames_to_extract.get(input_fn_relative, []) else: frames_for_this_video = frames_to_extract frame_filenames,fs = \ _video_to_frames_for_folder(input_fn_relative, input_folder, output_folder_base, every_n_frames, overwrite, verbose, quality, max_width, frames_for_this_video, allow_empty_videos) frame_filenames_by_video.append(frame_filenames) fs_by_video.append(fs) else: pool = None results = None try: if parallelization_uses_threads: print('Starting a worker pool with {} threads'.format(n_threads)) pool = ThreadPool(n_threads) else: print('Starting a worker pool with {} processes'.format(n_threads)) pool = Pool(n_threads) if isinstance(frames_to_extract, dict): # For the dict case, we need to extract different frames from each video. # These arguments are the same for every iteration other_args = (input_folder, output_folder_base, every_n_frames, overwrite, verbose, quality, max_width, allow_empty_videos) # The filename and list of frames to extract vary with each iteration args_for_pool = [(relative_fn, frames_to_extract.get(relative_fn, []), other_args) for relative_fn in input_files_relative_paths] results = list(tqdm(pool.imap(_video_to_frames_with_per_video_frames, args_for_pool), total=len(args_for_pool),desc='Video to frames')) else: process_video_with_options = partial(_video_to_frames_for_folder, input_folder=input_folder, output_folder_base=output_folder_base, every_n_frames=every_n_frames, overwrite=overwrite, verbose=verbose, quality=quality, max_width=max_width, frames_to_extract=frames_to_extract, allow_empty_videos=allow_empty_videos) results = list(tqdm(pool.imap(process_video_with_options, input_files_relative_paths), total=len(input_files_relative_paths),desc='Video to frames')) # ...if we need to pass different frames for each video finally: if pool is not None: pool.close() pool.join() print('Pool closed and joined for video processing') # ...try/finally frame_filenames_by_video = [x[0] for x in results] fs_by_video = [x[1] for x in results] # ...if we're working on a single thread vs. multiple workers return frame_filenames_by_video,fs_by_video,input_files_full_paths
# ...def video_folder_to_frames(...)
[docs] class FrameToVideoOptions: """ Options controlling the conversion of frame-level results to video-level results via frame_results_to_video_results() """ def __init__(self): #: One-indexed indicator of which frame-level confidence value to use to determine detection confidence #: for the whole video, i.e. "1" means "use the confidence value from the highest-confidence frame" self.nth_highest_confidence = 1 #: Should we include just a single representative frame result for each video (default), or #: every frame that was processed? self.include_all_processed_frames = False #: What to do if a file referred to in a .json results file appears not to be a #: video; can be 'error' or 'skip_with_warning' self.non_video_behavior = 'error' #: Are frame rates required? self.frame_rates_are_required = False #: Enable additional debug output self.verbose = False
[docs] def frame_results_to_video_results(input_file, output_file, options=None, video_filename_to_frame_rate=None): """ Given an MD results file produced at the *frame* level, corresponding to a directory created with video_folder_to_frames, maps those frame-level results back to the video level for use in Timelapse. Preserves everything in the input .json file other than the images. Args: input_file (str): the frame-level MD results file to convert to video-level results output_file (str): the .json file to which we should write video-level results options (FrameToVideoOptions, optional): parameters for converting frame-level results to video-level results, see FrameToVideoOptions for details video_filename_to_frame_rate (dict, optional): maps (relative) video path names to frame rates, used only to populate the output file """ if options is None: options = FrameToVideoOptions() if options.frame_rates_are_required: assert video_filename_to_frame_rate is not None, \ 'You specified that frame rates are required, but you did not ' + \ 'supply video_filename_to_frame_rate' # Load results with open(input_file,'r') as f: input_data = json.load(f) images = input_data['images'] detection_categories = input_data['detection_categories'] ## Break into videos video_to_frame_info = defaultdict(list) # im = images[0] for im in tqdm(images): fn = im['file'] video_name = os.path.dirname(fn) if not is_video_file(video_name): if options.non_video_behavior == 'error': raise ValueError('{} is not a video file'.format(video_name)) elif options.non_video_behavior == 'skip_with_warning': print('Warning: {} is not a video file'.format(video_name)) continue else: raise ValueError('Unrecognized non-video handling behavior: {}'.format( options.non_video_behavior)) # Attach video-specific fields to the output, specifically attach the frame # number to both the video and each detection. frame_number = _filename_to_frame_number(fn) im['frame_number'] = frame_number for detection in im['detections']: detection['frame_number'] = frame_number video_to_frame_info[video_name].append(im) # ...for each frame referred to in the results file print('Found {} unique videos in {} frame-level results'.format( len(video_to_frame_info),len(images))) output_images = [] ## For each video... # video_name = list(video_to_frame_info.keys())[0] for video_name in tqdm(video_to_frame_info): # Prepare the output representation for this video im_out = {} im_out['file'] = video_name if (video_filename_to_frame_rate is not None): if video_name not in video_filename_to_frame_rate: s = 'Could not determine frame rate for {}'.format(video_name) if options.frame_rates_are_required: raise ValueError(s) elif options.verbose: print('Warning: {}'.format(s)) if video_name in video_filename_to_frame_rate: im_out['frame_rate'] = video_filename_to_frame_rate[video_name] # Find all detections for this video all_detections_this_video = [] frames = video_to_frame_info[video_name] # frame = frames[0] for frame in frames: if ('detections' in frame) and (frame['detections'] is not None): all_detections_this_video.extend(frame['detections']) # Should we keep detections for all frames? if (options.include_all_processed_frames): im_out['detections'] = all_detections_this_video # ...or should we keep just a canonical detection for each category? else: canonical_detections = [] # category_id = list(detection_categories.keys())[0] for category_id in detection_categories: category_detections = [det for det in all_detections_this_video if \ det['category'] == category_id] # Find the nth-highest-confidence video to choose a confidence value if len(category_detections) >= options.nth_highest_confidence: category_detections_by_confidence = sorted(category_detections, key = lambda i: i['conf'],reverse=True) canonical_detection = category_detections_by_confidence[options.nth_highest_confidence-1] canonical_detections.append(canonical_detection) im_out['detections'] = canonical_detections # 'max_detection_conf' is no longer included in output files by default if False: im_out['max_detection_conf'] = 0 if len(canonical_detections) > 0: confidences = [d['conf'] for d in canonical_detections] im_out['max_detection_conf'] = max(confidences) # ...if we're keeping output for all frames / canonical frames output_images.append(im_out) # ...for each video output_data = input_data output_data['images'] = output_images s = json.dumps(output_data,indent=1) # Write the output file with open(output_file,'w') as f: f.write(s)
# ...def frame_results_to_video_results(...) #%% Test drivers if False: pass #%% Constants input_folder = r'G:\temp\usu-long\data' frame_folder_base = r'g:\temp\usu-long-single-frames' assert os.path.isdir(input_folder) #%% Split videos into frames frame_filenames_by_video,fs_by_video,video_filenames = \ video_folder_to_frames(input_folder, frame_folder_base, recursive=True, overwrite=True, n_threads=10, every_n_frames=None, verbose=True, parallelization_uses_threads=True, quality=None, max_width=None, frames_to_extract=150) #%% Constants for detection tests detected_frame_folder_base = r'e:\video_test\detected_frames' rendered_videos_folder_base = r'e:\video_test\rendered_videos' os.makedirs(detected_frame_folder_base,exist_ok=True) os.makedirs(rendered_videos_folder_base,exist_ok=True) results_file = r'results.json' confidence_threshold = 0.75 #%% Load detector output with open(results_file,'r') as f: detection_results = json.load(f) detections = detection_results['images'] detector_label_map = detection_results['detection_categories'] for d in detections: d['file'] = d['file'].replace('\\','/').replace('video_frames/','') #%% List image files, break into folders frame_files = path_utils.find_images(frame_folder_base,True) frame_files = [s.replace('\\','/') for s in frame_files] print('Enumerated {} total frames'.format(len(frame_files))) # Find unique folders folders = set() # fn = frame_files[0] for fn in frame_files: folders.add(os.path.dirname(fn)) folders = [s.replace('\\','/') for s in folders] print('Found {} folders for {} files'.format(len(folders),len(frame_files))) #%% Render detector frames # folder = list(folders)[0] for folder in folders: frame_files_this_folder = [fn for fn in frame_files if folder in fn] folder_relative = folder.replace((frame_folder_base + '/').replace('\\','/'),'') detection_results_this_folder = [d for d in detections if folder_relative in d['file']] print('Found {} detections in folder {}'.format(len(detection_results_this_folder),folder)) assert len(frame_files_this_folder) == len(detection_results_this_folder) rendered_frame_output_folder = os.path.join(detected_frame_folder_base,folder_relative) os.makedirs(rendered_frame_output_folder,exist_ok=True) # d = detection_results_this_folder[0] for d in tqdm(detection_results_this_folder): input_file = os.path.join(frame_folder_base,d['file']) output_file = os.path.join(detected_frame_folder_base,d['file']) os.makedirs(os.path.dirname(output_file),exist_ok=True) vis_utils.draw_bounding_boxes_on_file(input_file,output_file,d['detections'], confidence_threshold) # ...for each file in this folder # ...for each folder #%% Render output videos # folder = list(folders)[0] for folder in tqdm(folders): folder_relative = folder.replace((frame_folder_base + '/').replace('\\','/'),'') rendered_detector_output_folder = os.path.join(detected_frame_folder_base,folder_relative) assert os.path.isdir(rendered_detector_output_folder) frame_files_relative = os.listdir(rendered_detector_output_folder) frame_files_absolute = [os.path.join(rendered_detector_output_folder,s) \ for s in frame_files_relative] output_video_filename = os.path.join(rendered_videos_folder_base,folder_relative) os.makedirs(os.path.dirname(output_video_filename),exist_ok=True) original_video_filename = output_video_filename.replace( rendered_videos_folder_base,input_folder) assert os.path.isfile(original_video_filename) fs = get_video_fs(original_video_filename) frames_to_video(frame_files_absolute, fs, output_video_filename) # ...for each video