added docstrings, added some static type hints and also lint. (#141)

* added docstrings, added some static type hints and also lint.

* added doc strings and changed some internal variable names for more clarity.

* make flake8 happy

* add descriptive docstrings and type hints in waybackpy/cdx_snapshot.py

* remove useless code and add docstrings and also lint using pylint.

* remove unwarranted test

* added docstrings, lint using pylint and add a raise on 509 SC

* added docstrings and lint with pylint

* lint

* add doc strings and lint

* add docstrings and lint
This commit is contained in:
Akash Mahanty 2022-02-07 19:40:37 +05:30 committed by GitHub
parent 004ff26196
commit 97f8b96411
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 400 additions and 127 deletions

View File

@ -53,10 +53,6 @@ def test_get_response() -> None:
response = get_response(url, headers=headers) response = get_response(url, headers=headers)
assert not isinstance(response, Exception) and response.status_code == 200 assert not isinstance(response, Exception) and response.status_code == 200
url = "http/wwhfhfvhvjhmom"
with pytest.raises(WaybackError):
get_response(url, headers=headers)
def test_check_filters() -> None: def test_check_filters() -> None:
filters: List[str] = [] filters: List[str] = []

View File

@ -1,9 +1,32 @@
"""
This module interfaces the Wayback Machine's availability API.
The interface could be useful for looking up archives and finding archives
that are close to a specific date and time.
It has a class called WaybackMachineAvailabilityAPI, and the class has
methods such as:
near() for looking up archives close to a specific date and time.
oldest() for retrieving the first archive URL of the webpage.
newest() for retrieving the latest archive of an URL.
The Wayback Machine Availability response should be a valid JSON and
if it is not then an exception, InvalidJSONInAvailabilityAPIResponse is raised.
If the Availability API returned valid JSON but archive URL could not be found
in it then ArchiveNotInAvailabilityAPIResponse is raised.
"""
import json import json
import time import time
from datetime import datetime from datetime import datetime
from typing import Any, Dict, Optional from typing import Any, Dict, Optional
import requests import requests
from requests.models import Response
from .exceptions import ( from .exceptions import (
ArchiveNotInAvailabilityAPIResponse, ArchiveNotInAvailabilityAPIResponse,
@ -22,38 +45,43 @@ class WaybackMachineAvailabilityAPI(object):
def __init__( def __init__(
self, url: str, user_agent: str = DEFAULT_USER_AGENT, max_tries: int = 3 self, url: str, user_agent: str = DEFAULT_USER_AGENT, max_tries: int = 3
) -> None: ) -> None:
self.url = str(url).strip().replace(" ", "%20") self.url = str(url).strip().replace(" ", "%20")
self.user_agent = user_agent self.user_agent = user_agent
self.headers: Dict[str, str] = {"User-Agent": self.user_agent} self.headers: Dict[str, str] = {"User-Agent": self.user_agent}
self.payload = {"url": self.url} self.payload: Dict[str, str] = {"url": self.url}
self.endpoint = "https://archive.org/wayback/available" self.endpoint: str = "https://archive.org/wayback/available"
self.max_tries = max_tries self.max_tries: int = max_tries
self.tries = 0 self.tries: int = 0
self.last_api_call_unix_time = int(time.time()) self.last_api_call_unix_time: int = int(time.time())
self.api_call_time_gap = 5 self.api_call_time_gap: int = 5
self.JSON: Optional[ResponseJSON] = None self.JSON: Optional[ResponseJSON] = None
@staticmethod @staticmethod
def unix_timestamp_to_wayback_timestamp(unix_timestamp: int) -> str: def unix_timestamp_to_wayback_timestamp(unix_timestamp: int) -> str:
""" """
Converts Unix time to wayback Machine timestamp. Converts Unix time to wayback Machine timestamp and the Wayback Machine
timestamp format is yyyyMMddhhmmss.
""" """
return datetime.utcfromtimestamp(int(unix_timestamp)).strftime("%Y%m%d%H%M%S") return datetime.utcfromtimestamp(int(unix_timestamp)).strftime("%Y%m%d%H%M%S")
def __repr__(self) -> str: def __repr__(self) -> str:
""" """
Same as string representation, just return the archive URL as a string. Same as string representation, just return the archive URL as a string.
""" """
return str(self) return str(self)
def __str__(self) -> str: def __str__(self) -> str:
""" """
String representation of the class. If atleast one API call was successfully String representation of the class. If atleast one API
made then return the archive URL as a string. Else returns None. call was successfully made then return the archive URL
as a string. Else returns "".
""" """
# String must not return anything other than a string object # String should not return anything other than a string object
# So, if some asks for string repr before making the API requests # So, if a string repr is asked for before making any API requests
# just return "" # just return ""
if not self.JSON: if not self.JSON:
return "" return ""
@ -62,26 +90,36 @@ class WaybackMachineAvailabilityAPI(object):
def json(self) -> Optional[ResponseJSON]: def json(self) -> Optional[ResponseJSON]:
""" """
Makes the API call to the availability API can set the JSON response Makes the API call to the availability API and set the JSON response
to the JSON attribute of the instance and also returns the JSON attribute. to the JSON attribute of the instance and also returns the JSON
attribute.
time_diff and sleep_time make sure that you are not making too many
requests in a short interval of time; making too many requests is bad
as the Wayback Machine may reject them above a certain threshold.
The end-user can change the api_call_time_gap attribute of the instance
to increase or decrease the default time gap between two successive API
calls, but it is not recommended to decrease it.
""" """
time_diff = int(time.time()) - self.last_api_call_unix_time time_diff = int(time.time()) - self.last_api_call_unix_time
sleep_time = self.api_call_time_gap - time_diff sleep_time = self.api_call_time_gap - time_diff
if sleep_time > 0: if sleep_time > 0:
time.sleep(sleep_time) time.sleep(sleep_time)
self.response = requests.get( self.response: Response = requests.get(
self.endpoint, params=self.payload, headers=self.headers self.endpoint, params=self.payload, headers=self.headers
) )
self.last_api_call_unix_time = int(time.time()) self.last_api_call_unix_time = int(time.time())
self.tries += 1 self.tries += 1
try: try:
self.JSON = self.response.json() self.JSON = self.response.json()
except json.decoder.JSONDecodeError: except json.decoder.JSONDecodeError as json_decode_error:
raise InvalidJSONInAvailabilityAPIResponse( raise InvalidJSONInAvailabilityAPIResponse(
f"Response data:\n{self.response.text}" f"Response data:\n{self.response.text}"
) ) from json_decode_error
return self.JSON return self.JSON
@ -91,15 +129,17 @@ class WaybackMachineAvailabilityAPI(object):
If JSON attribute of the instance is None it implies that the either If JSON attribute of the instance is None it implies that the either
the the last API call failed or one was never made. the the last API call failed or one was never made.
If not JSON or if JSON but no timestamp in the JSON response then returns If not JSON or if JSON but no timestamp in the JSON response then
the maximum value for datetime object that is possible. returns the maximum value for datetime object that is possible.
If you get an URL as a response form the availability API it is guaranteed If you get an URL as a response form the availability API it is
that you can get the datetime object from the timestamp. guaranteed that you can get the datetime object from the timestamp.
""" """
if self.JSON is None or "archived_snapshots" not in self.JSON: if self.JSON is None or "archived_snapshots" not in self.JSON:
return datetime.max return datetime.max
elif (
if (
self.JSON is not None self.JSON is not None
and "archived_snapshots" in self.JSON and "archived_snapshots" in self.JSON
and self.JSON["archived_snapshots"] is not None and self.JSON["archived_snapshots"] is not None
@ -110,21 +150,23 @@ class WaybackMachineAvailabilityAPI(object):
return datetime.strptime( return datetime.strptime(
self.JSON["archived_snapshots"]["closest"]["timestamp"], "%Y%m%d%H%M%S" self.JSON["archived_snapshots"]["closest"]["timestamp"], "%Y%m%d%H%M%S"
) )
else:
raise ValueError("Could not get timestamp from result") raise ValueError("Could not get timestamp from result")
@property @property
def archive_url(self) -> str: def archive_url(self) -> str:
""" """
Reads the the JSON response data and tries to get the timestamp and returns Reads the the JSON response data and returns
the timestamp if found else returns None. the timestamp if found and if not found raises
ArchiveNotInAvailabilityAPIResponse.
""" """
archive_url = "" archive_url = ""
data = self.JSON data = self.JSON
# If the user didn't used oldest, newest or near but tries to access the # If the user didn't invoke oldest, newest or near but tries to access the
# archive_url attribute then, we assume they are fine with any archive # archive_url attribute then assume they are fine with any archive
# and invoke the oldest archive function. # and invoke the oldest method.
if not data: if not data:
self.oldest() self.oldest()
@ -137,7 +179,7 @@ class WaybackMachineAvailabilityAPI(object):
self.json() # It makes a new API call self.json() # It makes a new API call
data = self.JSON # json() updated the value of JSON attribute data = self.JSON # json() updated the value of JSON attribute
# Even if after we exhausted teh max_tries, then we give up and # If we exhausted the max_tries, then we give up and
# raise exception. # raise exception.
if not data or not data["archived_snapshots"]: if not data or not data["archived_snapshots"]:
@ -160,6 +202,7 @@ class WaybackMachineAvailabilityAPI(object):
Prepends zero before the year, month, day, hour and minute so that they Prepends zero before the year, month, day, hour and minute so that they
are conformable with the YYYYMMDDhhmmss wayback machine timestamp format. are conformable with the YYYYMMDDhhmmss wayback machine timestamp format.
""" """
return "".join( return "".join(
str(kwargs[key]).zfill(2) str(kwargs[key]).zfill(2)
for key in ["year", "month", "day", "hour", "minute"] for key in ["year", "month", "day", "hour", "minute"]
@ -167,18 +210,21 @@ class WaybackMachineAvailabilityAPI(object):
def oldest(self) -> "WaybackMachineAvailabilityAPI": def oldest(self) -> "WaybackMachineAvailabilityAPI":
""" """
Passing the year 1994 should return the oldest archive because Passes the date 1994-01-01 to near which should return the oldest archive
wayback machine was started in May, 1996 and there should be no archive because Wayback Machine was started in May, 1996 and it is assumed that
before the year 1994. there would be no archive older than January 1, 1994.
""" """
return self.near(year=1994)
return self.near(year=1994, month=1, day=1)
def newest(self) -> "WaybackMachineAvailabilityAPI": def newest(self) -> "WaybackMachineAvailabilityAPI":
""" """
Passing the current UNIX time should be sufficient to get the newest Passes the current UNIX time to near() for retrieving the newest archive
archive considering the API request-response time delay and also the from the availability API.
database lags on Wayback machine.
We assume that wayback machine can not archive the future of a webpage.
""" """
return self.near(unix_timestamp=int(time.time())) return self.near(unix_timestamp=int(time.time()))
def near( def near(
@ -191,16 +237,18 @@ class WaybackMachineAvailabilityAPI(object):
unix_timestamp: Optional[int] = None, unix_timestamp: Optional[int] = None,
) -> "WaybackMachineAvailabilityAPI": ) -> "WaybackMachineAvailabilityAPI":
""" """
The main method for this Class, oldest and newest methods are dependent on this The main method for the Class, oldest() and newest() are dependent on it.
method.
It generates the timestamp based on the input either by calling the It generates the timestamp based on the input either by calling the
unix_timestamp_to_wayback_timestamp or wayback_timestamp method with unix_timestamp_to_wayback_timestamp or wayback_timestamp method with
appropriate arguments for their respective parameters. appropriate arguments for their respective parameters.
Adds the timestamp to the payload dictionary. Adds the timestamp to the payload dictionary.
And finally invoking the json method to make the API call then returns And finally invoking the json method to make the API call then returns
the instance. the instance.
""" """
if unix_timestamp: if unix_timestamp:
timestamp = self.unix_timestamp_to_wayback_timestamp(unix_timestamp) timestamp = self.unix_timestamp_to_wayback_timestamp(unix_timestamp)
else: else:

View File

@ -1,3 +1,14 @@
"""
This module interfaces the Wayback Machine's CDX server API.
The module has WaybackMachineCDXServerAPI which should be used by the users of
this module to consume the CDX server API.
WaybackMachineCDXServerAPI has a snapshot method that yields the snapshots, and
the snapshots are yielded as instances of the CDXSnapshot class.
"""
from typing import Dict, Generator, List, Optional, cast from typing import Dict, Generator, List, Optional, cast
from .cdx_snapshot import CDXSnapshot from .cdx_snapshot import CDXSnapshot
@ -16,6 +27,11 @@ from .utils import DEFAULT_USER_AGENT
class WaybackMachineCDXServerAPI(object): class WaybackMachineCDXServerAPI(object):
""" """
Class that interfaces the CDX server API of the Wayback Machine. Class that interfaces the CDX server API of the Wayback Machine.
snapshot() returns a generator that can be iterated upon by the end-user,
the generator returns the snapshots/entries as instances of CDXSnapshot to
make the usage easy, just use '.' to get any attribute as the attributes are
accessible via a dot ".".
""" """
# start_timestamp: from, can not use from as it's a keyword # start_timestamp: from, can not use from as it's a keyword
@ -53,9 +69,35 @@ class WaybackMachineCDXServerAPI(object):
def cdx_api_manager( def cdx_api_manager(
self, payload: Dict[str, str], headers: Dict[str, str], use_page: bool = False self, payload: Dict[str, str], headers: Dict[str, str], use_page: bool = False
) -> Generator[str, None, None]: ) -> Generator[str, None, None]:
"""
Manages the API calls for the instance, it automatically selects the best
parameters by looking at the query of the end-user. For bigger queries
automatically use the CDX pagination API and for smaller queries use the
normal API.
CDX Server API is a complex API and to make it easy for the end user to
consume it the CDX manager(this method) handles the selection of the
API output, whether to use the pagination API or not.
For doing large/bulk queries, the use of the Pagination API is
recommended by the Wayback Machine authors. And it determines if the
query would be large or not by using the showNumPages=true parameter,
this tells the number of pages of CDX DATA that the pagination API
will return.
If the number of pages is less than 2 we use the normal non-pagination
API as the pagination API is known to lag and for big queries it should
not matter but for queries where the number of pages are less this
method chooses accuracy over the pagination API.
"""
# number of pages that will returned by the pagination API.
# get_total_pages adds the showNumPages=true param to pagination API
# requests.
# This is a special query that will return a single number indicating
# the number of pages.
total_pages = get_total_pages(self.url, self.user_agent) total_pages = get_total_pages(self.url, self.user_agent)
# If we have fewer than two pages of archives then we care more about accuracy,
# as the pagination API sometimes lags.
if use_page is True and total_pages >= 2: if use_page is True and total_pages >= 2:
blank_pages = 0 blank_pages = 0
for i in range(total_pages): for i in range(total_pages):
@ -78,11 +120,11 @@ class WaybackMachineCDXServerAPI(object):
else: else:
payload["showResumeKey"] = "true" payload["showResumeKey"] = "true"
payload["limit"] = str(self.limit) payload["limit"] = str(self.limit)
resumeKey = None resume_key = None
more = True more = True
while more: while more:
if resumeKey: if resume_key:
payload["resumeKey"] = resumeKey payload["resumeKey"] = resume_key
url = full_url(self.endpoint, params=payload) url = full_url(self.endpoint, params=payload)
res = get_response(url, headers=headers) res = get_response(url, headers=headers)
@ -102,13 +144,16 @@ class WaybackMachineCDXServerAPI(object):
if len(second_last_line) == 0: if len(second_last_line) == 0:
resumeKey = lines[-1].strip() resume_key = lines[-1].strip()
text = text.replace(resumeKey, "", 1).strip() text = text.replace(resume_key, "", 1).strip()
more = True more = True
yield text yield text
def add_payload(self, payload: Dict[str, str]) -> None: def add_payload(self, payload: Dict[str, str]) -> None:
"""
Adds the payload to the payload dictionary.
"""
if self.start_timestamp: if self.start_timestamp:
payload["from"] = self.start_timestamp payload["from"] = self.start_timestamp
@ -122,17 +167,35 @@ class WaybackMachineCDXServerAPI(object):
payload["matchType"] = self.match_type payload["matchType"] = self.match_type
if self.filters and len(self.filters) > 0: if self.filters and len(self.filters) > 0:
for i, f in enumerate(self.filters): for i, _filter in enumerate(self.filters):
payload["filter" + str(i)] = f payload["filter" + str(i)] = _filter
if self.collapses and len(self.collapses) > 0: if self.collapses and len(self.collapses) > 0:
for i, f in enumerate(self.collapses): for i, collapse in enumerate(self.collapses):
payload["collapse" + str(i)] = f payload["collapse" + str(i)] = collapse
# Don't need to return anything as it's dictionary. # Don't need to return anything as it's dictionary.
payload["url"] = self.url payload["url"] = self.url
def snapshots(self) -> Generator[CDXSnapshot, None, None]: def snapshots(self) -> Generator[CDXSnapshot, None, None]:
"""
This function yields the CDX data lines as snapshots.
As it is a generator it is exhaustible; the reasons that this is
a generator and not a list are:
a) CDX server API can return millions of entries for a query and list
is not suitable for such cases.
b) Preventing memory usage issues, as told before this method may yield
millions of records for some queries and your system may not have enough
memory for such a big list. Also remember this if outputting to Jupyter
Notebooks.
The objects yielded by this method are instance of CDXSnapshot class,
you can access the attributes of the entries as the attribute of the instance
itself.
"""
payload: Dict[str, str] = {} payload: Dict[str, str] = {}
headers = {"User-Agent": self.user_agent} headers = {"User-Agent": self.user_agent}
@ -144,18 +207,25 @@ class WaybackMachineCDXServerAPI(object):
if self.collapses != []: if self.collapses != []:
self.use_page = False self.use_page = False
texts = self.cdx_api_manager(payload, headers, use_page=self.use_page) entries = self.cdx_api_manager(payload, headers, use_page=self.use_page)
for text in texts: for entry in entries:
if text.isspace() or len(text) <= 1 or not text: if entry.isspace() or len(entry) <= 1 or not entry:
continue continue
snapshot_list = text.split("\n") # each line is a snapshot aka entry of the CDX server API.
# We are able to split the page by lines because it only
# splits the lines on a single page and not all the entries
# at once, thus there should be no issues of too much memory usage.
snapshot_list = entry.split("\n")
for snapshot in snapshot_list: for snapshot in snapshot_list:
if len(snapshot) < 46: # 14 + 32 (timestamp+digest) # 14 + 32 == 46 ( timestamp + digest ), ignore the invalid entries.
# they are invalid if their length is smaller than sum of length
# of a standard wayback_timestamp and standard digest of an entry.
if len(snapshot) < 46:
continue continue
properties: Dict[str, Optional[str]] = { properties: Dict[str, Optional[str]] = {
@ -168,16 +238,16 @@ class WaybackMachineCDXServerAPI(object):
"length": None, "length": None,
} }
prop_values = snapshot.split(" ") property_value = snapshot.split(" ")
prop_values_len = len(prop_values) total_property_values = len(property_value)
properties_len = len(properties) warranted_total_property_values = len(properties)
if prop_values_len != properties_len: if total_property_values != warranted_total_property_values:
raise WaybackError( raise WaybackError(
f"Snapshot returned by Cdx API has {prop_values_len} " f"Snapshot returned by CDX API has {total_property_values} prop"
f"properties instead of expected {properties_len} properties.\n" f"erties instead of expected {warranted_total_property_values} "
f"Problematic Snapshot: {snapshot}" f"properties.\nProblematic Snapshot: {snapshot}"
) )
( (
@ -188,6 +258,6 @@ class WaybackMachineCDXServerAPI(object):
properties["statuscode"], properties["statuscode"],
properties["digest"], properties["digest"],
properties["length"], properties["length"],
) = prop_values ) = property_value
yield CDXSnapshot(cast(Dict[str, str], properties)) yield CDXSnapshot(cast(Dict[str, str], properties))

View File

@ -1,30 +1,83 @@
"""
Module that contains the CDXSnapshot class, CDX records are cast
to CDXSnapshot objects for easier access.
The CDX index format is plain text data. Each line ('record') indicates a
crawled document. And these lines are cast to CDXSnapshot.
"""
from datetime import datetime from datetime import datetime
from typing import Dict from typing import Dict
class CDXSnapshot(object): class CDXSnapshot(object):
""" """
Class for the CDX snapshot lines returned by the CDX API, Class for the CDX snapshot lines('record') returned by the CDX API,
Each valid line of the CDX API is casted to an CDXSnapshot object Each valid line of the CDX API is casted to an CDXSnapshot object
by the CDX API interface. by the CDX API interface, just use "." to access any attribute of the
CDX server API snapshot.
This provides the end-user the ease of using the data as attributes This provides the end-user the ease of using the data as attributes
of the CDXSnapshot. of the CDXSnapshot.
The string representation of the class is identical to the line returned
by the CDX server API.
Besides all the attributes of the CDX server API this class also provides
archive_url attribute, yes it is the archive url of the snapshot.
Attributes of this class and what they represent and are useful for:
urlkey: The document captured, expressed as a SURT
SURT stands for Sort-friendly URI Reordering Transform, and is a
transformation applied to URIs which makes their left-to-right
representation better match the natural hierarchy of domain names.
A URI <scheme://domain.tld/path?query> has SURT
form <scheme://(tld,domain,)/path?query>.
timestamp: The timestamp of the archive, format is yyyyMMddhhmmss and type
is string.
datetime_timestamp: The timestamp as a datetime object.
original: The original URL of the archive. If archive_url is
https://web.archive.org/web/20220113130051/https://google.com then the
original URL is https://google.com
mimetype: The document's file type. e.g. text/html
statuscode: HTTP response code for the document at the time of its crawling
digest: Base32-encoded SHA-1 checksum of the document for discriminating
with others
length: Document's volume of bytes in the WARC file
archive_url: The archive url of the snapshot, this is not returned by the
CDX server API but created by this class on init.
""" """
def __init__(self, properties: Dict[str, str]) -> None: def __init__(self, properties: Dict[str, str]) -> None:
self.urlkey = properties["urlkey"] self.urlkey: str = properties["urlkey"]
self.timestamp = properties["timestamp"] self.timestamp: str = properties["timestamp"]
self.datetime_timestamp = datetime.strptime(self.timestamp, "%Y%m%d%H%M%S") self.datetime_timestamp: datetime = datetime.strptime(
self.original = properties["original"] self.timestamp, "%Y%m%d%H%M%S"
self.mimetype = properties["mimetype"] )
self.statuscode = properties["statuscode"] self.original: str = properties["original"]
self.digest = properties["digest"] self.mimetype: str = properties["mimetype"]
self.length = properties["length"] self.statuscode: str = properties["statuscode"]
self.archive_url = ( self.digest: str = properties["digest"]
self.length: str = properties["length"]
self.archive_url: str = (
f"https://web.archive.org/web/{self.timestamp}/{self.original}" f"https://web.archive.org/web/{self.timestamp}/{self.original}"
) )
def __str__(self) -> str: def __str__(self) -> str:
"""
The string representation is same as the line returned by the
CDX server API for the snapshot.
"""
return ( return (
f"{self.urlkey} {self.timestamp} {self.original} " f"{self.urlkey} {self.timestamp} {self.original} "
f"{self.mimetype} {self.statuscode} {self.digest} {self.length}" f"{self.mimetype} {self.statuscode} {self.digest} {self.length}"

View File

@ -1,3 +1,10 @@
"""
Utility functions required for accessing the CDX server API.
These are here in this module so that we don't make any module too
big.
"""
import re import re
from typing import Any, Dict, List, Optional, Union from typing import Any, Dict, List, Optional, Union
from urllib.parse import quote from urllib.parse import quote
@ -11,28 +18,44 @@ from .utils import DEFAULT_USER_AGENT
def get_total_pages(url: str, user_agent: str = DEFAULT_USER_AGENT) -> int: def get_total_pages(url: str, user_agent: str = DEFAULT_USER_AGENT) -> int:
"""
When using the pagination API, adding showNumPages=true to the request
URL makes the CDX server return an integer which is the number of pages
of CDX data available for us to query using the pagination API.
"""
endpoint = "https://web.archive.org/cdx/search/cdx?" endpoint = "https://web.archive.org/cdx/search/cdx?"
payload = {"showNumPages": "true", "url": str(url)} payload = {"showNumPages": "true", "url": str(url)}
headers = {"User-Agent": user_agent} headers = {"User-Agent": user_agent}
request_url = full_url(endpoint, params=payload) request_url = full_url(endpoint, params=payload)
response = get_response(request_url, headers=headers) response = get_response(request_url, headers=headers)
if isinstance(response, requests.Response): if isinstance(response, requests.Response):
return int(response.text.strip()) return int(response.text.strip())
else: raise response
raise response
def full_url(endpoint: str, params: Dict[str, Any]) -> str: def full_url(endpoint: str, params: Dict[str, Any]) -> str:
"""
As the function's name implies, it returns the full URL.
But why do we need a function for generating the full URL?
The CDX server can support multiple arguments for parameters
such as filter and collapse and this function adds them without
overwriting earlier added arguments.
"""
if not params: if not params:
return endpoint return endpoint
full_url = endpoint if endpoint.endswith("?") else (endpoint + "?") _full_url = endpoint if endpoint.endswith("?") else (endpoint + "?")
for key, val in params.items(): for key, val in params.items():
key = "filter" if key.startswith("filter") else key key = "filter" if key.startswith("filter") else key
key = "collapse" if key.startswith("collapse") else key key = "collapse" if key.startswith("collapse") else key
amp = "" if full_url.endswith("?") else "&" amp = "" if _full_url.endswith("?") else "&"
val = quote(str(val), safe="") val = quote(str(val), safe="")
full_url += f"{amp}{key}={val}" _full_url += f"{amp}{key}={val}"
return full_url
return _full_url
def get_response( def get_response(
@ -40,29 +63,31 @@ def get_response(
headers: Optional[Dict[str, str]] = None, headers: Optional[Dict[str, str]] = None,
retries: int = 5, retries: int = 5,
backoff_factor: float = 0.5, backoff_factor: float = 0.5,
# no_raise_on_redirects=False,
) -> Union[requests.Response, Exception]: ) -> Union[requests.Response, Exception]:
"""
Make get request to the CDX server and return the response.
"""
session = requests.Session() session = requests.Session()
retries_ = Retry( retries_ = Retry(
total=retries, total=retries,
backoff_factor=backoff_factor, backoff_factor=backoff_factor,
status_forcelist=[500, 502, 503, 504], status_forcelist=[500, 502, 503, 504],
) )
session.mount("https://", HTTPAdapter(max_retries=retries_))
try: session.mount("https://", HTTPAdapter(max_retries=retries_))
response = session.get(url, headers=headers) response = session.get(url, headers=headers)
session.close() session.close()
return response return response
except Exception as e:
reason = str(e)
exc_message = f"Error while retrieving {url}.\n{reason}"
exc = WaybackError(exc_message)
exc.__cause__ = e
raise exc
def check_filters(filters: List[str]) -> None: def check_filters(filters: List[str]) -> None:
"""
Check that the filter arguments passed by the end-user are valid.
If not valid then raise WaybackError.
"""
if not isinstance(filters, list): if not isinstance(filters, list):
raise WaybackError("filters must be a list.") raise WaybackError("filters must be a list.")
@ -81,9 +106,15 @@ def check_filters(filters: List[str]) -> None:
def check_collapses(collapses: List[str]) -> bool: def check_collapses(collapses: List[str]) -> bool:
"""
Check that the collapse arguments passed by the end-user are valid.
If not valid then raise WaybackError.
"""
if not isinstance(collapses, list): if not isinstance(collapses, list):
raise WaybackError("collapses must be a list.") raise WaybackError("collapses must be a list.")
elif len(collapses) == 0:
if len(collapses) == 0:
return True return True
for collapse in collapses: for collapse in collapses:
@ -103,18 +134,26 @@ def check_collapses(collapses: List[str]) -> bool:
def check_match_type(match_type: Optional[str], url: str) -> bool: def check_match_type(match_type: Optional[str], url: str) -> bool:
"""
Check that the match_type argument passed by the end-user is valid.
If not valid then raise WaybackError.
"""
legal_match_type = ["exact", "prefix", "host", "domain"] legal_match_type = ["exact", "prefix", "host", "domain"]
if not match_type: if not match_type:
return True return True
elif "*" in url:
if "*" in url:
raise WaybackError( raise WaybackError(
"Can not use wildcard in the URL along with the match_type arguments." "Can not use wildcard in the URL along with the match_type arguments."
) )
elif match_type not in legal_match_type:
if match_type not in legal_match_type:
exc_message = ( exc_message = (
f"{match_type} is not an allowed match type.\n" f"{match_type} is not an allowed match type.\n"
"Use one from 'exact', 'prefix', 'host' or 'domain'" "Use one from 'exact', 'prefix', 'host' or 'domain'"
) )
raise WaybackError(exc_message) raise WaybackError(exc_message)
else:
return True return True

View File

@ -1,3 +1,7 @@
"""
Module that makes waybackpy a CLI tool.
"""
import json as JSON import json as JSON
import os import os
import random import random
@ -19,7 +23,10 @@ from .wrapper import Url
def echo_availability_api( def echo_availability_api(
availability_api_instance: WaybackMachineAvailabilityAPI, json: bool availability_api_instance: WaybackMachineAvailabilityAPI, json: bool
) -> None: ) -> None:
click.echo("Archive URL:") """
Output for the functions that depend on the availability API.
Near, oldest and newest are output by this method.
"""
if not availability_api_instance.archive_url: if not availability_api_instance.archive_url:
archive_url = ( archive_url = (
"NO ARCHIVE FOUND - The requested URL is probably " "NO ARCHIVE FOUND - The requested URL is probably "
@ -29,6 +36,7 @@ def echo_availability_api(
) )
else: else:
archive_url = availability_api_instance.archive_url archive_url = availability_api_instance.archive_url
click.echo("Archive URL:")
click.echo(archive_url) click.echo(archive_url)
if json: if json:
click.echo("JSON response:") click.echo("JSON response:")
@ -36,6 +44,10 @@ def echo_availability_api(
def save_urls_on_file(url_gen: Generator[str, None, None]) -> None: def save_urls_on_file(url_gen: Generator[str, None, None]) -> None:
"""
Save the output of the CDX API to a file.
Kept mainly for backwards compatibility.
"""
domain = None domain = None
sys_random = random.SystemRandom() sys_random = random.SystemRandom()
uid = "".join( uid = "".join(
@ -51,8 +63,8 @@ def save_urls_on_file(url_gen: Generator[str, None, None]) -> None:
domain = "domain-unknown" if match is None else match.group(1) domain = "domain-unknown" if match is None else match.group(1)
file_name = f"{domain}-urls-{uid}.txt" file_name = f"{domain}-urls-{uid}.txt"
file_path = os.path.join(os.getcwd(), file_name) file_path = os.path.join(os.getcwd(), file_name)
with open(file_path, "a") as f: with open(file_path, "a") as file:
f.write(f"{url}\n") file.write(f"{url}\n")
click.echo(url) click.echo(url)
@ -269,6 +281,7 @@ def main( # pylint: disable=no-value-for-parameter
""" """
if version: if version:
click.echo(f"waybackpy version {__version__}") click.echo(f"waybackpy version {__version__}")
elif show_license: elif show_license:
click.echo( click.echo(
requests.get( requests.get(
@ -277,6 +290,7 @@ def main( # pylint: disable=no-value-for-parameter
) )
elif url is None: elif url is None:
click.echo("No URL detected. Please provide an URL.", err=True) click.echo("No URL detected. Please provide an URL.", err=True)
elif ( elif (
not version not version
and not oldest and not oldest
@ -291,14 +305,17 @@ def main( # pylint: disable=no-value-for-parameter
"Use --help flag for help using waybackpy.", "Use --help flag for help using waybackpy.",
err=True, err=True,
) )
elif oldest: elif oldest:
availability_api = WaybackMachineAvailabilityAPI(url, user_agent=user_agent) availability_api = WaybackMachineAvailabilityAPI(url, user_agent=user_agent)
availability_api.oldest() availability_api.oldest()
echo_availability_api(availability_api, json) echo_availability_api(availability_api, json)
elif newest: elif newest:
availability_api = WaybackMachineAvailabilityAPI(url, user_agent=user_agent) availability_api = WaybackMachineAvailabilityAPI(url, user_agent=user_agent)
availability_api.newest() availability_api.newest()
echo_availability_api(availability_api, json) echo_availability_api(availability_api, json)
elif near: elif near:
availability_api = WaybackMachineAvailabilityAPI(url, user_agent=user_agent) availability_api = WaybackMachineAvailabilityAPI(url, user_agent=user_agent)
near_args = {} near_args = {}
@ -309,6 +326,7 @@ def main( # pylint: disable=no-value-for-parameter
near_args[key] = arg near_args[key] = arg
availability_api.near(**near_args) availability_api.near(**near_args)
echo_availability_api(availability_api, json) echo_availability_api(availability_api, json)
elif save: elif save:
save_api = WaybackMachineSaveAPI(url, user_agent=user_agent) save_api = WaybackMachineSaveAPI(url, user_agent=user_agent)
save_api.save() save_api.save()
@ -319,15 +337,17 @@ def main( # pylint: disable=no-value-for-parameter
if headers: if headers:
click.echo("Save API headers:") click.echo("Save API headers:")
click.echo(save_api.headers) click.echo(save_api.headers)
elif known_urls: elif known_urls:
wayback = Url(url, user_agent) wayback = Url(url, user_agent)
url_gen = wayback.known_urls(subdomain=subdomain) url_gen = wayback.known_urls(subdomain=subdomain)
if file: if file:
return save_urls_on_file(url_gen) return save_urls_on_file(url_gen)
else:
for url in url_gen: for url in url_gen:
click.echo(url) click.echo(url)
elif cdx: elif cdx:
filters = list(cdx_filter) filters = list(cdx_filter)
collapses = list(collapse) collapses = list(collapse)

View File

@ -1,3 +1,10 @@
"""
This module interfaces the Wayback Machine's SavePageNow (SPN) API.
The module has WaybackMachineSaveAPI class which should be used by the users of
this module to use the SavePageNow API.
"""
import re import re
import time import time
from datetime import datetime from datetime import datetime
@ -8,7 +15,7 @@ from requests.adapters import HTTPAdapter
from requests.structures import CaseInsensitiveDict from requests.structures import CaseInsensitiveDict
from urllib3.util.retry import Retry from urllib3.util.retry import Retry
from .exceptions import MaximumSaveRetriesExceeded, TooManyRequestsError from .exceptions import MaximumSaveRetriesExceeded, TooManyRequestsError, WaybackError
from .utils import DEFAULT_USER_AGENT from .utils import DEFAULT_USER_AGENT
@ -47,8 +54,8 @@ class WaybackMachineSaveAPI(object):
if self._archive_url: if self._archive_url:
return self._archive_url return self._archive_url
else:
return self.save() return self.save()
def get_save_request_headers(self) -> None: def get_save_request_headers(self) -> None:
""" """
@ -66,6 +73,7 @@ class WaybackMachineSaveAPI(object):
to be very unreliable thus if it fails first check opening to be very unreliable thus if it fails first check opening
the response URL yourself in the browser. the response URL yourself in the browser.
""" """
session = requests.Session() session = requests.Session()
retries = Retry( retries = Retry(
total=self.total_save_retries, total=self.total_save_retries,
@ -79,11 +87,24 @@ class WaybackMachineSaveAPI(object):
self.status_code = self.response.status_code self.status_code = self.response.status_code
self.response_url = self.response.url self.response_url = self.response.url
session.close() session.close()
if self.status_code == 429: if self.status_code == 429:
# why wait 5 minutes and 429?
# see https://github.com/akamhy/waybackpy/issues/97
raise TooManyRequestsError( raise TooManyRequestsError(
"Seem to be refused to request by the server. " f"Can not save '{self.url}'. "
"Save Page Now receives up to 15 URLs per minutes. " f"Save request refused by the server. "
"Wait a moment and run again." f"Save Page Now limits saving 15 URLs per minutes. "
f"Try waiting for 5 minutes and then try again."
)
# why 509?
# see https://github.com/akamhy/waybackpy/pull/99
# also https://t.co/xww4YJ0Iwc
if self.status_code == 509:
raise WaybackError(
f"Can not save '{self.url}'. You have probably reached the "
f"limit of active sessions."
) )
def archive_url_parser(self) -> Optional[str]: def archive_url_parser(self) -> Optional[str]:
@ -146,13 +167,17 @@ class WaybackMachineSaveAPI(object):
the Wayback Machine to serve cached archive if last archive was captured the Wayback Machine to serve cached archive if last archive was captured
before last 45 minutes. before last 45 minutes.
""" """
regex = r"https?://web\.archive.org/web/([0-9]{14})/http"
m = re.search(regex, str(self._archive_url))
if m is None or len(m.groups()) != 1:
raise ValueError("Could not get timestamp")
string_timestamp = m.group(1)
timestamp = datetime.strptime(string_timestamp, "%Y%m%d%H%M%S")
regex = r"https?://web\.archive.org/web/([0-9]{14})/http"
match = re.search(regex, str(self._archive_url))
if match is None or len(match.groups()) != 1:
raise ValueError(
f"Can not parse timestamp from archive URL, '{self._archive_url}'."
)
string_timestamp = match.group(1)
timestamp = datetime.strptime(string_timestamp, "%Y%m%d%H%M%S")
timestamp_unixtime = time.mktime(timestamp.timetuple()) timestamp_unixtime = time.mktime(timestamp.timetuple())
instance_birth_time_unixtime = time.mktime(self.instance_birth_time.timetuple()) instance_birth_time_unixtime = time.mktime(self.instance_birth_time.timetuple())

View File

@ -1,3 +1,7 @@
"""
Utility functions and shared variables like DEFAULT_USER_AGENT are here.
"""
import requests import requests
from . import __version__ from . import __version__
@ -8,6 +12,7 @@ DEFAULT_USER_AGENT: str = (
def latest_version_pypi(package_name: str, user_agent: str = DEFAULT_USER_AGENT) -> str: def latest_version_pypi(package_name: str, user_agent: str = DEFAULT_USER_AGENT) -> str:
"""Latest waybackpy version on PyPI."""
request_url = "https://pypi.org/pypi/" + package_name + "/json" request_url = "https://pypi.org/pypi/" + package_name + "/json"
headers = {"User-Agent": user_agent} headers = {"User-Agent": user_agent}
response = requests.get(request_url, headers=headers) response = requests.get(request_url, headers=headers)
@ -20,13 +25,14 @@ def latest_version_pypi(package_name: str, user_agent: str = DEFAULT_USER_AGENT)
and data["info"]["version"] is not None and data["info"]["version"] is not None
): ):
return str(data["info"]["version"]) return str(data["info"]["version"])
else:
raise ValueError("Could not get latest pypi version") raise ValueError("Could not get latest pypi version")
def latest_version_github( def latest_version_github(
package_name: str, user_agent: str = DEFAULT_USER_AGENT package_name: str, user_agent: str = DEFAULT_USER_AGENT
) -> str: ) -> str:
"""Latest waybackpy version on GitHub."""
request_url = ( request_url = (
"https://api.github.com/repos/akamhy/" + package_name + "/releases?per_page=1" "https://api.github.com/repos/akamhy/" + package_name + "/releases?per_page=1"
) )
@ -40,5 +46,5 @@ def latest_version_github(
and "tag_name" in data[0] and "tag_name" in data[0]
): ):
return str(data[0]["tag_name"]) return str(data[0]["tag_name"])
else:
raise ValueError("Could not get latest github version") raise ValueError("Could not get latest github version")

View File

@ -1,3 +1,9 @@
"""
This module exists because backwards compatibility matters.
Don't touch this or add any new functionality here and don't use
the Url class.
"""
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Generator, Optional from typing import Generator, Optional
@ -49,12 +55,14 @@ class Url(object):
if not isinstance(self.timestamp, datetime): if not isinstance(self.timestamp, datetime):
raise TypeError("timestamp must be a datetime") raise TypeError("timestamp must be a datetime")
elif self.timestamp == datetime.max:
if self.timestamp == datetime.max:
return td_max.days return td_max.days
else:
return (datetime.utcnow() - self.timestamp).days return (datetime.utcnow() - self.timestamp).days
def save(self) -> "Url": def save(self) -> "Url":
"""Save the URL on wayback machine."""
self.wayback_machine_save_api = WaybackMachineSaveAPI( self.wayback_machine_save_api = WaybackMachineSaveAPI(
self.url, user_agent=self.user_agent self.url, user_agent=self.user_agent
) )
@ -72,7 +80,7 @@ class Url(object):
minute: Optional[int] = None, minute: Optional[int] = None,
unix_timestamp: Optional[int] = None, unix_timestamp: Optional[int] = None,
) -> "Url": ) -> "Url":
"""Returns the archive of the URL close to a date and time."""
self.wayback_machine_availability_api.near( self.wayback_machine_availability_api.near(
year=year, year=year,
month=month, month=month,
@ -85,16 +93,19 @@ class Url(object):
return self return self
def oldest(self) -> "Url": def oldest(self) -> "Url":
"""Returns the oldest archive of the URL."""
self.wayback_machine_availability_api.oldest() self.wayback_machine_availability_api.oldest()
self.set_availability_api_attrs() self.set_availability_api_attrs()
return self return self
def newest(self) -> "Url": def newest(self) -> "Url":
"""Returns the newest archive of the URL."""
self.wayback_machine_availability_api.newest() self.wayback_machine_availability_api.newest()
self.set_availability_api_attrs() self.set_availability_api_attrs()
return self return self
def set_availability_api_attrs(self) -> None: def set_availability_api_attrs(self) -> None:
"""Set the attributes for total backwards compatibility."""
self.archive_url = self.wayback_machine_availability_api.archive_url self.archive_url = self.wayback_machine_availability_api.archive_url
self.JSON = self.wayback_machine_availability_api.JSON self.JSON = self.wayback_machine_availability_api.JSON
self.timestamp = self.wayback_machine_availability_api.timestamp() self.timestamp = self.wayback_machine_availability_api.timestamp()
@ -102,6 +113,10 @@ class Url(object):
def total_archives( def total_archives(
self, start_timestamp: Optional[str] = None, end_timestamp: Optional[str] = None self, start_timestamp: Optional[str] = None, end_timestamp: Optional[str] = None
) -> int: ) -> int:
"""
Returns an integer which indicates the total number of archives for a URL.
Useless in my opinion, only here because of backwards compatibility.
"""
cdx = WaybackMachineCDXServerAPI( cdx = WaybackMachineCDXServerAPI(
self.url, self.url,
user_agent=self.user_agent, user_agent=self.user_agent,
@ -122,6 +137,7 @@ class Url(object):
end_timestamp: Optional[str] = None, end_timestamp: Optional[str] = None,
match_type: str = "prefix", match_type: str = "prefix",
) -> Generator[str, None, None]: ) -> Generator[str, None, None]:
"""Yields known URLs for any URL."""
if subdomain: if subdomain:
match_type = "domain" match_type = "domain"
if host: if host:
@ -137,4 +153,4 @@ class Url(object):
) )
for snapshot in cdx.snapshots(): for snapshot in cdx.snapshots():
yield (snapshot.original) yield snapshot.original