Source code for megadetector.utils.url_utils

"""

url_utils.py

Frequently-used functions for downloading, manipulating, or serving URLs

"""

#%% Imports and constants

import os
import re
import urllib
import urllib.request
import urllib.error
import requests
import shutil
import pytest
import socketserver
import threading
import http.server

from functools import partial
from tqdm import tqdm
from urllib.parse import urlparse
from multiprocessing.pool import ThreadPool
from multiprocessing.pool import Pool

from megadetector.utils.ct_utils import make_test_folder
from megadetector.utils.ct_utils import make_temp_folder

# Upper bound (in characters) on the combined length of the temp folder and the
# auto-generated filename that download_url() builds when no destination
# filename is supplied; longer names are truncated.
max_path_len = 255


#%% Download functions

class DownloadProgressBar:
    """
    Callable progress reporter built on the progressbar2 package.

    Instances are called with (block_num, block_size, total_size); the
    underlying progress bar is created lazily on the first call.  If the
    progressbar package can't be imported, calls are silent no-ops.

    https://stackoverflow.com/questions/37748105/how-to-use-progressbar-module-with-urlretrieve
    """

    def __init__(self):

        # The progressbar.ProgressBar instance, or None until the first
        # callback (and also None if the package is not installed)
        self.pbar = None


    def __call__(self, block_num, block_size, total_size): # noqa
        """
        Report download progress.

        Args:
            block_num (int): number of blocks transferred so far
            block_size (int): size of each block, in bytes
            total_size (int): total size of the download, in bytes
        """

        # Lazily create the bar; the import lives here so that progressbar2
        # remains an optional dependency
        if not self.pbar:
            try:
                import progressbar # type: ignore
                self.pbar = progressbar.ProgressBar(max_value=total_size)
                self.pbar.start()
            except ImportError:
                self.pbar = None
                # print("ProgressBar not available, install 'progressbar2' for visual progress.")

        # Nothing to update if the bar could not be created
        if not self.pbar:
            return

        n_bytes_so_far = block_num * block_size
        if n_bytes_so_far >= total_size:
            self.pbar.finish()
        else:
            self.pbar.update(n_bytes_so_far)
