diff --git a/tests/test_cli.py b/tests/test_cli.py
index 7793759..7704dc1 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 import sys
 import os
 import pytest
@@ -286,7 +285,7 @@ def test_get():
         alive=False,
         subdomain=False,
         known_urls=False,
-        get="BullShit",
+        get="foobar",
     )
     reply = cli.args_handler(args)
     assert "get the source code of the" in str(reply)
diff --git a/tests/test_wrapper.py b/tests/test_wrapper.py
index 336b3fe..15b68b1 100644
--- a/tests/test_wrapper.py
+++ b/tests/test_wrapper.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
 import sys
 import pytest
 import random
@@ -12,14 +11,15 @@ import waybackpy.wrapper as waybackpy  # noqa: E402
 user_agent = "Mozilla/5.0 (Windows NT 6.2; rv:20.0) Gecko/20121202 Firefox/20.0"
 
 
-def test_clean_url():
+def test_cleaned_url():
     """No API use"""
     test_url = " https://en.wikipedia.org/wiki/Network security "
     answer = "https://en.wikipedia.org/wiki/Network_security"
     target = waybackpy.Url(test_url, user_agent)
-    test_result = target._clean_url()
+    test_result = target._cleaned_url()
     assert answer == test_result
 
+
 def test_dunders():
     """No API use"""
     url = "https://en.wikipedia.org/wiki/Network_security"
@@ -28,19 +28,23 @@ def test_dunders():
     assert "waybackpy.Url(url=%s, user_agent=%s)" % (url, user_agent) == repr(target)
     assert "en.wikipedia.org" in str(target)
 
+
 def test_url_check():
     """No API Use"""
     broken_url = "http://wwwgooglecom/"
     with pytest.raises(Exception):
         waybackpy.Url(broken_url, user_agent)
 
+
 def test_archive_url_parser():
     """No API Use"""
     perfect_header = """
    {'Server': 'nginx/1.15.8', 'Date': 'Sat, 02 Jan 2021 09:40:25 GMT', 'Content-Type': 'text/html; charset=UTF-8', 'Transfer-Encoding': 'chunked', 'Connection': 'keep-alive', 'X-Archive-Orig-Server': 'nginx', 'X-Archive-Orig-Date': 'Sat, 02 Jan 2021 09:40:09 GMT', 'X-Archive-Orig-Transfer-Encoding': 'chunked', 'X-Archive-Orig-Connection': 'keep-alive', 'X-Archive-Orig-Vary': 'Accept-Encoding', 'X-Archive-Orig-Last-Modified': 'Fri, 01 Jan 2021 12:19:00 GMT', 'X-Archive-Orig-Strict-Transport-Security': 'max-age=31536000, max-age=0;', 'X-Archive-Guessed-Content-Type': 'text/html', 'X-Archive-Guessed-Charset': 'utf-8', 'Memento-Datetime': 'Sat, 02 Jan 2021 09:40:09 GMT', 'Link': '<https://www.scribbr.com/citing-sources/et-al/>; rel="original", <https://web.archive.org/web/timemap/link/https://www.scribbr.com/citing-sources/et-al/>; rel="timemap"; type="application/link-format", <https://web.archive.org/web/https://www.scribbr.com/citing-sources/et-al/>; rel="timegate", <https://web.archive.org/web/20200601082911/https://www.scribbr.com/citing-sources/et-al/>; rel="first memento"; datetime="Mon, 01 Jun 2020 08:29:11 GMT", <https://web.archive.org/web/20201126185327/https://www.scribbr.com/citing-sources/et-al/>; rel="prev memento"; datetime="Thu, 26 Nov 2020 18:53:27 GMT", <https://web.archive.org/web/20210102094009/https://www.scribbr.com/citing-sources/et-al/>; rel="memento"; datetime="Sat, 02 Jan 2021 09:40:09 GMT", <https://web.archive.org/web/20210102094009/https://www.scribbr.com/citing-sources/et-al/>; rel="last memento"; datetime="Sat, 02 Jan 2021 09:40:09 GMT"', 'Content-Security-Policy': "default-src 'self' 'unsafe-eval' 'unsafe-inline' data: blob: archive.org web.archive.org analytics.archive.org pragma.archivelab.org", 'X-Archive-Src': 'spn2-20210102092956-wwwb-spn20.us.archive.org-8001.warc.gz', 'Server-Timing': 'captures_list;dur=112.646325, exclusion.robots;dur=0.172010, exclusion.robots.policy;dur=0.158205, RedisCDXSource;dur=2.205932, esindex;dur=0.014647, LoadShardBlock;dur=82.205012, PetaboxLoader3.datanode;dur=70.750239, CDXLines.iter;dur=24.306278, load_resource;dur=26.520179', 'X-App-Server': 'wwwb-app200', 'X-ts': '200', 'X-location': 'All', 'X-Cache-Key': 'httpsweb.archive.org/web/20210102094009/https://www.scribbr.com/citing-sources/et-al/IN', 'X-RL': '0', 'X-Page-Cache': 'MISS', 'X-Archive-Screenname': '0', 'Content-Encoding': 'gzip'}
    """
