""" Utility functions required for accessing the CDX server API. These are here in this module so that we don’t make any module too long. """ import re from typing import Any, Dict, List, Optional, Union from urllib.parse import quote import requests from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry from .exceptions import WaybackError from .utils import DEFAULT_USER_AGENT def get_total_pages(url: str, user_agent: str = DEFAULT_USER_AGENT) -> int: """ When using the pagination use adding showNumPages=true to the request URL makes the CDX server return an integer which is the number of pages of CDX pages available for us to query using the pagination API. """ endpoint = "https://web.archive.org/cdx/search/cdx?" payload = {"showNumPages": "true", "url": str(url)} headers = {"User-Agent": user_agent} request_url = full_url(endpoint, params=payload) response = get_response(request_url, headers=headers) if isinstance(response, requests.Response): return int(response.text.strip()) raise response def full_url(endpoint: str, params: Dict[str, Any]) -> str: """ As the function's name already implies that it returns full URL, but why we need a function for generating full URL? The CDX server can support multiple arguments for parameters such as filter and collapse and this function adds them without overwriting earlier added arguments. """ if not params: return endpoint _full_url = endpoint if endpoint.endswith("?") else (endpoint + "?") for key, val in params.items(): key = "filter" if key.startswith("filter") else key key = "collapse" if key.startswith("collapse") else key amp = "" if _full_url.endswith("?") else "&" val = quote(str(val), safe="") _full_url += f"{amp}{key}={val}" return _full_url def get_response( url: str, headers: Optional[Dict[str, str]] = None, retries: int = 5, backoff_factor: float = 0.5, ) -> Union[requests.Response, Exception]: """ Makes get request to the CDX server and returns the response. """ session = requests.Session() retries_ = Retry( total=retries, backoff_factor=backoff_factor, status_forcelist=[500, 502, 503, 504], ) session.mount("https://", HTTPAdapter(max_retries=retries_)) response = session.get(url, headers=headers) session.close() return response def check_filters(filters: List[str]) -> None: """ Check that the filter arguments passed by the end-user are valid. If not valid then raise WaybackError. """ if not isinstance(filters, list): raise WaybackError("filters must be a list.") # [!]field:regex for _filter in filters: match = re.search( r"(\!?(?:urlkey|timestamp|original|mimetype|statuscode|digest|length)):" r"(.*)", _filter, ) if match is None or len(match.groups()) != 2: exc_message = f"Filter '{_filter}' is not following the cdx filter syntax." raise WaybackError(exc_message) def check_collapses(collapses: List[str]) -> bool: """ Check that the collapse arguments passed by the end-user are valid. If not valid then raise WaybackError. """ if not isinstance(collapses, list): raise WaybackError("collapses must be a list.") if len(collapses) == 0: return True for collapse in collapses: match = re.search( r"(urlkey|timestamp|original|mimetype|statuscode|digest|length)" r"(:?[0-9]{1,99})?", collapse, ) if match is None or len(match.groups()) != 2: exc_message = ( f"collapse argument '{collapse}' " "is not following the cdx collapse syntax." ) raise WaybackError(exc_message) return True def check_match_type(match_type: Optional[str], url: str) -> bool: """ Check that the match_type argument passed by the end-user is valid. If not valid then raise WaybackError. """ legal_match_type = ["exact", "prefix", "host", "domain"] if not match_type: return True if "*" in url: raise WaybackError( "Can not use wildcard in the URL along with the match_type arguments." ) if match_type not in legal_match_type: exc_message = ( f"{match_type} is not an allowed match type.\n" "Use one from 'exact', 'prefix', 'host' or 'domain'" ) raise WaybackError(exc_message) return True