def download_url(url,
                 destination_filename=None,
                 progress_updater=None,
                 force_download=False,
                 verbose=True,
                 escape_spaces=True):
    """
    Downloads a URL to a file.  If no file is specified, creates a temporary file,
    making a best effort to avoid filename collisions.

    Prints some diagnostic information and makes sure to omit SAS tokens from printouts.

    If the download fails partway through, any partial file created by this call
    is removed, so that a later call does not mistake it for a completed download.
    A file that already existed before the call is never deleted.

    Args:
        url (str): the URL to download
        destination_filename (str, optional): the target filename; if None, will create
            a file in system temp space
        progress_updater (object or bool, optional): can be "None", "False", "True", or a
            specific callable object.  If None or False, no progress update will be
            displayed.  If True, a default progress bar will be created.
        force_download (bool, optional): download this file even if [destination_filename]
            exists.
        verbose (bool, optional): enable additional debug console output
        escape_spaces (bool, optional): replace ' ' with '%20'

    Returns:
        str: the filename to which [url] was downloaded, the same as [destination_filename]
        if [destination_filename] was not None
    """

    # Map the boolean shorthands onto "no progress bar" / "default progress bar"
    if isinstance(progress_updater,bool):
        progress_updater = DownloadProgressBar() if progress_updater else None

    # Everything after the first '?' may be a SAS token; keep it out of
    # printouts and out of generated filenames
    url_no_sas = url.split('?',1)[0]

    if destination_filename is None:

        target_folder = make_temp_folder(subfolder='url_utils',append_guid=False)

        # This does not guarantee uniqueness, hence "semi-best-effort"
        url_as_filename = re.sub(r'\W+', '', url_no_sas)

        n_folder_chars = len(target_folder)

        if (len(url_as_filename) + n_folder_chars) >= max_path_len:
            print('Warning: truncating filename target to {} characters'.format(max_path_len))
            # Always keep at least one character: a zero length would make the
            # slice below a no-op ([-0:] is the whole string), and a negative
            # length would trim the wrong end of the name.
            max_fn_len = max(1, max_path_len - (n_folder_chars + 1))
            url_as_filename = url_as_filename[-1 * max_fn_len:]

        destination_filename = os.path.join(target_folder,url_as_filename)

    # ...if the destination filename wasn't specified

    if escape_spaces:
        url = url.replace(' ','%20')

    if (not force_download) and (os.path.isfile(destination_filename)):

        if verbose:
            print('Bypassing download of already-downloaded file {}'.format(
                os.path.basename(url_no_sas)))

    else:

        if verbose:
            print('Downloading file {} to {}'.format(
                os.path.basename(url_no_sas),destination_filename),end='')

        target_dir = os.path.dirname(destination_filename)
        if len(target_dir) > 0:
            os.makedirs(target_dir,exist_ok=True)

        # Remember whether the target pre-dates this call, so we only ever clean
        # up files that this call created
        existed_before_download = os.path.isfile(destination_filename)

        try:
            urllib.request.urlretrieve(url, destination_filename, progress_updater)
        except BaseException:
            # BaseException (not Exception) so that an interrupted download
            # (e.g. ctrl-C) is also cleaned up; the error is always re-raised.
            if (not existed_before_download) and os.path.isfile(destination_filename):
                try:
                    os.remove(destination_filename)
                except OSError:
                    # Best-effort cleanup; the original error is what matters
                    pass
            raise

        assert(os.path.isfile(destination_filename))
        n_bytes = os.path.getsize(destination_filename)
        if verbose:
            print('...done, {} bytes.'.format(n_bytes))

    return destination_filename
# ...def download_url(...)
def download_relative_filename(url, output_base, verbose=False):
    """
    Download a URL into [output_base], mirroring the URL's site-relative path
    on disk.  For example:

    https://abc.com/xyz/123.txt

    ...is written to:

    output_base/xyz/123.txt

    Args:
        url (str): the URL to download
        output_base (str): the base folder to which we should download this file
        verbose (bool, optional): enable additional debug console output

    Returns:
        str: the local destination filename
    """

    url_path = urlparse(url).path

    # The path component is expected to be absolute; drop the leading '/' so
    # that os.path.join() treats it as relative to output_base
    assert url_path.startswith('/')
    local_filename = os.path.join(output_base,url_path[1:])

    return download_url(url, local_filename, verbose=verbose)
# ...def download_relative_filename(...)


def _do_parallelized_download(download_info,overwrite=False,verbose=False):
    """
    Internal worker for download parallelization: downloads a single URL and
    reports the outcome as a dict rather than raising.

    Args:
        download_info (dict): dict with keys 'url' and 'target_file'
        overwrite (bool, optional): re-download even if the target file exists
        verbose (bool, optional): enable additional debug console output

    Returns:
        dict: dict with keys 'status' ('skipped', 'success', or a string
        starting with 'error: '), 'url', and 'target_file'
    """

    source_url = download_info['url']
    local_file = download_info['target_file']

    outcome = {'status':'unknown','url':source_url,'target_file':local_file}

    # Nothing to do if the file is already here and we're not overwriting
    already_present = os.path.isfile(local_file)
    if already_present and (not overwrite):
        if verbose:
            print('Skipping existing file {}'.format(local_file))
        outcome['status'] = 'skipped'
        return outcome

    try:
        download_url(url=source_url,
                     destination_filename=local_file,
                     verbose=verbose,
                     force_download=overwrite)
    except Exception as e:
        # Report failures through the return value so one bad URL doesn't
        # abort a whole batch
        error_string = str(e)
        print('Warning: error downloading URL {}: {}'.format(
            source_url,error_string))
        outcome['status'] = 'error: {}'.format(error_string)
    else:
        outcome['status'] = 'success'

    return outcome