-    archive = waybackpy._archive_url_parser(perfect_header)
+    archive = waybackpy._archive_url_parser(
"https://www.scribbr.com/citing-sources/et-al/" + ) assert "web.archive.org/web/20210102094009" in archive # The below header should result in Exception @@ -49,7 +53,9 @@ def test_archive_url_parser(): """ with pytest.raises(Exception): - waybackpy._archive_url_parser(no_archive_header) + waybackpy._archive_url_parser( + no_archive_header, "https://www.scribbr.com/citing-sources/et-al/" + ) def test_save(): @@ -173,9 +179,11 @@ def test_get_response(): def test_total_archives(): - - target = waybackpy.Url(" https://google.com ", user_agent) - assert target.total_archives() > 500000 + user_agent = ( + "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:78.0) Gecko/20100101 Firefox/78.0" + ) + target = waybackpy.Url(" https://outlook.com ", user_agent) + assert target.total_archives() > 80000 target = waybackpy.Url( " https://gaha.e4i3n.m5iai3kip6ied.cima/gahh2718gs/ahkst63t7gad8 ", user_agent diff --git a/waybackpy/cli.py b/waybackpy/cli.py index 577d158..fb7c857 100644 --- a/waybackpy/cli.py +++ b/waybackpy/cli.py @@ -1,13 +1,12 @@ -# -*- coding: utf-8 -*- -import sys import os import re -import argparse -import string +import sys import random +import string +import argparse from waybackpy.wrapper import Url -from waybackpy.__version__ import __version__ from waybackpy.exceptions import WaybackError +from waybackpy.__version__ import __version__ def _save(obj): @@ -38,7 +37,7 @@ def _json(obj): return obj.JSON -def handle_not_archived_error(e, obj): +def no_archive_handler(e, obj): m = re.search(r"archive\sfor\s\'(.*?)\'\stry", str(e)) if m: url = m.group(1) @@ -57,14 +56,14 @@ def _oldest(obj): try: return obj.oldest() except Exception as e: - return handle_not_archived_error(e, obj) + return no_archive_handler(e, obj) def _newest(obj): try: return obj.newest() except Exception as e: - return handle_not_archived_error(e, obj) + return no_archive_handler(e, obj) def _total_archives(obj): @@ -83,15 +82,15 @@ def _near(obj, args): try: return obj.near(**_near_args) except Exception as e: - return handle_not_archived_error(e, obj) + return no_archive_handler(e, obj) def _save_urls_on_file(input_list, live_url_count): m = re.search("https?://([A-Za-z_0-9.-]+).*", input_list[0]) + + domain = "domain-unknown" if m: domain = m.group(1) - else: - domain = "domain-unknown" uid = "".join( random.choice(string.ascii_lowercase + string.digits) for _ in range(6) @@ -106,44 +105,39 @@ def _save_urls_on_file(input_list, live_url_count): def _known_urls(obj, args): - """Abbreviations: - sd = subdomain - al = alive """ + Known urls for a domain. + """ + # sd = subdomain sd = False - al = False if args.subdomain: sd = True + + # al = alive + al = False if args.alive: al = True + url_list = obj.known_urls(alive=al, subdomain=sd) total_urls = len(url_list) if total_urls > 0: - text = _save_urls_on_file(url_list, total_urls) - else: - text = "No known URLs found. Please try a diffrent domain!" + return _save_urls_on_file(url_list, total_urls) - return text + return "No known URLs found. Please try a diffrent domain!" 
 
 
 def _get(obj, args):
     if args.get.lower() == "url":
         return obj.get()
