* fix: CI yml name

* add: mypy configuraion

* add: type annotation to waybackpy modules

* add: type annotation to test modules

* fix: mypy command

* add: types-requests to dev deps

* fix: disable max-line-length

* fix: move pytest.ini into setup.cfg

* add: urllib3 to deps

* fix: Retry (ref: https://github.com/python/typeshed/issues/6893)

* fix: f-string

* fix: shorten long lines

* add: staticmethod decorator to no-self-use methods

* fix: str(headers)->headers_str

* fix: error message

* fix: revert "str(headers)->headers_str" and ignore assignment CaseInsensitiveDict with str

* fix: mypy error
This commit is contained in:
eggplants
2022-02-05 03:23:36 +09:00
committed by GitHub
parent 320ef30371
commit d8cabdfdb5
22 changed files with 537 additions and 364 deletions

View File

@@ -1,4 +1,6 @@
import re
from typing import Any, Dict, List, Optional, Union
from urllib.parse import quote
import requests
from requests.adapters import HTTPAdapter
@@ -8,16 +10,19 @@ from .exceptions import WaybackError
from .utils import DEFAULT_USER_AGENT
def get_total_pages(url, user_agent=DEFAULT_USER_AGENT):
def get_total_pages(url: str, user_agent: str = DEFAULT_USER_AGENT) -> int:
endpoint = "https://web.archive.org/cdx/search/cdx?"
payload = {"showNumPages": "true", "url": str(url)}
headers = {"User-Agent": user_agent}
request_url = full_url(endpoint, params=payload)
response = get_response(request_url, headers=headers)
return int(response.text.strip())
if isinstance(response, requests.Response):
return int(response.text.strip())
else:
raise response
def full_url(endpoint, params):
def full_url(endpoint: str, params: Dict[str, Any]) -> str:
if not params:
return endpoint
full_url = endpoint if endpoint.endswith("?") else (endpoint + "?")
@@ -25,28 +30,25 @@ def full_url(endpoint, params):
key = "filter" if key.startswith("filter") else key
key = "collapse" if key.startswith("collapse") else key
amp = "" if full_url.endswith("?") else "&"
full_url = (
full_url
+ amp
+ "{key}={val}".format(key=key, val=requests.utils.quote(str(val)))
)
val = quote(str(val), safe="")
full_url += f"{amp}{key}={val}"
return full_url
def get_response(
url,
headers=None,
retries=5,
backoff_factor=0.5,
no_raise_on_redirects=False,
):
url: str,
headers: Optional[Dict[str, str]] = None,
retries: int = 5,
backoff_factor: float = 0.5,
# no_raise_on_redirects=False,
) -> Union[requests.Response, Exception]:
session = requests.Session()
retries = Retry(
retries_ = Retry(
total=retries,
backoff_factor=backoff_factor,
status_forcelist=[500, 502, 503, 504],
)
session.mount("https://", HTTPAdapter(max_retries=retries))
session.mount("https://", HTTPAdapter(max_retries=retries_))
try:
response = session.get(url, headers=headers)
@@ -54,77 +56,65 @@ def get_response(
return response
except Exception as e:
reason = str(e)
exc_message = "Error while retrieving {url}.\n{reason}".format(
url=url, reason=reason
)
exc_message = f"Error while retrieving {url}.\n{reason}"
exc = WaybackError(exc_message)
exc.__cause__ = e
raise exc
def check_filters(filters):
def check_filters(filters: List[str]) -> None:
if not isinstance(filters, list):
raise WaybackError("filters must be a list.")
# [!]field:regex
for _filter in filters:
try:
match = re.search(
r"(\!?(?:urlkey|timestamp|original|mimetype|statuscode|digest|length)):"
r"(.*)",
_filter,
)
match = re.search(
r"(\!?(?:urlkey|timestamp|original|mimetype|statuscode|digest|length)):(.*)",
_filter,
)
if match is None or len(match.groups()) != 2:
match.group(1)
match.group(2)
except Exception:
exc_message = (
"Filter '{_filter}' is not following the cdx filter syntax.".format(
_filter=_filter
)
)
exc_message = f"Filter '{_filter}' is not following the cdx filter syntax."
raise WaybackError(exc_message)
def check_collapses(collapses):
def check_collapses(collapses: List[str]) -> bool:
if not isinstance(collapses, list):
raise WaybackError("collapses must be a list.")
if len(collapses) == 0:
return
elif len(collapses) == 0:
return True
for collapse in collapses:
try:
match = re.search(
r"(urlkey|timestamp|original|mimetype|statuscode|digest|length)(:?[0-9]{1,99})?",
collapse,
)
match.group(1)
if 2 == len(match.groups()):
match.group(2)
except Exception:
exc_message = "collapse argument '{collapse}' is not following the cdx collapse syntax.".format(
collapse=collapse
match = re.search(
r"(urlkey|timestamp|original|mimetype|statuscode|digest|length)"
r"(:?[0-9]{1,99})?",
collapse,
)
if match is None or len(match.groups()) != 2:
exc_message = (
f"collapse argument '{collapse}' "
"is not following the cdx collapse syntax."
)
raise WaybackError(exc_message)
return True
def check_match_type(match_type, url):
def check_match_type(match_type: Optional[str], url: str) -> bool:
legal_match_type = ["exact", "prefix", "host", "domain"]
if not match_type:
return
if "*" in url:
return True
elif "*" in url:
raise WaybackError(
"Can not use wildcard in the URL along with the match_type arguments."
)
legal_match_type = ["exact", "prefix", "host", "domain"]
if match_type not in legal_match_type:
exc_message = "{match_type} is not an allowed match type.\nUse one from 'exact', 'prefix', 'host' or 'domain'".format(
match_type=match_type
elif match_type not in legal_match_type:
exc_message = (
f"{match_type} is not an allowed match type.\n"
"Use one from 'exact', 'prefix', 'host' or 'domain'"
)
raise WaybackError(exc_message)
else:
return True