# ...def _do_parallelized_download(...)
def parallel_download_urls(url_to_target_file,
                           verbose=False,
                           overwrite=False,
                           n_workers=20,
                           pool_type='thread'):
    """
    Downloads a collection of URLs to local files, optionally in parallel.
    Download errors are caught per-URL and reported in the returned list,
    rather than raised.

    Args:
        url_to_target_file (dict): a dict mapping URLs to local filenames.
        verbose (bool, optional): enable additional debug console output
        overwrite (bool, optional): whether to overwrite existing local files
        n_workers (int, optional): number of concurrent workers, set to <=1 to disable
            parallelization
        pool_type (str, optional): worker type to use; should be 'thread' or 'process'

    Returns:
        list: list of dicts with keys:
            - 'url': the url this item refers to
            - 'status': 'skipped', 'success', or a string starting with 'error'
            - 'target_file': the local filename to which we downloaded (or tried to
              download) this URL
    """

    if verbose:
        print('Preparing download list')

    # One work item per URL, in the dict's iteration order
    all_download_info = [
        {'url':url, 'target_file':url_to_target_file[url]}
        for url in tqdm(url_to_target_file, disable=(not verbose))
    ]

    if verbose:
        print('Downloading {} images on {} workers'.format(
            len(all_download_info),n_workers))

    # Bind the options once so the same callable serves both code paths
    download_one = partial(_do_parallelized_download,overwrite=overwrite,verbose=verbose)

    # Serial path
    if n_workers <= 1:
        return [download_one(download_info)
                for download_info in tqdm(all_download_info, disable=(not verbose))]

    # Parallel path
    pool = None

    try:

        if pool_type == 'thread':
            pool = ThreadPool(n_workers)
        else:
            assert pool_type == 'process', 'Unsupported pool type {}'.format(pool_type)
            pool = Pool(n_workers)

        if verbose:
            print('Starting a {} pool with {} workers'.format(pool_type,n_workers))

        # imap() preserves input order, so results line up with all_download_info
        results = list(tqdm(pool.imap(download_one, all_download_info),
                            total=len(all_download_info),
                            disable=(not verbose)))

    finally:

        if pool:
            pool.close()
            pool.join()
            print('Pool closed and joined for parallel URL downloads')

    return results
# ...def parallel_download_urls(...)
@pytest.mark.skip(reason="This is not a test function")
def test_url(url,error_on_failure=True,timeout=None):
    """
    Checks whether [url] is reachable by issuing an HTTP HEAD request, and
    returns the resulting status code.

    Args:
        url (str): URL to test
        error_on_failure (bool, optional): whether to error (vs. just returning an
            error code) if accessing this URL fails
        timeout (int, optional): timeout in seconds to wait before considering this
            access attempt to be a failure; see requests.head() for precise documentation

    Returns:
        int: http status code (200 for success)

    Raises:
        ValueError: if [error_on_failure] is True and the status code is not 200
    """

    response = requests.head(url, stream=True, verify=True, timeout=timeout)
    status_code = response.status_code

    # Anything other than 200 counts as a failure
    if (status_code != 200) and error_on_failure:
        raise ValueError('Could not access {}: error {}'.format(url,status_code))

    return status_code