-
     if args.get.lower() == "archive_url":
         return obj.get(obj.archive_url)
-
     if args.get.lower() == "oldest":
         return obj.get(obj.oldest())
-
     if args.get.lower() == "latest" or args.get.lower() == "newest":
         return obj.get(obj.newest())
-
     if args.get.lower() == "save":
         return obj.get(obj.save())
-
-
     return "Use get as \"--get 'source'\", 'source' can be one of the followings: \
        \n1) url - get the source code of the url specified using --url/-u.\
        \n2) archive_url - get the source code of the newest archive for the supplied url, alias of newest.\
@@ -167,39 +161,48 @@ def args_handler(args):
         obj = Url(args.url, args.user_agent)
 
     if args.save:
-        return _save(obj)
-    if args.archive_url:
-        return _archive_url(obj)
-    if args.json:
-        return _json(obj)
-    if args.oldest:
-        return _oldest(obj)
-    if args.newest:
-        return _newest(obj)
-    if args.known_urls:
-        return _known_urls(obj, args)
-    if args.total:
-        return _total_archives(obj)
-    if args.near:
+        output = _save(obj)
+    elif args.archive_url:
+        output = _archive_url(obj)
+    elif args.json:
+        output = _json(obj)
+    elif args.oldest:
+        output = _oldest(obj)
+    elif args.newest:
+        output = _newest(obj)
+    elif args.known_urls:
+        output = _known_urls(obj, args)
+    elif args.total:
+        output = _total_archives(obj)
+    elif args.near:
         return _near(obj, args)
