Now using the CDX pagination API

Akash Mahanty 2021-01-04 20:46:54 +05:30
parent 0c6107e675
commit 1882862992


@@ -9,6 +9,20 @@ from waybackpy.exceptions import WaybackError, URLError
default_user_agent = "waybackpy python package - https://github.com/akamhy/waybackpy"
+
+def _get_total_pages(url, user_agent):
+    """
+    If showNumPages is passed to the CDX API, it returns the total
+    number of pages of archives; each page holds many archives.
+    This function returns that number of pages (type int).
+    """
+    total_pages_url = (
+        "https://web.archive.org/cdx/search/cdx?url=%s&showNumPages=true" % url
+    )
+    headers = {"User-Agent": user_agent}
+    return int((_get_response(total_pages_url, headers=headers).text).strip())

def _archive_url_parser(header, url):
    """
    The wayback machine's save API doesn't
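The new helper leans on the CDX server's showNumPages flag, which replaces the normal archive listing with a bare page count. A minimal standalone sketch of the same call using the requests library (the function name and sample output below are illustrative, not part of the diff):

import requests

def get_total_pages(url, user_agent):
    # With showNumPages=true the CDX server responds with a bare
    # integer such as "3\n" instead of the usual archive rows.
    endpoint = "https://web.archive.org/cdx/search/cdx"
    params = {"url": url, "showNumPages": "true"}
    headers = {"User-Agent": user_agent}
    response = requests.get(endpoint, params=params, headers=headers)
    return int(response.text.strip())

print(get_total_pages("archive.org", "my-tool/0.1"))  # e.g. 5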
@@ -120,12 +134,12 @@ def _get_response(endpoint, params=None, headers=None):
class Url:
    """
-    waybackpy Url object, Type : <class 'waybackpy.wrapper.Url'>
+    waybackpy Url class, Type : <class 'waybackpy.wrapper.Url'>
    """

    def __init__(self, url, user_agent=default_user_agent):
        self.url = url
-        self.user_agent = user_agent
+        self.user_agent = str(user_agent)
        self._url_check()
        self._archive_url = None
        self.timestamp = None
@@ -187,7 +201,7 @@ class Url:
            return self._JSON

        endpoint = "https://archive.org/wayback/available"
-        headers = {"User-Agent": "%s" % self.user_agent}
+        headers = {"User-Agent": self.user_agent}
        payload = {"url": "%s" % self._cleaned_url()}
        response = _get_response(endpoint, params=payload, headers=headers)
        return response.json()
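The availability endpoint queried here can also be hit standalone; a rough sketch, assuming the documented archived_snapshots/closest response shape (the URL and User-Agent values are placeholders):

import requests

endpoint = "https://archive.org/wayback/available"
payload = {"url": "example.com"}
headers = {"User-Agent": "my-tool/0.1"}

data = requests.get(endpoint, params=payload, headers=headers).json()
# A hit looks roughly like:
# {"archived_snapshots": {"closest": {"url": ..., "timestamp": ..., "status": "200"}}}
closest = data.get("archived_snapshots", {}).get("closest")
if closest:
    print(closest["url"])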
@@ -250,7 +264,7 @@ class Url:
    def save(self):
        """Create a new Wayback Machine archive for this URL."""
        request_url = "https://web.archive.org/save/" + self._cleaned_url()
-        headers = {"User-Agent": "%s" % self.user_agent}
+        headers = {"User-Agent": self.user_agent}
        response = _get_response(request_url, params=None, headers=headers)
        self._archive_url = "https://" + _archive_url_parser(response.headers, self.url)
        self.timestamp = datetime.utcnow()
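For reference, save() drives the SavePageNow endpoint and then recovers the archive path from the response headers via _archive_url_parser; a hedged sketch of that recovery, assuming the path arrives in the Content-Location header (other header layouts exist, hence the None fallback):

import requests

def save_and_locate(url, user_agent="my-tool/0.1"):
    response = requests.get(
        "https://web.archive.org/save/" + url,
        headers={"User-Agent": user_agent},
    )
    # SavePageNow has historically exposed the fresh archive's path in
    # the Content-Location header; return None when it is missing.
    location = response.headers.get("Content-Location")
    return "https://web.archive.org" + location if location else None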
@@ -267,7 +281,7 @@ class Url:
        if not user_agent:
            user_agent = self.user_agent

-        headers = {"User-Agent": "%s" % self.user_agent}
+        headers = {"User-Agent": self.user_agent}
        response = _get_response(url, params=None, headers=headers)

        if not encoding:
@@ -310,7 +324,7 @@ class Url:
        )

        endpoint = "https://archive.org/wayback/available"
-        headers = {"User-Agent": "%s" % self.user_agent}
+        headers = {"User-Agent": self.user_agent}
        payload = {"url": "%s" % self._cleaned_url(), "timestamp": timestamp}
        response = _get_response(endpoint, params=payload, headers=headers)
        data = response.json()
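The timestamp sent here is the Wayback Machine's 14-digit YYYYMMDDhhmmss format, which near() assembles from the requested date parts; a small sketch of that construction (the date values are illustrative):

from datetime import datetime

timestamp = datetime(2010, 5, 4, 12, 0).strftime("%Y%m%d%H%M%S")
payload = {"url": "example.com", "timestamp": timestamp}
print(timestamp)  # 20100504120000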
@@ -356,7 +370,7 @@ class Url:
"""
return self.near()
def total_archives(self):
def total_archives(self, start_timestamp=None, end_timestamp=None):
"""
A webpage can have multiple archives on the wayback machine
If someone wants to count the total number of archives of a
@@ -366,24 +380,17 @@ class Url:
        Return type is int.
        """

-        total_pages_url = (
-            "https://web.archive.org/cdx/search/cdx?url=%s&showNumPages=true"
-            % self._cleaned_url()
-        )
-        headers = {"User-Agent": "%s" % self.user_agent}
-        total_pages = int(
-            (_get_response(total_pages_url, headers=headers).text).strip()
-        )
-        archive_count = 0
-        for i in range(total_pages):
-            page_url = "https://web.archive.org/cdx/search/cdx?url=%s&page=%s" % (
-                self._cleaned_url(),
-                str(i),
-            )
-            count = str(_get_response(page_url, headers=headers).text).count("\n")
-            archive_count = archive_count + count
-        return archive_count
+        cdx = Cdx(
+            self._cleaned_url(),
+            user_agent=self.user_agent,
+            start_timestamp=start_timestamp,
+            end_timestamp=end_timestamp,
+        )
+        i = 0
+        for _ in cdx.snapshots():
+            i += 1
+        return i
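With this change total_archives() simply counts whatever Cdx.snapshots() yields; a usage sketch after this commit, assuming the package's usual top-level Url export (the URL, User-Agent, and timestamps are illustrative):

from waybackpy import Url

target = Url("https://example.com", user_agent="my-tool/0.1")
# start/end map to the CDX from/to parameters, which accept
# 4- to 14-digit timestamps, so plain years work.
count = target.total_archives(start_timestamp="2019", end_timestamp="2021")
print(count)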
    def live_urls_picker(self, url):
        """
@@ -402,7 +409,9 @@ class Url:
        self._alive_url_list.append(url)

-    def known_urls(self, alive=False, subdomain=False):
+    def known_urls(
+        self, alive=False, subdomain=False, start_timestamp=None, end_timestamp=None
+    ):
        """
        Returns a list of URLs known to exist for the given domain name,
        because these URLs were crawled by Wayback Machine bots.
@@ -414,20 +423,23 @@ class Url:
        url_list = []

        if subdomain:
-            request_url = (
-                "https://web.archive.org/cdx/search/cdx?url=*.%s/*&output=json&fl=original&collapse=urlkey"
-                % self._cleaned_url()
-            )
+            url = "*.%s/*" % self._cleaned_url()
        else:
-            request_url = (
-                "http://web.archive.org/cdx/search/cdx?url=%s/*&output=json&fl=original&collapse=urlkey"
-                % self._cleaned_url()
-            )
+            url = "%s/*" % self._cleaned_url()

-        headers = {"User-Agent": "%s" % self.user_agent}
-        response = _get_response(request_url, params=None, headers=headers)
-        data = response.json()
-        url_list = [y[0] for y in data if y[0] != "original"]
+        cdx = Cdx(
+            url,
+            user_agent=self.user_agent,
+            start_timestamp=start_timestamp,
+            end_timestamp=end_timestamp,
+        )
+        snapshots = cdx.snapshots()
+
+        url_list = []
+        for snapshot in snapshots:
+            url_list.append(snapshot.original)
+
+        url_list = list(set(url_list))  # remove duplicates

        # Remove all dead URLs from url_list if alive=True
        if alive:
@@ -436,3 +448,88 @@ class Url:
            url_list = self._alive_url_list

        return url_list
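A usage sketch for the reworked known_urls(), combining the subdomain wildcard with the new timestamp bounds (the domain and User-Agent below are placeholders, and the import assumes the package's usual top-level export):

from waybackpy import Url

site = Url("akamhy.github.io", user_agent="my-tool/0.1")
# subdomain=True widens the CDX query to *.akamhy.github.io/*.
urls = site.known_urls(subdomain=True, start_timestamp="2020", end_timestamp="2021")
for u in urls[:10]:
    print(u)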
+
+class CdxSnapshot:
+    """
+    This class encapsulates a single CDX snapshot row; the CDX server
+    returns rows shaped like:
+
+    [["urlkey","timestamp","original","mimetype","statuscode","digest","length"],
+    ["org,archive)/", "19970126045828", "http://www.archive.org:80/", "text/html", "200", "Q4YULN754FHV2U6Q5JUT6Q2P57WEWNNY", "1415"]]
+    """
+
+    def __init__(
+        self, urlkey, timestamp, original, mimetype, statuscode, digest, length
+    ):
+        self.urlkey = urlkey  # not used by waybackpy, kept for completeness
+        self.timestamp = timestamp
+        self.original = original
+        self.mimetype = mimetype
+        self.statuscode = statuscode
+        self.digest = digest
+        self.length = length
+        self.archive_url = "https://web.archive.org/web/%s/%s" % (
+            self.timestamp,
+            self.original,
+        )
+
+    def __str__(self):
+        return self.archive_url
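A quick sketch of one snapshot built from the docstring's sample row, showing the derived archive_url (this just exercises the class defined above):

snapshot = CdxSnapshot(
    "org,archive)/", "19970126045828", "http://www.archive.org:80/",
    "text/html", "200", "Q4YULN754FHV2U6Q5JUT6Q2P57WEWNNY", "1415",
)
print(snapshot)
# https://web.archive.org/web/19970126045828/http://www.archive.org:80/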
+
+class Cdx:
+    """
+    waybackpy Cdx class, Type : <class 'waybackpy.wrapper.Cdx'>
+
+    CDX keys are:
+    urlkey
+    timestamp
+    original
+    mimetype
+    statuscode
+    digest
+    length
+    """
+
+    def __init__(
+        self,
+        url,
+        user_agent=default_user_agent,
+        start_timestamp=None,
+        end_timestamp=None,
+    ):
+        self.url = url
+        self.user_agent = str(user_agent)
+        self.start_timestamp = str(start_timestamp) if start_timestamp else None
+        self.end_timestamp = str(end_timestamp) if end_timestamp else None
+    def snapshots(self):
+        payload = {}
+        endpoint = "https://web.archive.org/cdx/search/cdx"
+        total_pages = _get_total_pages(self.url, self.user_agent)
+        headers = {"User-Agent": self.user_agent}
+
+        if self.start_timestamp:
+            payload["from"] = self.start_timestamp
+        if self.end_timestamp:
+            payload["to"] = self.end_timestamp
+        payload["url"] = self.url
+
+        for i in range(total_pages):
+            payload["page"] = str(i)
+            res = _get_response(endpoint, params=payload, headers=headers)
+            text = res.text
+            if not text or text.isspace() or len(text) <= 1:
+                break
+            snapshot_list = text.split("\n")
+            for snapshot in snapshot_list:
+                if len(snapshot) < 15:  # skip blank or truncated rows
+                    continue
+                (
+                    urlkey,
+                    timestamp,
+                    original,
+                    mimetype,
+                    statuscode,
+                    digest,
+                    length,
+                ) = snapshot.split(" ")
+                yield CdxSnapshot(
+                    urlkey, timestamp, original, mimetype, statuscode, digest, length
+                )
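Because snapshots() is a generator that fetches one CDX page per outer-loop iteration, callers can stream large result sets instead of loading everything at once; a usage sketch (Cdx is imported from wrapper.py as placed by this diff; a top-level re-export is not guaranteed):

from waybackpy.wrapper import Cdx

cdx = Cdx("example.com", start_timestamp="2015", end_timestamp="2016")
for snapshot in cdx.snapshots():
    # Each CdxSnapshot carries the parsed CDX fields plus archive_url.
    print(snapshot.timestamp, snapshot.statuscode, snapshot.archive_url)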