@pytest.mark.skip(reason="This is not a test function")
def test_urls(urls,error_on_failure=True,n_workers=1,pool_type='thread',timeout=None,verbose=False):
    """
    Checks that every URL in [urls] is available (i.e., returns status 200).  By
    default, errors if any URL is unavailable.

    Args:
        urls (list): list of URLs to test
        error_on_failure (bool, optional): whether to error (vs. just returning an
            error code) if accessing this URL fails
        n_workers (int, optional): number of concurrent workers, set to <=1 to disable
            parallelization
        pool_type (str, optional): worker type to use; should be 'thread' or 'process'
        timeout (int, optional): timeout in seconds to wait before considering this
            access attempt to be a failure; see requests.head() for precise documentation
        verbose (bool, optional): enable additional debug output

    Returns:
        list: a list of http status codes, the same length and order as [urls]

    Raises:
        ValueError: if [error_on_failure] is True and any status code is not 200
    """

    # Serial path
    if n_workers <= 1:

        status_codes = []

        for url in tqdm(urls,disable=(not verbose)):

            # NOTE: this path issues a GET request, whereas the pooled path
            # below goes through test_url(), which issues a HEAD request.
            response = requests.get(url, timeout=timeout)
            status_code = response.status_code

            if (status_code != 200) and error_on_failure:
                raise ValueError('Could not access {}: error {}'.format(url,status_code))

            status_codes.append(status_code)

        return status_codes

    # Parallel path
    pool = None

    try:

        if pool_type == 'thread':
            pool = ThreadPool(n_workers)
        else:
            assert pool_type == 'process', 'Unsupported pool type {}'.format(pool_type)
            pool = Pool(n_workers)

        if verbose:
            print('Starting a {} pool with {} workers'.format(pool_type,n_workers))

        check_one = partial(test_url,error_on_failure=error_on_failure,timeout=timeout)

        # imap() preserves input order, so codes line up with [urls]
        status_codes = list(tqdm(pool.imap(check_one, urls),
                                 total=len(urls),
                                 disable=(not verbose)))

    finally:

        if pool:
            pool.close()
            pool.join()
            print('Pool closed and joined for URL tests')

    return status_codes
# ...def test_urls(...)
def get_url_size(url,verbose=False,timeout=None):
    """
    Get the size of the file pointed to by a URL, based on the Content-Length property.  If the
    URL is not available, or the Content-Length property is not available, or the
    Content-Length property is not an integer, returns None.

    This function never raises; every failure is reported as a None return value
    (and printed, if [verbose] is True).

    Args:
        url (str): the url to test
        verbose (bool, optional): enable additional debug output
        timeout (int, optional): timeout in seconds to wait before considering this
            access attempt to be a failure; passed to urllib.request.urlopen()

    Returns:
        int: the file size in bytes, or None if it can't be retrieved
    """

    try:

        # HEAD retrieves the headers without downloading the body
        request = urllib.request.Request(url,method='HEAD')

        # Use the response as a context manager so the underlying connection is
        # always closed, including on the early-return paths below
        with urllib.request.urlopen(request, timeout=timeout) as response:

            if response.status != 200:
                if verbose:
                    print('Status {} retrieving file size for {}'.format(response.status,url))
                return None

            size_bytes_str = response.headers.get('Content-Length')

        if size_bytes_str is None:
            if verbose:
                print('No Content-Length header for {}'.format(url))
            return None

        # A non-integer header value raises ValueError, handled below
        return int(size_bytes_str)

    except Exception as e:

        # Deliberately broad: the contract is "None on any failure"
        if verbose:
            print('Error retrieving file size for {}:\n{}'.format(url,str(e)))
        return None
# ...def get_url_size(...)
def get_url_sizes(urls,n_workers=1,pool_type='thread',timeout=None,verbose=False):
    """
    Look up the file size for each URL in [urls], optionally in parallel.  The
    size is None for any URL that can't be accessed, or for which the
    Content-Length property is not set.

    Args:
        urls (list): list of URLs for which we should retrieve sizes
        n_workers (int, optional): number of concurrent workers, set to <=1 to disable
            parallelization
        pool_type (str, optional): worker type to use; should be 'thread' or 'process'
        timeout (int, optional): timeout in seconds to wait before considering this
            access attempt to be a failure; see requests.head() for precise documentation
        verbose (bool, optional): print additional debug information

    Returns:
        dict: maps urls to file sizes, which will be None for URLs for which we were unable
        to retrieve a valid size.
    """

    # Serial path
    if n_workers <= 1:

        return {url: get_url_size(url,verbose=verbose,timeout=timeout)
                for url in tqdm(urls, disable=(not verbose))}

    # Parallel path
    url_to_size = {}
    pool = None

    try:

        if pool_type == 'thread':
            pool = ThreadPool(n_workers)
        else:
            assert pool_type == 'process', 'Unsupported pool type {}'.format(pool_type)
            pool = Pool(n_workers)

        if verbose:
            print('Starting a {} pool with {} workers'.format(pool_type,n_workers))

        size_one = partial(get_url_size,verbose=verbose,timeout=timeout)

        file_sizes = list(tqdm(pool.imap(size_one, urls),
                               total=len(urls),
                               disable=(not verbose)))

        # imap() preserves input order, so sizes pair up with urls positionally
        for url,size_bytes in zip(urls,file_sizes):
            url_to_size[url] = size_bytes

    finally:

        if pool:
            pool.close()
            pool.join()
            print('Pool closed and joined for URL size checks')

    return url_to_size