-    if args.get:
-        return _get(obj, args)
-
-    return (
+    elif args.get:
+        output = _get(obj, args)
+    else:
+        output = (
         "You only specified the URL. But you also need to specify the operation."
         "\nSee 'waybackpy --help' for help using this tool."
     )
+    return output
+
+
+def add_requiredArgs(requiredArgs):
+    requiredArgs.add_argument(
+        "--url", "-u", help="URL on which Wayback machine operations would occur"
+    )
 
 
 def add_userAgentArg(userAgentArg):
     help_text = 'User agent, default user_agent is "waybackpy python package - https://github.com/akamhy/waybackpy"'
     userAgentArg.add_argument("--user_agent", "-ua", help=help_text)
 
+
 def add_saveArg(saveArg):
     saveArg.add_argument(
         "--save", "-s", action="store_true", help="Save the URL on the Wayback machine"
     )
 
+
 def add_auArg(auArg):
     auArg.add_argument(
         "--archive_url",
@@ -208,6 +211,7 @@ def add_auArg(auArg):
         help="Get the latest archive URL, alias for --newest",
     )
 
+
 def add_jsonArg(jsonArg):
     jsonArg.add_argument(
         "--json",
@@ -216,6 +220,7 @@ def add_jsonArg(jsonArg):
         help="JSON data of the availability API request",
     )
 
+
 def add_oldestArg(oldestArg):
     oldestArg.add_argument(
         "--oldest",
@@ -224,6 +229,7 @@ def add_oldestArg(oldestArg):
         help="Oldest archive for the specified URL",
     )
 
+
 def add_newestArg(newestArg):
     newestArg.add_argument(
         "--newest",
@@ -232,6 +238,7 @@ def add_newestArg(newestArg):
         help="Newest archive for the specified URL",
     )
 
+
 def add_totalArg(totalArg):
     totalArg.add_argument(
         "--total",
@@ -240,6 +247,7 @@ def add_totalArg(totalArg):
         help="Total number of archives for the specified URL",
    )
 
+
 def add_getArg(getArg):
     getArg.add_argument(
         "--get",
@@ -247,6 +255,7 @@ def add_getArg(getArg):
         help="Prints the source code of the supplied url. Use '--get help' for extended usage",
     )
 
+
 def add_knownUrlArg(knownUrlArg):
     knownUrlArg.add_argument(
         "--known_urls", "-ku", action="store_true", help="URLs known for the domain."
@@ -257,6 +266,12 @@ def add_knownUrlArg(knownUrlArg):
     knownUrlArg.add_argument("--alive", "-a", action="store_true", help=help_text)
 
 
+def add_nearArg(nearArg):
+    nearArg.add_argument(
+        "--near", "-N", action="store_true", help="Archive near specified time"
+    )
+
+
 def add_nearArgs(nearArgs):
     nearArgs.add_argument("--year", "-Y", type=int, help="Year in integer")
     nearArgs.add_argument("--month", "-M", type=int, help="Month in integer")
@@ -264,64 +279,35 @@ def add_nearArgs(nearArgs):
     nearArgs.add_argument("--hour", "-H", type=int, help="Hour in intege")
     nearArgs.add_argument("--minute", "-MIN", type=int, help="Minute in integer")
 
+
 def parse_args(argv):
     parser = argparse.ArgumentParser()
-
-    requiredArgs = parser.add_argument_group("URL argument (required)")
-    requiredArgs.add_argument(
-        "--url", "-u", help="URL on which Wayback machine operations would occur"
+    add_requiredArgs(parser.add_argument_group("URL argument (required)"))
+    add_userAgentArg(parser.add_argument_group("User Agent"))
+    add_saveArg(parser.add_argument_group("Create new archive/save URL"))
+    add_auArg(parser.add_argument_group("Get the latest Archive"))
+    add_jsonArg(parser.add_argument_group("Get the JSON data"))
+    add_oldestArg(parser.add_argument_group("Oldest archive"))
+    add_newestArg(parser.add_argument_group("Newest archive"))
+    add_totalArg(parser.add_argument_group("Total number of archives"))
+    add_getArg(parser.add_argument_group("Get source code"))
+    add_knownUrlArg(
+        parser.add_argument_group(
+            "URLs known and archived to Wayback Machine for the site."
+        )
     )
-
-    userAgentArg = parser.add_argument_group("User Agent")
-    add_userAgentArg(userAgentArg)
-
-    saveArg = parser.add_argument_group("Create new archive/save URL")
-    add_saveArg(saveArg)
-
-    auArg = parser.add_argument_group("Get the latest Archive")
-    add_auArg(auArg)
-
-    jsonArg = parser.add_argument_group("Get the JSON data")
-    add_jsonArg(jsonArg)
-
-    oldestArg = parser.add_argument_group("Oldest archive")
-    add_oldestArg(oldestArg)
-
-    newestArg = parser.add_argument_group("Newest archive")
-    add_newestArg(newestArg)
-
-    totalArg = parser.add_argument_group("Total number of archives")
-    add_totalArg(totalArg)
-
-    getArg = parser.add_argument_group("Get source code")
-    add_getArg(getArg)
-
-    knownUrlArg = parser.add_argument_group(
-        "URLs known and archived to Waybcak Machine for the site."
-    )
-    add_knownUrlArg(knownUrlArg)
-
-    nearArg = parser.add_argument_group("Archive close to time specified")
-    nearArg.add_argument(
-        "--near", "-N", action="store_true", help="Archive near specified time"
-    )
-    #The following is adding supplementary args used with near.
-    nearArgs = parser.add_argument_group("Arguments that are used only with --near")
-    add_nearArgs(nearArgs)
-
+    add_nearArg(parser.add_argument_group("Archive close to time specified"))
+    add_nearArgs(parser.add_argument_group("Arguments that are used only with --near"))
     parser.add_argument(
         "--version", "-v", action="store_true", help="Waybackpy version"
     )
-
     return parser.parse_args(argv[1:])
 
 
 def main(argv=None):
     if argv is None:
         argv = sys.argv
-    args = parse_args(argv)
-    output = args_handler(args)
-    print(output)
+    print(args_handler(parse_args(argv)))
 
 
 if __name__ == "__main__":
diff --git a/waybackpy/wrapper.py b/waybackpy/wrapper.py
index 390a852..3ac4042 100644
--- a/waybackpy/wrapper.py
+++ b/waybackpy/wrapper.py
@@ -1,59 +1,62 @@
-# -*- coding: utf-8 -*-
-
 import re
-from datetime import datetime, timedelta
-from waybackpy.exceptions import WaybackError, URLError
-from waybackpy.__version__ import __version__
 import requests
 import concurrent.futures
+from datetime import datetime, timedelta
+from waybackpy.__version__ import __version__
+from waybackpy.exceptions import WaybackError, URLError
 
-default_UA = "waybackpy python package - https://github.com/akamhy/waybackpy"
+default_user_agent = "waybackpy python package - https://github.com/akamhy/waybackpy"
 
 
-def _archive_url_parser(header):
+def _archive_url_parser(header, url):
     """
+    The wayback machine's save API doesn't
+    return JSON response, we are required
+    to read the header of the API response
+    and look for the archive URL.
+
     This method has some regexen (or regexes)
     that search for archive url in header.
 
     This method is used when you try to save a webpage on wayback machine.
 
-    The wayback machine's save API doesn't
-    return JSON response, we are required
-    to read the header of the API response
-    and look for the archive URL.
-
     Two cases are possible:
     1) Either we find the archive url in
       the header.
 
-    2) We didn't find the archive url in
+    2) Or we didn't find the archive url in
      API header.
 
-    If we found the archive we return it.
+    If we found the archive URL we return it.
 
-    And if we couldn't find it we raise
-    WaybackError with a standard Error message.
+    And if we couldn't find it, we raise
+    WaybackError with an error message.
     """
+
     # Regex1
-    arch = re.search(r"Content-Location: (/web/[0-9]{14}/.*)", str(header))
-    if arch:
-        return "web.archive.org" + arch.group(1)
+    m = re.search(r"Content-Location: (/web/[0-9]{14}/.*)", str(header))
+    if m:
+        return "web.archive.org" + m.group(1)
+
     # Regex2
-    arch = re.search(
+    m = re.search(
         r"rel=\"memento.*?(web\.archive\.org/web/[0-9]{14}/.*?)>", str(header)
     )
-    if arch:
-        return arch.group(1)
+    if m:
+        return m.group(1)
+
     # Regex3
-    arch = re.search(r"X-Cache-Key:\shttps(.*)[A-Z]{2}", str(header))
-    if arch:
-        return arch.group(1)
+    m = re.search(r"X-Cache-Key:\shttps(.*)[A-Z]{2}", str(header))
+    if m:
+        return m.group(1)
+
     raise WaybackError(
         "No archive URL found in the API response. "
-        "This version of waybackpy (%s) is likely out of date or WayBack Machine is malfunctioning. Visit "
-        "https://github.com/akamhy/waybackpy for the latest version "
-        "of waybackpy.\nHeader:\n%s" % (__version__, str(header))
+        "If '%s' can be accessed via your web browser then either "
+        "this version of waybackpy (%s) is out of date or WayBack Machine is malfunctioning. Visit "
+        "'https://github.com/akamhy/waybackpy' for the latest version "
+        "of waybackpy.\nHeader:\n%s" % (url, __version__, str(header))
     )
@@ -79,6 +82,7 @@ def _wayback_timestamp(**kwargs):
 
     Return format is string.
""" + return "".join( str(kwargs[key]).zfill(2) for key in ["year", "month", "day", "hour", "minute"] ) @@ -104,26 +108,25 @@ def _get_response(endpoint, params=None, headers=None): """ try: - response = requests.get(endpoint, params=params, headers=headers) + return requests.get(endpoint, params=params, headers=headers) except Exception: try: - response = requests.get(endpoint, params=params, headers=headers) # nosec + return requests.get(endpoint, params=params, headers=headers) except Exception as e: exc = WaybackError("Error while retrieving %s" % endpoint) exc.__cause__ = e raise exc - return response class Url: """ - waybackpy Url object + waybackpy Url object, Type : """ - def __init__(self, url, user_agent=default_UA): + def __init__(self, url, user_agent=default_user_agent): self.url = url self.user_agent = user_agent - self._url_check() # checks url validity on init. + self._url_check() self._archive_url = None self.timestamp = None self._JSON = None @@ -144,6 +147,7 @@ class Url: sets self._archive_url, we now set self._archive_url to self.archive_url and return it. """ + if not self._archive_url: self._archive_url = self.archive_url return "%s" % self._archive_url @@ -159,8 +163,7 @@ class Url: if self.timestamp == datetime.max: return td_max.days - diff = datetime.utcnow() - self.timestamp - return diff.days + return (datetime.utcnow() - self.timestamp).days def _url_check(self): """ @@ -170,6 +173,7 @@ class Url: If you known any others, please create a PR on the github repo. """ + if "." not in self.url: raise URLError("'%s' is not a vaild URL." % self.url) @@ -184,7 +188,7 @@ class Url: endpoint = "https://archive.org/wayback/available" headers = {"User-Agent": "%s" % self.user_agent} - payload = {"url": "%s" % self._clean_url()} + payload = {"url": "%s" % self._cleaned_url()} response = _get_response(endpoint, params=payload, headers=headers) return response.json() @@ -236,7 +240,7 @@ class Url: self.timestamp = ts return ts - def _clean_url(self): + def _cleaned_url(self): """ Remove newlines replace " " with "_" @@ -245,10 +249,10 @@ class Url: def save(self): """Create a new Wayback Machine archive for this URL.""" - request_url = "https://web.archive.org/save/" + self._clean_url() + request_url = "https://web.archive.org/save/" + self._cleaned_url() headers = {"User-Agent": "%s" % self.user_agent} response = _get_response(request_url, params=None, headers=headers) - self._archive_url = "https://" + _archive_url_parser(response.headers) + self._archive_url = "https://" + _archive_url_parser(response.headers, self.url) self.timestamp = datetime.utcnow() return self @@ -258,7 +262,7 @@ class Url: """ if not url: - url = self._clean_url() + url = self._cleaned_url() if not user_agent: user_agent = self.user_agent @@ -307,14 +311,14 @@ class Url: endpoint = "https://archive.org/wayback/available" headers = {"User-Agent": "%s" % self.user_agent} - payload = {"url": "%s" % self._clean_url(), "timestamp": timestamp} + payload = {"url": "%s" % self._cleaned_url(), "timestamp": timestamp} response = _get_response(endpoint, params=payload, headers=headers) data = response.json() if not data["archived_snapshots"]: raise WaybackError( "Can not find archive for '%s' try later or use wayback.Url(url, user_agent).save() " - "to create a new archive." % self._clean_url() + "to create a new archive." % self._cleaned_url() ) archive_url = data["archived_snapshots"]["closest"]["url"] archive_url = archive_url.replace( @@ -362,18 +366,24 @@ class Url: Return type in integer. 
""" + total_pages_url = ( + "https://web.archive.org/cdx/search/cdx?url=%s&showNumPages=true" + % self._cleaned_url() + ) + headers = {"User-Agent": "%s" % self.user_agent} + total_pages = int( + (_get_response(total_pages_url, headers=headers).text).strip() + ) - endpoint = "https://web.archive.org/cdx/search/cdx" - headers = { - "User-Agent": "%s" % self.user_agent, - "output": "json", - "fl": "statuscode", - } - payload = {"url": "%s" % self._clean_url()} - response = _get_response(endpoint, params=payload, headers=headers) - - # Most efficient method to count number of archives (yet) - return response.text.count(",") + archive_count = 0 + for i in range(total_pages): + page_url = "https://web.archive.org/cdx/search/cdx?url=%s&page=%s" % ( + self._cleaned_url(), + str(i), + ) + count = str(_get_response(page_url, headers=headers).text).count("\n") + archive_count = archive_count + count + return archive_count def live_urls_picker(self, url): """ @@ -384,7 +394,7 @@ class Url: try: response_code = requests.get(url).status_code except Exception: - return # we don't care if Exception + return # we don't care if Exception # 200s are OK and 300s are usually redirects, if you don't want redirects replace 400 with 300 if response_code >= 400: @@ -406,12 +416,12 @@ class Url: if subdomain: request_url = ( "https://web.archive.org/cdx/search/cdx?url=*.%s/*&output=json&fl=original&collapse=urlkey" - % self._clean_url() + % self._cleaned_url() ) else: request_url = ( "http://web.archive.org/cdx/search/cdx?url=%s/*&output=json&fl=original&collapse=urlkey" - % self._clean_url() + % self._cleaned_url() ) headers = {"User-Agent": "%s" % self.user_agent}