improve functions get_total_pages, get_response and lint check_filters, check_collapses and check_match_type
get_total_pages : default user agent is now DEFAULT_USER_AGENT and now instead of str formatting passing payload as param to full_url to generate the request url also get_response make the request instead of directly using requests.get() get_response : get_response is now not taking param as keyword arguments instead the invoker is supposed to pass the full url which may be generated by the full_url function therefore the return_full_url=False, is deprecated also. Also now closing the session via session.close() No need to check 'Exceeded 30 redirects' as save API uses a diffrent method. check_filters : Not assigning to variables the return of match groups beacause we wont be using them and the linter picks these unused assignments. check_collapses : Same reason as for check_filters but also removed a foolish test that checks equality with objects that are guaranteed to be same. check_match_type : Updated the text that of WaybackError
This commit is contained in:
parent
d1a1cf2546
commit
9262f5da21
@ -3,16 +3,16 @@ import requests
|
|||||||
from urllib3.util.retry import Retry
|
from urllib3.util.retry import Retry
|
||||||
from requests.adapters import HTTPAdapter
|
from requests.adapters import HTTPAdapter
|
||||||
from .exceptions import WaybackError
|
from .exceptions import WaybackError
|
||||||
|
from .utils import DEFAULT_USER_AGENT
|
||||||
|
|
||||||
|
|
||||||
def get_total_pages(url, user_agent):
|
def get_total_pages(url, user_agent=DEFAULT_USER_AGENT):
|
||||||
request_url = (
|
endpoint = "https://web.archive.org/cdx/search/cdx?"
|
||||||
"https://web.archive.org/cdx/search/cdx?url={url}&showNumPages=true".format(
|
payload = {"showNumPages": "true", "url": str(url)}
|
||||||
url=url
|
|
||||||
)
|
|
||||||
)
|
|
||||||
headers = {"User-Agent": user_agent}
|
headers = {"User-Agent": user_agent}
|
||||||
return int((requests.get(request_url, headers=headers).text).strip())
|
request_url = full_url(endpoint, params=payload)
|
||||||
|
response = get_response(request_url, headers=headers)
|
||||||
|
return int(response.text.strip())
|
||||||
|
|
||||||
|
|
||||||
def full_url(endpoint, params):
|
def full_url(endpoint, params):
|
||||||
@ -32,47 +32,29 @@ def full_url(endpoint, params):
|
|||||||
|
|
||||||
|
|
||||||
def get_response(
|
def get_response(
|
||||||
endpoint,
|
url,
|
||||||
params=None,
|
|
||||||
headers=None,
|
headers=None,
|
||||||
return_full_url=False,
|
|
||||||
retries=5,
|
retries=5,
|
||||||
backoff_factor=0.5,
|
backoff_factor=0.5,
|
||||||
no_raise_on_redirects=False,
|
no_raise_on_redirects=False,
|
||||||
):
|
):
|
||||||
|
session = requests.Session()
|
||||||
s = requests.Session()
|
|
||||||
|
|
||||||
retries = Retry(
|
retries = Retry(
|
||||||
total=retries,
|
total=retries,
|
||||||
backoff_factor=backoff_factor,
|
backoff_factor=backoff_factor,
|
||||||
status_forcelist=[500, 502, 503, 504],
|
status_forcelist=[500, 502, 503, 504],
|
||||||
)
|
)
|
||||||
|
session.mount("https://", HTTPAdapter(max_retries=retries))
|
||||||
s.mount("https://", HTTPAdapter(max_retries=retries))
|
|
||||||
|
|
||||||
# The URL with parameters required for the get request
|
|
||||||
url = full_url(endpoint, params)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
response = session.get(url, headers=headers)
|
||||||
if not return_full_url:
|
session.close()
|
||||||
return s.get(url, headers=headers)
|
return response
|
||||||
|
|
||||||
return (url, s.get(url, headers=headers))
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|
||||||
reason = str(e)
|
reason = str(e)
|
||||||
|
|
||||||
if no_raise_on_redirects:
|
|
||||||
if "Exceeded 30 redirects" in reason:
|
|
||||||
return
|
|
||||||
|
|
||||||
exc_message = "Error while retrieving {url}.\n{reason}".format(
|
exc_message = "Error while retrieving {url}.\n{reason}".format(
|
||||||
url=url, reason=reason
|
url=url, reason=reason
|
||||||
)
|
)
|
||||||
|
|
||||||
exc = WaybackError(exc_message)
|
exc = WaybackError(exc_message)
|
||||||
exc.__cause__ = e
|
exc.__cause__ = e
|
||||||
raise exc
|
raise exc
|
||||||
@ -91,8 +73,8 @@ def check_filters(filters):
|
|||||||
_filter,
|
_filter,
|
||||||
)
|
)
|
||||||
|
|
||||||
key = match.group(1)
|
match.group(1)
|
||||||
val = match.group(2)
|
match.group(2)
|
||||||
|
|
||||||
except Exception:
|
except Exception:
|
||||||
|
|
||||||
@ -118,19 +100,9 @@ def check_collapses(collapses):
|
|||||||
r"(urlkey|timestamp|original|mimetype|statuscode|digest|length)(:?[0-9]{1,99})?",
|
r"(urlkey|timestamp|original|mimetype|statuscode|digest|length)(:?[0-9]{1,99})?",
|
||||||
collapse,
|
collapse,
|
||||||
)
|
)
|
||||||
field = match.group(1)
|
match.group(1)
|
||||||
|
|
||||||
N = None
|
|
||||||
if 2 == len(match.groups()):
|
if 2 == len(match.groups()):
|
||||||
N = match.group(2)
|
match.group(2)
|
||||||
|
|
||||||
if N:
|
|
||||||
if not (field + N == collapse):
|
|
||||||
raise Exception
|
|
||||||
else:
|
|
||||||
if not (field == collapse):
|
|
||||||
raise Exception
|
|
||||||
|
|
||||||
except Exception:
|
except Exception:
|
||||||
exc_message = "collapse argument '{collapse}' is not following the cdx collapse syntax.".format(
|
exc_message = "collapse argument '{collapse}' is not following the cdx collapse syntax.".format(
|
||||||
collapse=collapse
|
collapse=collapse
|
||||||
@ -143,7 +115,9 @@ def check_match_type(match_type, url):
|
|||||||
return
|
return
|
||||||
|
|
||||||
if "*" in url:
|
if "*" in url:
|
||||||
raise WaybackError("Can not use wildcard with match_type argument")
|
raise WaybackError(
|
||||||
|
"Can not use wildcard in the URL along with the match_type arguments."
|
||||||
|
)
|
||||||
|
|
||||||
legal_match_type = ["exact", "prefix", "host", "domain"]
|
legal_match_type = ["exact", "prefix", "host", "domain"]
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user