#%% Singleton HTTP server
class QuietHTTPRequestHandler(http.server.SimpleHTTPRequestHandler):
    """
    A SimpleHTTPRequestHandler whose log_message() does nothing, so handled
    requests produce no console printouts.
    """

    def __init__(self, *args, directory=None, **kwargs):
        """
        Forward all arguments to SimpleHTTPRequestHandler.

        Args:
            *args: positional arguments for the base class
            directory (str, optional): passed to the base class as [directory]
            **kwargs: keyword arguments for the base class
        """

        super().__init__(*args, directory=directory, **kwargs)


    def log_message(self, format, *args): # noqa
        """
        Discard the log message instead of printing it.

        Args:
            format (str): format string (ignored)
            *args: format arguments (ignored)
        """

        return None
class SingletonHTTPServer:
    """
    HTTP server that runs on a local port, serving a particular local folder.  Runs as a
    singleton, so starting a server in a new folder closes the previous server.  I use this
    primarily to serve MD/SpeciesNet previews from manage_local_batch, which can exceed
    the 260-character filename length limitation imposed by browsers on Windows, so really the
    point here is just to remove characters from the URL.
    """

    # The active server instance, or None if no server is running
    _server = None

    # The daemon thread running _server.serve_forever(), or None
    _thread = None


    @classmethod
    def start_server(cls, directory, port=8000, host='localhost'):
        """
        Start or restart the HTTP server with a specific directory

        Args:
            directory (str): the root folder served by the server
            port (int, optional): the port on which to create the server; if 0, the
                OS picks a free port, which is reflected in the returned URL
            host (str, optional): the host on which to listen, typically
                either "localhost" (default) or "0.0.0.0"

        Returns:
            str: URL to the running host
        """

        # Stop the existing server instance if necessary
        cls.stop_server()

        # Create new server
        handler = partial(QuietHTTPRequestHandler, directory=directory)

        # Bind manually so we can enable address reuse first.  Without it,
        # restarting on the same port shortly after stopping (the whole point
        # of this class) can fail with "Address already in use" while
        # connections from the previous server are still in TIME_WAIT.
        server = socketserver.TCPServer((host, port), handler, bind_and_activate=False)
        server.allow_reuse_address = True
        try:
            server.server_bind()
            server.server_activate()
        except Exception:
            # Don't leak the socket if the bind fails (e.g. port in use)
            server.server_close()
            raise

        cls._server = server

        # Report the port we actually bound, which differs from [port] when
        # the caller asked for port 0
        port = server.server_address[1]

        # Start server in daemon thread (dies when parent process dies)
        cls._thread = threading.Thread(target=cls._server.serve_forever)
        cls._thread.daemon = True
        cls._thread.start()

        print(f"Serving {directory} at http://{host}:{port}")
        return f"http://{host}:{port}"


    @classmethod
    def stop_server(cls):
        """
        Stop the current server (if one is running)
        """

        if cls._server:

            # shutdown() blocks until serve_forever() exits, so only call it if
            # a serving thread is actually alive; otherwise it could wait forever
            if (cls._thread is not None) and cls._thread.is_alive():
                cls._server.shutdown()

            cls._server.server_close()
            cls._server = None

        if cls._thread:
            cls._thread.join(timeout=1)
            cls._thread = None


    @classmethod
    def is_running(cls):
        """
        Check whether the server is currently running.

        Returns:
            bool: True if the server is running
        """

        return (cls._server is not None) and \
               (cls._thread is not None) and \
               (cls._thread.is_alive())
