remove useless code, add docstrings, and lint using pylint.
@@ -1,3 +1,10 @@
+"""
+Utility functions required for accessing the CDX server API.
+
+These are here in this module so that we don't make any module too
+big.
+"""
+
 import re
 from typing import Any, Dict, List, Optional, Union
 from urllib.parse import quote
@@ -11,28 +18,44 @@ from .utils import DEFAULT_USER_AGENT
 
 
 def get_total_pages(url: str, user_agent: str = DEFAULT_USER_AGENT) -> int:
+    """
+    When using the pagination API, adding showNumPages=true to the request
+    URL makes the CDX server return an integer, which is the number of
+    CDX pages available for us to query.
+    """
     endpoint = "https://web.archive.org/cdx/search/cdx?"
     payload = {"showNumPages": "true", "url": str(url)}
     headers = {"User-Agent": user_agent}
     request_url = full_url(endpoint, params=payload)
     response = get_response(request_url, headers=headers)
+
     if isinstance(response, requests.Response):
         return int(response.text.strip())
-    else:
-        raise response
+    raise response
 
 
 def full_url(endpoint: str, params: Dict[str, Any]) -> str:
+    """
+    As the function's name implies, it returns the full request URL. Why do
+    we need a function for generating it? The CDX server supports multiple
+    arguments for parameters such as filter and collapse, and this function
+    appends them without overwriting the arguments added earlier.
+    """
     if not params:
         return endpoint
-    full_url = endpoint if endpoint.endswith("?") else (endpoint + "?")
+    _full_url = endpoint if endpoint.endswith("?") else (endpoint + "?")
+
     for key, val in params.items():
         key = "filter" if key.startswith("filter") else key
         key = "collapse" if key.startswith("collapse") else key
-        amp = "" if full_url.endswith("?") else "&"
+        amp = "" if _full_url.endswith("?") else "&"
         val = quote(str(val), safe="")
-        full_url += f"{amp}{key}={val}"
-    return full_url
+        _full_url += f"{amp}{key}={val}"
+
+    return _full_url
 
 
 def get_response(
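A quick sketch of the key-rewriting trick in the renamed _full_url accumulator: dictionary keys that merely start with "filter" or "collapse" (a hypothetical "filter2", for example) are all emitted as repeated filter=/collapse= query arguments instead of overwriting one another. The values are illustrative, and full_url is assumed to be in scope; the module path is not part of this diff.

params = {
    "url": "https://example.com",
    "filter": "statuscode:200",       # illustrative CDX filter values
    "filter2": "mimetype:text/html",  # "filter2" is rewritten to "filter"
}
print(full_url("https://web.archive.org/cdx/search/cdx?", params=params))
# prints (one line):
#   https://web.archive.org/cdx/search/cdx?url=https%3A%2F%2Fexample.com
#   &filter=statuscode%3A200&filter=mimetype%3Atext%2Fhtml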
@@ -40,29 +63,31 @@ def get_response(
     headers: Optional[Dict[str, str]] = None,
     retries: int = 5,
     backoff_factor: float = 0.5,
-    # no_raise_on_redirects=False,
 ) -> Union[requests.Response, Exception]:
+    """
+    Make a GET request to the CDX server and return the response.
+    """
     session = requests.Session()
 
     retries_ = Retry(
         total=retries,
         backoff_factor=backoff_factor,
         status_forcelist=[500, 502, 503, 504],
     )
-    session.mount("https://", HTTPAdapter(max_retries=retries_))
 
-    try:
-        response = session.get(url, headers=headers)
-        session.close()
-        return response
-    except Exception as e:
-        reason = str(e)
-        exc_message = f"Error while retrieving {url}.\n{reason}"
-        exc = WaybackError(exc_message)
-        exc.__cause__ = e
-        raise exc
+    session.mount("https://", HTTPAdapter(max_retries=retries_))
+    response = session.get(url, headers=headers)
+    session.close()
+    return response
 
 
 def check_filters(filters: List[str]) -> None:
+    """
+    Check that the filter arguments passed by the end-user are valid.
+    If not valid then raise WaybackError.
+    """
     if not isinstance(filters, list):
         raise WaybackError("filters must be a list.")
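With the try/except removed, get_response now relies entirely on urllib3's Retry for resilience, and connection failures propagate to the caller as requests exceptions instead of being wrapped in WaybackError. A hedged usage sketch; the URL and User-Agent below are placeholders, not values from this commit:

response = get_response(
    "https://web.archive.org/cdx/search/cdx?url=example.com&showNumPages=true",
    headers={"User-Agent": "my-tool/0.1"},
    retries=3,           # retry budget; 500/502/503/504 are retried per status_forcelist
    backoff_factor=1.0,  # exponential backoff between attempts
)
print(response.status_code, response.text.strip())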
@@ -81,9 +106,15 @@ def check_filters(filters: List[str]) -> None:
 
 
 def check_collapses(collapses: List[str]) -> bool:
+    """
+    Check that the collapse arguments passed by the end-user are valid.
+    If not valid then raise WaybackError.
+    """
     if not isinstance(collapses, list):
         raise WaybackError("collapses must be a list.")
-    elif len(collapses) == 0:
+
+    if len(collapses) == 0:
         return True
 
     for collapse in collapses:
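The actual filter/collapse validation patterns live outside this hunk, so this is only a hedged sketch of the behaviour described by the new docstrings. The filter and collapse values use the CDX API's documented syntax, and WaybackError is assumed to be imported from .exceptions as elsewhere in this module.

check_filters(["statuscode:200"])   # valid syntax, returns None silently
check_collapses(["urlkey"])         # valid, returns True
check_collapses([])                 # empty list short-circuits to True
try:
    check_collapses("urlkey")       # not a list
except WaybackError as err:
    print(err)                      # collapses must be a list.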
@@ -103,18 +134,26 @@ def check_collapses(collapses: List[str]) -> bool:
 
 
 def check_match_type(match_type: Optional[str], url: str) -> bool:
+    """
+    Check that the match_type argument passed by the end-user is valid.
+    If not valid then raise WaybackError.
+    """
     legal_match_type = ["exact", "prefix", "host", "domain"]
+
     if not match_type:
         return True
-    elif "*" in url:
+
+    if "*" in url:
         raise WaybackError(
             "Can not use wildcard in the URL along with the match_type arguments."
         )
-    elif match_type not in legal_match_type:
+
+    if match_type not in legal_match_type:
         exc_message = (
             f"{match_type} is not an allowed match type.\n"
             "Use one from 'exact', 'prefix', 'host' or 'domain'"
         )
         raise WaybackError(exc_message)
-    else:
-        return True
+
+    return True
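Flattening the elif/else ladder does not change behaviour: every branch either returns True or raises WaybackError, so the trailing else: was redundant (the sort of thing pylint's no-else-return check flags). A small sketch based only on the branches visible above:

check_match_type(None, "https://example.com")       # True: no match_type given
check_match_type("domain", "https://example.com")   # True: allowed match type
try:
    check_match_type("host", "https://example.com/*")  # wildcard plus match_type
except WaybackError as err:
    print(err)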