# ...class SingletonHTTPServer #%% Tests # Constants for tests SMALL_FILE_URL = "https://www.google.com/images/branding/googlelogo/1x/googlelogo_color_272x92dp.png" REDIRECT_SRC_URL = "http://google.com" REDIRECT_DEST_URL = "https://www.google.com/" NON_EXISTENT_URL = "https://example.com/non_existent_page_404.html" DEFINITELY_NON_EXISTENT_DOMAIN_URL = "https://thisshouldnotexist1234567890.com/file.txt" RELATIVE_DOWNLOAD_URL = "https://raw.githubusercontent.com/agentmorris/MegaDetector/main/README.md" RELATIVE_DOWNLOAD_CONTAIN_TOKEN = 'agentmorris' RELATIVE_DOWNLOAD_NOT_CONTAIN_TOKEN = 'github'
class TestUrlUtils:
    """
    Tests for url_utils.py

    These tests access real URLs, so they require network access.  The test
    methods rely on attributes created by set_up(), which must be called before
    any of them, and tear_down() should be called afterwards.
    """

    def set_up(self):
        """
        Create a temporary directory for testing, with a 'downloads' subfolder
        that the download tests write into.
        """

        self.test_dir = make_test_folder(subfolder='url_utils_tests')
        self.download_target_dir = os.path.join(self.test_dir, 'downloads')
        os.makedirs(self.download_target_dir, exist_ok=True)


    def tear_down(self):
        """
        Remove the temporary directory (if it exists) after tests.
        """

        if os.path.exists(self.test_dir):
            shutil.rmtree(self.test_dir)


    def test_download_url_to_specified_file(self):
        """
        Test download_url with a specified destination filename.
        """

        dest_filename = os.path.join(self.download_target_dir, "downloaded_google_logo.png")
        returned_filename = download_url(SMALL_FILE_URL,
                                         destination_filename=dest_filename,
                                         verbose=False)
        assert returned_filename == dest_filename
        assert os.path.exists(dest_filename)
        assert os.path.getsize(dest_filename) > 1000


    def test_download_url_to_temp_file(self):
        """
        Test download_url when destination_filename is None.
        """

        returned_filename = download_url(SMALL_FILE_URL,
                                         destination_filename=None,
                                         verbose=False)
        assert os.path.exists(returned_filename)
        assert os.path.getsize(returned_filename) > 1000


    def test_download_url_non_existent(self):
        """
        Test download_url with a non-existent URL.
        """

        dest_filename = os.path.join(self.download_target_dir, "non_existent.html")

        # A missing page on a reachable host should raise HTTPError.  The
        # AssertionError raised inside the try block is not caught by the
        # except clause, so it propagates if the download unexpectedly succeeds.
        try:
            download_url(NON_EXISTENT_URL, destination_filename=dest_filename, verbose=False)
            raise AssertionError("urllib.error.HTTPError not raised for 404")
        except urllib.error.HTTPError:
            pass

        # An unresolvable domain should fail at connection time
        try:
            download_url(DEFINITELY_NON_EXISTENT_DOMAIN_URL,
                         destination_filename=dest_filename,
                         verbose=False)
            raise AssertionError(
                "urllib.error.URLError or requests.exceptions.ConnectionError not raised for DNS failure")
        except urllib.error.URLError:
            pass
        except requests.exceptions.ConnectionError:
            pass


    def test_download_url_force_download(self):
        """
        Test the force_download parameter of download_url.
        """

        dest_filename = os.path.join(self.download_target_dir, "force_test.png")

        # Initial download
        download_url(SMALL_FILE_URL, destination_filename=dest_filename, verbose=False)
        assert os.path.exists(dest_filename)
        initial_mtime = os.path.getmtime(dest_filename)

        # Without force_download, the existing file should be left untouched
        download_url(SMALL_FILE_URL, destination_filename=dest_filename, verbose=True)
        assert os.path.getmtime(dest_filename) == initial_mtime

        # With force_download, the file is downloaded again; only its existence
        # is checked here
        download_url(SMALL_FILE_URL,
                     destination_filename=dest_filename,
                     force_download=True,
                     verbose=False)
        assert os.path.exists(dest_filename)


    def test_download_url_escape_spaces(self):
        """
        Test download_url with escape_spaces=True.

        NOTE: the URL used here contains no spaces, so this only verifies that
        the option doesn't break a normal download.
        """

        dest_filename = os.path.join(self.download_target_dir, "escape_test.png")
        download_url(SMALL_FILE_URL,
                     destination_filename=dest_filename,
                     escape_spaces=True,
                     verbose=False)
        assert os.path.exists(dest_filename)


    def test_download_relative_filename(self):
        """
        Test download_relative_filename.
        """

        output_base = os.path.join(self.download_target_dir, "relative_dl")
        returned_filename = download_relative_filename(RELATIVE_DOWNLOAD_URL,
                                                       output_base,
                                                       verbose=False)

        # The URL's path should be reflected in the local filename, but its
        # hostname should not
        assert RELATIVE_DOWNLOAD_CONTAIN_TOKEN in returned_filename
        assert RELATIVE_DOWNLOAD_NOT_CONTAIN_TOKEN not in returned_filename
        assert os.path.exists(returned_filename)
        assert os.path.getsize(returned_filename) > 100


    def test_parallel_download_urls(self):
        """
        Test parallel_download_urls (with n_workers=1 for simplicity).
        """

        url1_target = os.path.join(self.download_target_dir, "parallel_dl_1.png")
        url2_target = os.path.join(self.download_target_dir, "parallel_dl_2_nonexistent.html")

        url_to_target_file = {
            SMALL_FILE_URL: url1_target,
            NON_EXISTENT_URL: url2_target
        }

        results = parallel_download_urls(url_to_target_file, n_workers=1, verbose=False)
        assert len(results) == 2

        # Index results by URL so the checks don't depend on result order
        status_map = {res['url']: res for res in results}

        assert status_map[SMALL_FILE_URL]['status'] == 'success'
        assert status_map[SMALL_FILE_URL]['target_file'] == url1_target
        assert os.path.exists(url1_target)

        # The failed download should be reported, not raised, and should not
        # leave a file behind
        assert status_map[NON_EXISTENT_URL]['status'].startswith('error: HTTP Error 404')
        assert status_map[NON_EXISTENT_URL]['target_file'] == url2_target
        assert not os.path.exists(url2_target)

        # Make sure the file exists before testing the skip/overwrite behavior
        if not os.path.exists(url1_target):
            download_url(SMALL_FILE_URL, url1_target, verbose=False)

        results_skip = parallel_download_urls({SMALL_FILE_URL: url1_target},
                                              n_workers=1,
                                              overwrite=False,
                                              verbose=True)
        assert results_skip[0]['status'] == 'skipped'

        results_overwrite = parallel_download_urls({SMALL_FILE_URL: url1_target},
                                                   n_workers=1,
                                                   overwrite=True,
                                                   verbose=False)
        assert results_overwrite[0]['status'] == 'success'


    def test_test_url_and_test_urls(self):
        """
        Test test_url and test_urls functions.
        """

        # Single-URL checks
        assert test_url(SMALL_FILE_URL, error_on_failure=False, timeout=10) == 200
        assert test_url(REDIRECT_SRC_URL, error_on_failure=False, timeout=10) in (200,301)

        status_non_existent = test_url(NON_EXISTENT_URL, error_on_failure=False, timeout=5)
        assert status_non_existent == 404

        # With error_on_failure=True, a 404 should raise ValueError
        try:
            test_url(NON_EXISTENT_URL, error_on_failure=True, timeout=5)
            raise AssertionError("ValueError not raised for NON_EXISTENT_URL")
        except ValueError:
            pass

        # An unresolvable domain should fail at connection time
        try:
            test_url(DEFINITELY_NON_EXISTENT_DOMAIN_URL, error_on_failure=True, timeout=5)
            raise AssertionError("requests.exceptions.ConnectionError or urllib.error.URLError not raised")
        except requests.exceptions.ConnectionError:
            pass
        except urllib.error.URLError:
            pass

        # Multi-URL checks (serial path)
        urls_to_test = [SMALL_FILE_URL, NON_EXISTENT_URL]
        status_codes = test_urls(urls_to_test, error_on_failure=False, n_workers=1, timeout=10)
        assert len(status_codes) == 2
        assert status_codes[0] == 200
        assert status_codes[1] == 404

        try:
            test_urls(urls_to_test, error_on_failure=True, n_workers=1, timeout=5)
            raise AssertionError("ValueError not raised for urls_to_test")
        except ValueError:
            pass

        # Unlike the test_url() check above, the redirecting URL is expected
        # to report exactly 200 here
        good_urls = [SMALL_FILE_URL, REDIRECT_SRC_URL]
        good_status_codes = test_urls(good_urls, error_on_failure=True, n_workers=1, timeout=10)
        assert good_status_codes == [200, 200]


    def test_get_url_size_and_sizes(self):
        """
        Test get_url_size and get_url_sizes functions.
        """

        size = get_url_size(SMALL_FILE_URL, timeout=10)
        assert size is not None
        assert size > 1000

        # This URL may or may not report a size, so None is tolerated
        size_dynamic = get_url_size(REDIRECT_DEST_URL, timeout=10, verbose=True)
        if size_dynamic is not None:
            assert isinstance(size_dynamic, int)

        # Failures should be reported as None, not raised
        size_non_existent = get_url_size(NON_EXISTENT_URL, timeout=5)
        assert size_non_existent is None

        size_bad_domain = get_url_size(DEFINITELY_NON_EXISTENT_DOMAIN_URL, timeout=5)
        assert size_bad_domain is None

        # The batch function should agree with the single-URL results above
        urls_for_size = [SMALL_FILE_URL, NON_EXISTENT_URL, REDIRECT_DEST_URL]
        sizes_map = get_url_sizes(urls_for_size, n_workers=1, timeout=10)
        assert SMALL_FILE_URL in sizes_map
        assert sizes_map[SMALL_FILE_URL] == size
        assert NON_EXISTENT_URL in sizes_map
        assert sizes_map[NON_EXISTENT_URL] is None
        assert REDIRECT_DEST_URL in sizes_map
        assert sizes_map[REDIRECT_DEST_URL] == size_dynamic
def _test_url_utils():
    """
    Runs every test in the TestUrlUtils class, in a fixed order, on a single
    instance.

    I generally disable this during testing because it creates irritating
    nondeterminism (because it depends on downloading stuff from the Internet),
    and this is neither a core module nor a module that changes often.
    """

    test_instance = TestUrlUtils()
    test_instance.set_up()

    tests_to_run = (
        test_instance.test_download_url_to_specified_file,
        test_instance.test_download_url_to_temp_file,
        test_instance.test_download_url_non_existent,
        test_instance.test_download_url_force_download,
        test_instance.test_download_url_escape_spaces,
        test_instance.test_download_relative_filename,
        test_instance.test_parallel_download_urls,
        test_instance.test_test_url_and_test_urls,
        test_instance.test_get_url_size_and_sizes,
    )

    # Always clean up the test folder, even if a test fails
    try:
        for run_test in tests_to_run:
            run_test()
    finally:
        test_instance.tear_down()

# from IPython import embed; embed()
# test_url_utils()