diff --git a/tests/test_cdx.py b/tests/test_cdx.py new file mode 100644 index 0000000..887afd7 --- /dev/null +++ b/tests/test_cdx.py @@ -0,0 +1,93 @@ +import pytest +from waybackpy.cdx import Cdx +from waybackpy.exceptions import WaybackError + + +def test_all_cdx(): + url = "akamhy.github.io" + user_agent = "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, \ + like Gecko) Chrome/45.0.2454.85 Safari/537.36" + cdx = Cdx( + url=url, + user_agent=user_agent, + start_timestamp=2017, + end_timestamp=2020, + filters=[ + "statuscode:200", + "mimetype:text/html", + "timestamp:20201002182319", + "original:https://akamhy.github.io/", + ], + gzip=False, + collapses=["timestamp:10", "digest"], + limit=50, + match_type="prefix", + ) + snapshots = cdx.snapshots() + for snapshot in snapshots: + ans = snapshot.archive_url + assert "https://web.archive.org/web/20201002182319/https://akamhy.github.io/" == ans + + url = "akahfjgjkmhy.gihthub.ip" + cdx = Cdx( + url=url, + user_agent=user_agent, + start_timestamp=None, + end_timestamp=None, + filters=[], + match_type=None, + gzip=True, + collapses=[], + limit=10, + ) + + snapshots = cdx.snapshots() + print(snapshots) + i = 0 + for _ in snapshots: + i += 1 + assert i == 0 + + url = "https://github.com/akamhy/waybackpy/*" + cdx = Cdx(url=url, user_agent=user_agent, limit=50) + snapshots = cdx.snapshots() + + for snapshot in snapshots: + print(snapshot.archive_url) + + url = "https://github.com/akamhy/waybackpy" + with pytest.raises(WaybackError): + cdx = Cdx(url=url, user_agent=user_agent, limit=50, filters=["ghddhfhj"]) + snapshots = cdx.snapshots() + + with pytest.raises(WaybackError): + cdx = Cdx(url=url, user_agent=user_agent, collapses=["timestamp", "ghdd:hfhj"]) + snapshots = cdx.snapshots() + + url = "https://github.com" + cdx = Cdx(url=url, user_agent=user_agent, limit=50) + snapshots = cdx.snapshots() + c = 0 + for snapshot in snapshots: + c += 1 + if c > 100: + break + + url = "https://github.com/*" + cdx = Cdx(url=url, user_agent=user_agent, collapses=["timestamp"]) + snapshots = cdx.snapshots() + c = 0 + for snapshot in snapshots: + c += 1 + if c > 30_529: # default limit is 10k + break + + url = "https://github.com/*" + cdx = Cdx(url=url, user_agent=user_agent) + c = 0 + snapshots = cdx.snapshots() + + for snapshot in snapshots: + c += 1 + if c > 100_529: + break diff --git a/tests/test_snapshot.py b/tests/test_snapshot.py new file mode 100644 index 0000000..8d02a49 --- /dev/null +++ b/tests/test_snapshot.py @@ -0,0 +1,32 @@ +import pytest + +from waybackpy.snapshot import CdxSnapshot, datetime + + +def test_CdxSnapshot(): + sample_input = "org,archive)/ 20080126045828 http://github.com text/html 200 Q4YULN754FHV2U6Q5JUT6Q2P57WEWNNY 1415" + ( + urlkey, + timestamp, + original, + mimetype, + statuscode, + digest, + length, + ) = sample_input.split(" ") + + snapshot = CdxSnapshot( + urlkey, timestamp, original, mimetype, statuscode, digest, length + ) + + assert urlkey == snapshot.urlkey + assert timestamp == snapshot.timestamp + assert original == snapshot.original + assert mimetype == snapshot.mimetype + assert statuscode == snapshot.statuscode + assert digest == snapshot.digest + assert length == snapshot.length + assert datetime.strptime(timestamp, "%Y%m%d%H%M%S") == snapshot.datetime_timestamp + archive_url = "https://web.archive.org/web/" + timestamp + "/" + original + assert archive_url == snapshot.archive_url + assert archive_url == str(snapshot) diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index
0000000..e391f89 --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,190 @@ +import pytest +import json + +from waybackpy.utils import ( + _cleaned_url, + _url_check, + _full_url, + URLError, + WaybackError, + _get_total_pages, + _archive_url_parser, + _wayback_timestamp, + _get_response, + _check_match_type, + _check_collapses, + _check_filters, + _ts, +) + + +def test_ts(): + timestamp = True + data = {} + assert _ts(timestamp, data) + + data = """ + {"archived_snapshots": {"closest": {"timestamp": "20210109155628", "available": true, "status": "200", "url": "http://web.archive.org/web/20210109155628/https://www.google.com/"}}, "url": "https://www.google.com/"} + """ + data = json.loads(data) + assert data["archived_snapshots"]["closest"]["timestamp"] == "20210109155628" + + +def test_check_filters(): + filters = [] + _check_filters(filters) + + filters = ["statuscode:200", "timestamp:20215678901234", "original:https://url.com"] + _check_filters(filters) + + filters = ["statuscode:2768900", "timestamp:123456789", "original:://url.com"] + with pytest.raises(WaybackError): + _check_filters(filters) + + with pytest.raises(WaybackError): + _check_filters("not-list") + + +def test_check_collapses(): + collapses = [] + _check_collapses(collapses) + + collapses = ["timestamp:10"] + _check_collapses(collapses) + + collapses = ["urlkey"] + _check_collapses(collapses) + + collapses = "urlkey" # NOT LIST + with pytest.raises(WaybackError): + _check_collapses(collapses) + + collapses = ["also illegal collapse"] + with pytest.raises(WaybackError): + _check_collapses(collapses) + + +def test_check_match_type(): + assert None == _check_match_type(None, "url") + match_type = "exact" + url = "test_url" + assert None == _check_match_type(match_type, url) + + url = "has * in it" + with pytest.raises(WaybackError): + _check_match_type("domain", url) + + with pytest.raises(WaybackError): + _check_match_type("not a valid type", "url") + + +def test_cleaned_url(): + test_url = " https://en.wikipedia.org/wiki/Network security " + answer = "https://en.wikipedia.org/wiki/Network%20security" + assert answer == _cleaned_url(test_url) + + +def test_url_check(): + good_url = "https://akamhy.github.io" + assert None == _url_check(good_url) + + bad_url = "https://github-com" + with pytest.raises(URLError): + _url_check(bad_url) + + +def test_full_url(): + params = {} + endpoint = "https://web.archive.org/cdx/search/cdx" + assert endpoint == _full_url(endpoint, params) + + params = {"a": "1"} + assert "https://web.archive.org/cdx/search/cdx?a=1" == _full_url(endpoint, params) + assert "https://web.archive.org/cdx/search/cdx?a=1" == _full_url( + endpoint + "?", params + ) + + params["b"] = 2 + assert "https://web.archive.org/cdx/search/cdx?a=1&b=2" == _full_url( + endpoint + "?", params + ) + + params["c"] = "foo bar" + assert "https://web.archive.org/cdx/search/cdx?a=1&b=2&c=foo%20bar" == _full_url( + endpoint + "?", params + ) + + +def test_get_total_pages(): + user_agent = "Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko" + url = "github.com*" + assert 212890 <= _get_total_pages(url, user_agent) + + url = "https://zenodo.org/record/4416138" + assert 2 >= _get_total_pages(url, user_agent) + + +def test_archive_url_parser(): + perfect_header = """ + {'Server': 'nginx/1.15.8', 'Date': 'Sat, 02 Jan 2021 09:40:25 GMT', 'Content-Type': 'text/html; charset=UTF-8', 'Transfer-Encoding': 'chunked', 'Connection': 'keep-alive', 'X-Archive-Orig-Server': 'nginx', 'X-Archive-Orig-Date': 'Sat, 02 Jan 2021 09:40:09 
GMT', 'X-Archive-Orig-Transfer-Encoding': 'chunked', 'X-Archive-Orig-Connection': 'keep-alive', 'X-Archive-Orig-Vary': 'Accept-Encoding', 'X-Archive-Orig-Last-Modified': 'Fri, 01 Jan 2021 12:19:00 GMT', 'X-Archive-Orig-Strict-Transport-Security': 'max-age=31536000, max-age=0;', 'X-Archive-Guessed-Content-Type': 'text/html', 'X-Archive-Guessed-Charset': 'utf-8', 'Memento-Datetime': 'Sat, 02 Jan 2021 09:40:09 GMT', 'Link': '; rel="original", ; rel="timemap"; type="application/link-format", ; rel="timegate", ; rel="first memento"; datetime="Mon, 01 Jun 2020 08:29:11 GMT", ; rel="prev memento"; datetime="Thu, 26 Nov 2020 18:53:27 GMT", ; rel="memento"; datetime="Sat, 02 Jan 2021 09:40:09 GMT", ; rel="last memento"; datetime="Sat, 02 Jan 2021 09:40:09 GMT"', 'Content-Security-Policy': "default-src 'self' 'unsafe-eval' 'unsafe-inline' data: blob: archive.org web.archive.org analytics.archive.org pragma.archivelab.org", 'X-Archive-Src': 'spn2-20210102092956-wwwb-spn20.us.archive.org-8001.warc.gz', 'Server-Timing': 'captures_list;dur=112.646325, exclusion.robots;dur=0.172010, exclusion.robots.policy;dur=0.158205, RedisCDXSource;dur=2.205932, esindex;dur=0.014647, LoadShardBlock;dur=82.205012, PetaboxLoader3.datanode;dur=70.750239, CDXLines.iter;dur=24.306278, load_resource;dur=26.520179', 'X-App-Server': 'wwwb-app200', 'X-ts': '200', 'X-location': 'All', 'X-Cache-Key': 'httpsweb.archive.org/web/20210102094009/https://www.scribbr.com/citing-sources/et-al/IN', 'X-RL': '0', 'X-Page-Cache': 'MISS', 'X-Archive-Screenname': '0', 'Content-Encoding': 'gzip'} + """ + + archive = _archive_url_parser( + perfect_header, "https://www.scribbr.com/citing-sources/et-al/" + ) + assert "web.archive.org/web/20210102094009" in archive + + header = """ + vhgvkjv + Content-Location: /web/20201126185327/https://www.scribbr.com/citing-sources/et-al + ghvjkbjmmcmhj + """ + archive = _archive_url_parser( + header, "https://www.scribbr.com/citing-sources/et-al/" + ) + assert "20201126185327" in archive + + header = """ + hfjkfjfcjhmghmvjm + X-Cache-Key: https://web.archive.org/web/20171128185327/https://www.scribbr.com/citing-sources/et-al/US + yfu,u,gikgkikik + """ + archive = _archive_url_parser( + header, "https://www.scribbr.com/citing-sources/et-al/" + ) + assert "20171128185327" in archive + + # The below header should result in Exception + no_archive_header = """ + {'Server': 'nginx/1.15.8', 'Date': 'Sat, 02 Jan 2021 09:42:45 GMT', 'Content-Type': 'text/html; charset=utf-8', 'Transfer-Encoding': 'chunked', 'Connection': 'keep-alive', 'Cache-Control': 'no-cache', 'X-App-Server': 'wwwb-app52', 'X-ts': '523', 'X-RL': '0', 'X-Page-Cache': 'MISS', 'X-Archive-Screenname': '0'} + """ + + with pytest.raises(WaybackError): + _archive_url_parser( + no_archive_header, "https://www.scribbr.com/citing-sources/et-al/" + ) + + +def test_wayback_timestamp(): + ts = _wayback_timestamp(year=2020, month=1, day=2, hour=3, minute=4) + assert "202001020304" in str(ts) + + +def test_get_response(): + endpoint = "https://www.google.com" + user_agent = ( + "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:78.0) Gecko/20100101 Firefox/78.0" + ) + headers = {"User-Agent": "%s" % user_agent} + response = _get_response(endpoint, params=None, headers=headers) + assert response.status_code == 200 + + endpoint = "http/wwhfhfvhvjhmom" + with pytest.raises(WaybackError): + _get_response(endpoint, params=None, headers=headers) + + endpoint = "https://akamhy.github.io" + url, response = _get_response( + endpoint, params=None, headers=headers, 
return_full_url=True + ) + assert endpoint == url diff --git a/tests/test_wrapper.py b/tests/test_wrapper.py index c414568..1bbf9f7 100644 --- a/tests/test_wrapper.py +++ b/tests/test_wrapper.py @@ -4,85 +4,17 @@ import random import requests from datetime import datetime -sys.path.append("..") - -import waybackpy.wrapper as waybackpy # noqa: E402 +from waybackpy.wrapper import Url, Cdx user_agent = "Mozilla/5.0 (Windows NT 6.2; rv:20.0) Gecko/20121202 Firefox/20.0" -def test_cleaned_url(): - """No API use""" - test_url = " https://en.wikipedia.org/wiki/Network security " - answer = "https://en.wikipedia.org/wiki/Network_security" - target = waybackpy.Url(test_url, user_agent) - test_result = target._cleaned_url() - assert answer == test_result - - -def test_ts(): - a = waybackpy.Url("https://google.com", user_agent) - ts = a._timestamp - assert str(datetime.utcnow().year) in str(ts) - - -def test_dunders(): - """No API use""" - url = "https://en.wikipedia.org/wiki/Network_security" - user_agent = "UA" - target = waybackpy.Url(url, user_agent) - assert "waybackpy.Url(url=%s, user_agent=%s)" % (url, user_agent) == repr(target) - assert "en.wikipedia.org" in str(target) - - def test_url_check(): """No API Use""" broken_url = "http://wwwgooglecom/" with pytest.raises(Exception): - waybackpy.Url(broken_url, user_agent) - - -def test_archive_url_parser(): - """No API Use""" - perfect_header = """ - {'Server': 'nginx/1.15.8', 'Date': 'Sat, 02 Jan 2021 09:40:25 GMT', 'Content-Type': 'text/html; charset=UTF-8', 'Transfer-Encoding': 'chunked', 'Connection': 'keep-alive', 'X-Archive-Orig-Server': 'nginx', 'X-Archive-Orig-Date': 'Sat, 02 Jan 2021 09:40:09 GMT', 'X-Archive-Orig-Transfer-Encoding': 'chunked', 'X-Archive-Orig-Connection': 'keep-alive', 'X-Archive-Orig-Vary': 'Accept-Encoding', 'X-Archive-Orig-Last-Modified': 'Fri, 01 Jan 2021 12:19:00 GMT', 'X-Archive-Orig-Strict-Transport-Security': 'max-age=31536000, max-age=0;', 'X-Archive-Guessed-Content-Type': 'text/html', 'X-Archive-Guessed-Charset': 'utf-8', 'Memento-Datetime': 'Sat, 02 Jan 2021 09:40:09 GMT', 'Link': '; rel="original", ; rel="timemap"; type="application/link-format", ; rel="timegate", ; rel="first memento"; datetime="Mon, 01 Jun 2020 08:29:11 GMT", ; rel="prev memento"; datetime="Thu, 26 Nov 2020 18:53:27 GMT", ; rel="memento"; datetime="Sat, 02 Jan 2021 09:40:09 GMT", ; rel="last memento"; datetime="Sat, 02 Jan 2021 09:40:09 GMT"', 'Content-Security-Policy': "default-src 'self' 'unsafe-eval' 'unsafe-inline' data: blob: archive.org web.archive.org analytics.archive.org pragma.archivelab.org", 'X-Archive-Src': 'spn2-20210102092956-wwwb-spn20.us.archive.org-8001.warc.gz', 'Server-Timing': 'captures_list;dur=112.646325, exclusion.robots;dur=0.172010, exclusion.robots.policy;dur=0.158205, RedisCDXSource;dur=2.205932, esindex;dur=0.014647, LoadShardBlock;dur=82.205012, PetaboxLoader3.datanode;dur=70.750239, CDXLines.iter;dur=24.306278, load_resource;dur=26.520179', 'X-App-Server': 'wwwb-app200', 'X-ts': '200', 'X-location': 'All', 'X-Cache-Key': 'httpsweb.archive.org/web/20210102094009/https://www.scribbr.com/citing-sources/et-al/IN', 'X-RL': '0', 'X-Page-Cache': 'MISS', 'X-Archive-Screenname': '0', 'Content-Encoding': 'gzip'} - """ - - archive = waybackpy._archive_url_parser( - perfect_header, "https://www.scribbr.com/citing-sources/et-al/" - ) - assert "web.archive.org/web/20210102094009" in archive - - header = """ - vhgvkjv - Content-Location: /web/20201126185327/https://www.scribbr.com/citing-sources/et-al - ghvjkbjmmcmhj - """ 
- archive = waybackpy._archive_url_parser( - header, "https://www.scribbr.com/citing-sources/et-al/" - ) - assert "20201126185327" in archive - - header = """ - hfjkfjfcjhmghmvjm - X-Cache-Key: https://web.archive.org/web/20171128185327/https://www.scribbr.com/citing-sources/et-al/US - yfu,u,gikgkikik - """ - archive = waybackpy._archive_url_parser( - header, "https://www.scribbr.com/citing-sources/et-al/" - ) - assert "20171128185327" in archive - - # The below header should result in Exception - no_archive_header = """ - {'Server': 'nginx/1.15.8', 'Date': 'Sat, 02 Jan 2021 09:42:45 GMT', 'Content-Type': 'text/html; charset=utf-8', 'Transfer-Encoding': 'chunked', 'Connection': 'keep-alive', 'Cache-Control': 'no-cache', 'X-App-Server': 'wwwb-app52', 'X-ts': '523', 'X-RL': '0', 'X-Page-Cache': 'MISS', 'X-Archive-Screenname': '0'} - """ - - with pytest.raises(Exception): - waybackpy._archive_url_parser( - no_archive_header, "https://www.scribbr.com/citing-sources/et-al/" - ) + Url(broken_url, user_agent) def test_save(): @@ -90,15 +22,14 @@ def test_save(): url_list = [ "en.wikipedia.org", - "www.wikidata.org", - "commons.wikimedia.org", + "akamhy.github.io", "www.wiktionary.org", "www.w3schools.com", - "www.ibm.com", + "youtube.com", ] x = random.randint(0, len(url_list) - 1) url1 = url_list[x] - target = waybackpy.Url( + target = Url( url1, "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_2) AppleWebKit/537.36 " "(KHTML, like Gecko) Chrome/36.0.1944.0 Safari/537.36", @@ -109,11 +40,11 @@ def test_save(): # Test for urls that are incorrect. with pytest.raises(Exception): url2 = "ha ha ha ha" - waybackpy.Url(url2, user_agent) + Url(url2, user_agent) url3 = "http://www.archive.is/faq.html" with pytest.raises(Exception): - target = waybackpy.Url( + target = Url( url3, "Mozilla/5.0 (Windows; U; Windows NT 6.0; en-US) " "AppleWebKit/533.20.25 (KHTML, like Gecko) Version/5.0.4 " @@ -124,7 +55,7 @@ def test_save(): def test_near(): url = "google.com" - target = waybackpy.Url( + target = Url( url, "Mozilla/5.0 (Windows; U; Windows NT 6.0; de-DE) AppleWebKit/533.20.25 " "(KHTML, like Gecko) Version/5.0.3 Safari/533.19.4", @@ -139,7 +70,7 @@ def test_near(): or ("2015-03" in archive_near_month_year) ) - target = waybackpy.Url( + target = Url( "www.python.org", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " "(KHTML, like Gecko) Chrome/42.0.2311.135 Safari/537.36 Edge/12.246", @@ -157,13 +88,13 @@ def test_near(): NeverArchivedUrl = ( "https://ee_3n.wrihkeipef4edia.org/rwti5r_ki/Nertr6w_rork_rse7c_urity" ) - target = waybackpy.Url(NeverArchivedUrl, user_agent) + target = Url(NeverArchivedUrl, user_agent) target.near(year=2010) def test_oldest(): url = "github.com/akamhy/waybackpy" - target = waybackpy.Url(url, user_agent) + target = Url(url, user_agent) o = target.oldest() assert "20200504141153" in str(o) assert "2020-05-04" in str(o._timestamp) @@ -171,50 +102,35 @@ def test_oldest(): def test_json(): url = "github.com/akamhy/waybackpy" - target = waybackpy.Url(url, user_agent) + target = Url(url, user_agent) assert "archived_snapshots" in str(target.JSON) def test_archive_url(): url = "github.com/akamhy/waybackpy" - target = waybackpy.Url(url, user_agent) + target = Url(url, user_agent) assert "github.com/akamhy" in str(target.archive_url) def test_newest(): url = "github.com/akamhy/waybackpy" - target = waybackpy.Url(url, user_agent) + target = Url(url, user_agent) assert url in str(target.newest()) def test_get(): - target = waybackpy.Url("google.com", user_agent) + target = 
Url("google.com", user_agent) assert "Welcome to Google" in target.get(target.oldest()) -def test_wayback_timestamp(): - ts = waybackpy._wayback_timestamp(year=2020, month=1, day=2, hour=3, minute=4) - assert "202001020304" in str(ts) - - -def test_get_response(): - endpoint = "https://www.google.com" - user_agent = ( - "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:78.0) Gecko/20100101 Firefox/78.0" - ) - headers = {"User-Agent": "%s" % user_agent} - response = waybackpy._get_response(endpoint, params=None, headers=headers) - assert response.status_code == 200 - - def test_total_archives(): user_agent = ( "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:78.0) Gecko/20100101 Firefox/78.0" ) - target = waybackpy.Url(" https://outlook.com ", user_agent) + target = Url(" https://outlook.com ", user_agent) assert target.total_archives() > 80000 - target = waybackpy.Url( + target = Url( " https://gaha.e4i3n.m5iai3kip6ied.cima/gahh2718gs/ahkst63t7gad8 ", user_agent ) assert target.total_archives() == 0 @@ -222,8 +138,8 @@ def test_total_archives(): def test_known_urls(): - target = waybackpy.Url("akamhy.github.io", user_agent) + target = Url("akamhy.github.io", user_agent) assert len(target.known_urls(alive=True, subdomain=False)) > 2 - target = waybackpy.Url("akamhy.github.io", user_agent) + target = Url("akamhy.github.io", user_agent) assert len(target.known_urls()) > 3 diff --git a/waybackpy/cdx.py b/waybackpy/cdx.py new file mode 100644 index 0000000..ccef8b9 --- /dev/null +++ b/waybackpy/cdx.py @@ -0,0 +1,234 @@ +from .snapshot import CdxSnapshot +from .exceptions import WaybackError +from .utils import ( + _full_url, + _get_total_pages, + _get_response, + default_user_agent, + _check_filters, + _check_collapses, + _check_match_type, +) + +# TODO : Threading support for pagination API. It's designed for Threading. + + +class Cdx: + def __init__( + self, + url, + user_agent=default_user_agent, + start_timestamp=None, + end_timestamp=None, + filters=[], + match_type=None, + gzip=True, + collapses=[], + limit=10000, + ): + self.url = str(url).strip() + self.user_agent = str(user_agent) + self.start_timestamp = str(start_timestamp) if start_timestamp else None + self.end_timestamp = str(end_timestamp) if end_timestamp else None + self.filters = filters + _check_filters(self.filters) + self.match_type = str(match_type).strip() if match_type else None + _check_match_type(self.match_type, self.url) + self.gzip = gzip + self.collapses = collapses + _check_collapses(self.collapses) + self.limit = limit + self.last_api_request_url = None + self.use_page = False + + def cdx_api_manager(self, payload, headers, use_page=False): + """ + We have two options to get the snapshots, we use this + method to make a selection between pagination API and + the normal one with Resumption Key, sequential querying + of CDX data. For very large querying (for example domain query), + it may be useful to perform queries in parallel and also estimate + the total size of the query. + + read more about the pagination API at: + https://web.archive.org/web/20201228063237/https://github.com/internetarchive/wayback/blob/master/wayback-cdx-server/README.md#pagination-api + + if use_page is false if will use the normal sequential query API, + else use the pagination API. 
+ + Two mutually exclusive cases are possible: + + 1) pagination API is selected + + a) get the total number of pages to read, using _get_total_pages() + + b) then we use a for loop to get all the pages and yield the response text + + 2) normal sequential query API is selected. + + a) use showResumeKey=true to ask the API to add a query resumption key + at the bottom of the response + + b) check if the page has more than 3 lines, if not return the text + + c) if it has at least three lines, we check the second last line for zero length. + + d) if the second last line has length zero then we assume that the last line contains + the resumption key, we set the resumeKey and remove the resumeKey from the text + + e) if the second last line has non-zero length we return the text as there will be no resumption key + + f) if we find the resumption key we set the "more" variable to True, which is otherwise set + to False on each iteration. If more is not True the iteration stops and the function returns. + """ + + endpoint = "https://web.archive.org/cdx/search/cdx" + + if use_page == True: + + total_pages = _get_total_pages(self.url, self.user_agent) + + for i in range(total_pages): + payload["page"] = str(i) + url, res = _get_response( + endpoint, params=payload, headers=headers, return_full_url=True + ) + + self.last_api_request_url = url + + yield res.text + else: + + payload["showResumeKey"] = "true" + payload["limit"] = str(self.limit) + resumeKey = None + + more = True + while more: + + if resumeKey: + payload["resumeKey"] = resumeKey + + url, res = _get_response( + endpoint, params=payload, headers=headers, return_full_url=True + ) + + self.last_api_request_url = url + + text = res.text.strip() + lines = text.splitlines() + + more = False + + if len(lines) >= 3: + + last_line = lines[-1] + second_last_line = lines[-2] + + if len(second_last_line) == 0: + + resumeKey = lines[-1].strip() + text = text.replace(resumeKey, "", 1).strip() + more = True + + yield text + + def snapshots(self): + """ + This function yields snapshots encapsulated + in CdxSnapshot for more usability. + + All the GET request parameters are set if the conditions match. + + If the input does not include any of [start_timestamp, end_timestamp] + and does not use any collapses, then we use the pagination API, + as it returns archives starting + from the first archive and the most recent archive will be on + the last page.
+ """ + payload = {} + headers = {"User-Agent": self.user_agent} + + if self.start_timestamp: + payload["from"] = self.start_timestamp + + if self.end_timestamp: + payload["to"] = self.end_timestamp + + if self.gzip != True: + payload["gzip"] = "false" + + if self.match_type: + payload["matchType"] = self.match_type + + if self.filters and len(self.filters) > 0: + for i, f in enumerate(self.filters): + payload["filter" + str(i)] = f + + if self.collapses and len(self.collapses) > 0: + for i, f in enumerate(self.collapses): + payload["collapse" + str(i)] = f + + payload["url"] = self.url + + if not self.start_timestamp or self.end_timestamp: + self.use_page = True + + if self.collapses != []: + self.use_page = False + + texts = self.cdx_api_manager(payload, headers, use_page=self.use_page) + + for text in texts: + + if text.isspace() or len(text) <= 1 or not text: + continue + + snapshot_list = text.split("\n") + + for snapshot in snapshot_list: + + if len(snapshot) < 46: # 14 + 32 (timestamp+digest) + continue + + properties = { + "urlkey": None, + "timestamp": None, + "original": None, + "mimetype": None, + "statuscode": None, + "digest": None, + "length": None, + } + + prop_values = snapshot.split(" ") + + # Making sure that we get the same number of + # property values as the number of properties + prop_values_len = len(prop_values) + properties_len = len(properties) + if prop_values_len != properties_len: + raise WaybackError( + "Snapshot returned by Cdx API has %s properties instead of expected %s properties.\nInvolved Snapshot : %s" + % (prop_values_len, properties_len, snapshot) + ) + + ( + properties["urlkey"], + properties["timestamp"], + properties["original"], + properties["mimetype"], + properties["statuscode"], + properties["digest"], + properties["length"], + ) = prop_values + + yield CdxSnapshot( + properties["urlkey"], + properties["timestamp"], + properties["original"], + properties["mimetype"], + properties["statuscode"], + properties["digest"], + properties["length"], + ) diff --git a/waybackpy/cli.py b/waybackpy/cli.py index 67206e4..1128a44 100644 --- a/waybackpy/cli.py +++ b/waybackpy/cli.py @@ -4,9 +4,9 @@ import sys import random import string import argparse -from waybackpy.wrapper import Url -from waybackpy.exceptions import WaybackError -from waybackpy.__version__ import __version__ +from .wrapper import Url +from .exceptions import WaybackError +from .__version__ import __version__ def _save(obj): @@ -19,11 +19,11 @@ def _save(obj): header = m.group(1) if "No archive URL found in the API response" in e: return ( - "\n[waybackpy] Can not save/archive your link.\n[waybackpy] This\ - could happen because either your waybackpy (%s) is likely out of\ - date or Wayback Machine is malfunctioning.\n[waybackpy] Visit\ - https://github.com/akamhy/waybackpy for the latest version of \ - waybackpy.\n[waybackpy] API response Header :\n%s" + "\n[waybackpy] Can not save/archive your link.\n[waybackpy] This " + "could happen because either your waybackpy (%s) is likely out of " + "date or Wayback Machine is malfunctioning.\n[waybackpy] Visit " + "https://github.com/akamhy/waybackpy for the latest version of " + "waybackpy.\n[waybackpy] API response Header :\n%s" % (__version__, header) ) return WaybackError(err) @@ -108,17 +108,16 @@ def _known_urls(obj, args): """ Known urls for a domain. 
""" - # sd = subdomain - sd = False + + subdomain = False if args.subdomain: - sd = True + subdomain = True - # al = alive - al = False + alive = False if args.alive: - al = True + alive = True - url_list = obj.known_urls(alive=al, subdomain=sd) + url_list = obj.known_urls(alive=alive, subdomain=subdomain) total_urls = len(url_list) if total_urls > 0: diff --git a/waybackpy/exceptions.py b/waybackpy/exceptions.py index d1f6200..f220d15 100644 --- a/waybackpy/exceptions.py +++ b/waybackpy/exceptions.py @@ -4,6 +4,7 @@ waybackpy.exceptions This module contains the set of Waybackpy's exceptions. """ + class WaybackError(Exception): """ Raised when Wayback Machine API Service is unreachable/down. diff --git a/waybackpy/snapshot.py b/waybackpy/snapshot.py new file mode 100644 index 0000000..d6a2c1e --- /dev/null +++ b/waybackpy/snapshot.py @@ -0,0 +1,26 @@ +from datetime import datetime + + +class CdxSnapshot: + """ + This class helps to handle the Cdx Snapshots easily. + + What the raw data looks like: + org,archive)/ 20080126045828 http://github.com text/html 200 Q4YULN754FHV2U6Q5JUT6Q2P57WEWNNY 1415 + """ + + def __init__( + self, urlkey, timestamp, original, mimetype, statuscode, digest, length + ): + self.urlkey = urlkey + self.timestamp = timestamp + self.datetime_timestamp = datetime.strptime(timestamp, "%Y%m%d%H%M%S") + self.original = original + self.mimetype = mimetype + self.statuscode = statuscode + self.digest = digest + self.length = length + self.archive_url = "https://web.archive.org/web/" + timestamp + "/" + original + + def __str__(self): + return self.archive_url diff --git a/waybackpy/utils.py b/waybackpy/utils.py new file mode 100644 index 0000000..bec77b1 --- /dev/null +++ b/waybackpy/utils.py @@ -0,0 +1,280 @@ +import re +import requests +from .exceptions import WaybackError, URLError +from datetime import datetime + +from urllib3.util.retry import Retry +from requests.adapters import HTTPAdapter +from .__version__ import __version__ + +quote = requests.utils.quote +default_user_agent = "waybackpy python package - https://github.com/akamhy/waybackpy" + + +def _ts(timestamp, data): + """ + Get timestamp of last fetched archive. + If used before fetching any archive, will + use whatever self.JSON returns. + + self.timestamp is None implies that + self.JSON will return any archive's JSON + that wayback machine provides it. + """ + + if timestamp: + return timestamp + + if not data["archived_snapshots"]: + return datetime.max + + return datetime.strptime( + data["archived_snapshots"]["closest"]["timestamp"], "%Y%m%d%H%M%S" + ) + + +def _check_match_type(match_type, url): + if not match_type: + return + + if "*" in url: + raise WaybackError("Can not use wildcard with match_type argument") + + legal_match_type = ["exact", "prefix", "host", "domain"] + + if match_type not in legal_match_type: + raise WaybackError( + "%s is not an allowed match type.\nUse one from 'exact', 'prefix', 'host' or 'domain'" + % match_type + ) + + +def _check_collapses(collapses): + + if not isinstance(collapses, list): + raise WaybackError("collapses must be a list.") + + if len(collapses) == 0: + return + + for c in collapses: + try: + match = re.search( + r"(urlkey|timestamp|original|mimetype|statuscode|digest|length)(:?[0-9]{1,99})?", + c, + ) + field = match.group(1) + + N = None + if 2 == len(match.groups()): + N = match.group(2) + + if N: + assert field + N == c + else: + assert field == c + + except Exception: + e = "collapse argument '%s' is not following the cdx collapse syntax." 
% c + raise WaybackError(e) + + +def _check_filters(filters): + if not isinstance(filters, list): + raise WaybackError("filters must be a list.") + + # [!]field:regex + for f in filters: + try: + match = re.search( + r"(\!?(?:urlkey|timestamp|original|mimetype|statuscode|digest|length)):(.*)", + f, + ) + + key = match.group(1) + val = match.group(2) + + if "statuscode" in key: + assert len(val) == 3 + assert isinstance(int(val), int) == True + + if "timestamp" in key: + + int_ts = int(val) + assert len(val) == 14 # must be 14 and not less to filter + assert int_ts > 19_950_000_000_000 # year 1995, 14 digit ts + assert isinstance(int_ts, int) == True + + if "original" in key: + assert "http" in val + + except Exception as e: + e = "Filter '%s' not following the cdx filter syntax." % f + raise WaybackError(e) + + +def _cleaned_url(url): + """ + Remove EOL characters and + replace " " with "%20". + """ + return str(url).strip().replace(" ", "%20") + + +def _url_check(url): + """ + Check for common URL problems. + What we are checking: + 1) '.' in url, reject any URL that has no '.' in it. + + If you know any others, please create a PR on the GitHub repo. + """ + + if "." not in url: + raise URLError("'%s' is not a valid URL." % url) + + +def _full_url(endpoint, params): + full_url = endpoint + if params: + full_url = endpoint if endpoint.endswith("?") else (endpoint + "?") + for key, val in params.items(): + key = "filter" if key.startswith("filter") else key + key = "collapse" if key.startswith("collapse") else key + amp = "" if full_url.endswith("?") else "&" + full_url = full_url + amp + "%s=%s" % (key, quote(str(val))) + return full_url + + +def _get_total_pages(url, user_agent): + """ + If showNumPages is passed to the cdx API, it returns + the 'number of archive pages' and each page has many archives. + + This func returns the number of pages of archives (type int). + """ + total_pages_url = ( + "https://web.archive.org/cdx/search/cdx?url=%s&showNumPages=true" % url + ) + headers = {"User-Agent": user_agent} + return int((_get_response(total_pages_url, headers=headers).text).strip()) + + +def _archive_url_parser(header, url): + """ + The wayback machine's save API doesn't + return a JSON response; we are required + to read the header of the API response + and look for the archive URL. + + This method has some regexen (or regexes) + that search for the archive URL in the header. + + This method is used when you try to + save a webpage on the wayback machine. + + Two cases are possible: + 1) Either we find the archive URL in + the header. + + 2) Or we don't find the archive URL in the + API header. + + If we found the archive URL we return it. + + And if we couldn't find it, we raise + WaybackError with an error message. + """ + + # Regex1 + m = re.search(r"Content-Location: (/web/[0-9]{14}/.*)", str(header)) + if m: + return "web.archive.org" + m.group(1) + + # Regex2 + m = re.search( + r"rel=\"memento.*?(web\.archive\.org/web/[0-9]{14}/.*?)>", str(header) + ) + if m: + return m.group(1) + + # Regex3 + m = re.search(r"X-Cache-Key:\shttps(.*)[A-Z]{2}", str(header)) + if m: + return m.group(1) + + raise WaybackError( + "No archive URL found in the API response. " + "If '%s' can be accessed via your web browser then either " + "this version of waybackpy (%s) is out of date or WayBack Machine is malfunctioning.
Visit " + "'https://github.com/akamhy/waybackpy' for the latest version " + "of waybackpy.\nHeader:\n%s" % (url, __version__, str(header)) + ) + + +def _wayback_timestamp(**kwargs): + """ + Wayback Machine archive URLs + have a timestamp in them. + + The standard archive URL format is + https://web.archive.org/web/20191214041711/https://www.youtube.com + + If we break it down in three parts: + 1 ) The start (https://web.archive.org/web/) + 2 ) timestamp (20191214041711) + 3 ) https://www.youtube.com, the original URL + + The near method takes year, month, day, hour and minute + as Arguments, their type is int. + + This method takes those integers and converts it to + wayback machine timestamp and returns it. + + Return format is string. + """ + + return "".join( + str(kwargs[key]).zfill(2) for key in ["year", "month", "day", "hour", "minute"] + ) + + +def _get_response( + endpoint, params=None, headers=None, retries=5, return_full_url=False +): + """ + This function is used make get request. + We use the requests package to make the + requests. + + + We try five times and if it fails it raises + WaybackError exception. + + You can handles WaybackError by importing: + from waybackpy.exceptions import WaybackError + + try: + ... + except WaybackError as e: + # handle it + """ + + # From https://stackoverflow.com/a/35504626 + # By https://stackoverflow.com/users/401467/datashaman + s = requests.Session() + retries = Retry( + total=retries, backoff_factor=0.5, status_forcelist=[500, 502, 503, 504] + ) + s.mount("https://", HTTPAdapter(max_retries=retries)) + url = _full_url(endpoint, params) + print(url) + try: + if not return_full_url: + return s.get(url, headers=headers) + return (url, s.get(url, headers=headers)) + except Exception as e: + exc = WaybackError("Error while retrieving %s" % url) + exc.__cause__ = e + raise exc diff --git a/waybackpy/wrapper.py b/waybackpy/wrapper.py index 0f9c2b2..e12e34c 100644 --- a/waybackpy/wrapper.py +++ b/waybackpy/wrapper.py @@ -1,151 +1,24 @@ -import re import requests import concurrent.futures -from urllib3.util.retry import Retry from datetime import datetime, timedelta -from requests.adapters import HTTPAdapter -from waybackpy.__version__ import __version__ -from waybackpy.exceptions import WaybackError, URLError - - -default_user_agent = "waybackpy python package - https://github.com/akamhy/waybackpy" - - -def _get_total_pages(url, user_agent): - """ - If showNumPages is passed in cdx API, it returns - 'number of archive pages'and each page has many archives. - - This func returns number of pages of archives (type int). - """ - total_pages_url = ( - "https://web.archive.org/cdx/search/cdx?url=%s&showNumPages=true" % url - ) - headers = {"User-Agent": user_agent} - return int((_get_response(total_pages_url, headers=headers).text).strip()) - - -def _archive_url_parser(header, url): - """ - The wayback machine's save API doesn't - return JSON response, we are required - to read the header of the API response - and look for the archive URL. - - This method has some regexen (or regexes) - that search for archive url in header. - - This method is used when you try to - save a webpage on wayback machine. - - Two cases are possible: - 1) Either we find the archive url in - the header. - - 2) Or we didn't find the archive url in - API header. - - If we found the archive URL we return it. - - And if we couldn't find it, we raise - WaybackError with an error message. 
- """ - - # Regex1 - m = re.search(r"Content-Location: (/web/[0-9]{14}/.*)", str(header)) - if m: - return "web.archive.org" + m.group(1) - - # Regex2 - m = re.search( - r"rel=\"memento.*?(web\.archive\.org/web/[0-9]{14}/.*?)>", str(header) - ) - if m: - return m.group(1) - - # Regex3 - m = re.search(r"X-Cache-Key:\shttps(.*)[A-Z]{2}", str(header)) - if m: - return m.group(1) - - raise WaybackError( - "No archive URL found in the API response. " - "If '%s' can be accessed via your web browser then either " - "this version of waybackpy (%s) is out of date or WayBack Machine is malfunctioning. Visit " - "'https://github.com/akamhy/waybackpy' for the latest version " - "of waybackpy.\nHeader:\n%s" % (url, __version__, str(header)) - ) - - -def _wayback_timestamp(**kwargs): - """ - Wayback Machine archive URLs - have a timestamp in them. - - The standard archive URL format is - https://web.archive.org/web/20191214041711/https://www.youtube.com - - If we break it down in three parts: - 1 ) The start (https://web.archive.org/web/) - 2 ) timestamp (20191214041711) - 3 ) https://www.youtube.com, the original URL - - The near method takes year, month, day, hour and minute - as Arguments, their type is int. - - This method takes those integers and converts it to - wayback machine timestamp and returns it. - - Return format is string. - """ - - return "".join( - str(kwargs[key]).zfill(2) for key in ["year", "month", "day", "hour", "minute"] - ) - - -def _get_response(endpoint, params=None, headers=None, retries=5): - """ - This function is used make get request. - We use the requests package to make the - requests. - - - We try five times and if it fails it raises - WaybackError exception. - - You can handles WaybackError by importing: - from waybackpy.exceptions import WaybackError - - try: - ... - except WaybackError as e: - # handle it - """ - - # From https://stackoverflow.com/a/35504626 - # By https://stackoverflow.com/users/401467/datashaman - s = requests.Session() - retries = Retry(total=retries, backoff_factor=0.5, status_forcelist=[ 500, 502, 503, 504 ]) - s.mount('https://', HTTPAdapter(max_retries=retries)) - - try: - return s.get(endpoint, params=params, headers=headers) - except Exception as e: - exc = WaybackError("Error while retrieving %s" % endpoint) - exc.__cause__ = e - raise exc +from .exceptions import WaybackError +from .cdx import Cdx +from .utils import ( + _archive_url_parser, + _wayback_timestamp, + _get_response, + default_user_agent, + _url_check, + _cleaned_url, + _ts, +) class Url: - """ - waybackpy Url class, Type : - """ - def __init__(self, url, user_agent=default_user_agent): self.url = url self.user_agent = str(user_agent) - self._url_check() + _url_check(self.url) self._archive_url = None self.timestamp = None self._JSON = None @@ -197,18 +70,6 @@ class Url: return (datetime.utcnow() - self.timestamp).days - def _url_check(self): - """ - Check for common URL problems. - What we are checking: - 1) '.' in self.url, no url that ain't '.' in it. - - If you known any others, please create a PR on the github repo. - """ - - if "." not in self.url: - raise URLError("'%s' is not a vaild URL." 
% self.url) - @property def JSON(self): """ @@ -225,7 +86,7 @@ class Url: endpoint = "https://archive.org/wayback/available" headers = {"User-Agent": self.user_agent} - payload = {"url": "%s" % self._cleaned_url()} + payload = {"url": "%s" % _cleaned_url(self.url)} response = _get_response(endpoint, params=payload, headers=headers) return response.json() @@ -256,37 +117,8 @@ class Url: @property def _timestamp(self): - """ - Get timestamp of last fetched archive. - If used before fetching any archive, will - use whatever self.JSON returns. - - self.timestamp is None implies that - self.JSON will return any archive's JSON - that wayback machine provides it. - """ - - if self.timestamp: - return self.timestamp - - data = self.JSON - - if not data["archived_snapshots"]: - ts = datetime.max - - else: - ts = datetime.strptime( - data["archived_snapshots"]["closest"]["timestamp"], "%Y%m%d%H%M%S" - ) - self.timestamp = ts - return ts - - def _cleaned_url(self): - """ - Remove EOL - replace " " with "_" - """ - return str(self.url).strip().replace(" ", "_") + self.timestamp = _ts(self.timestamp, self.JSON) + return self.timestamp def save(self): """ @@ -302,7 +134,7 @@ class Url: _archive_url_parser() parses the archive from the header. """ - request_url = "https://web.archive.org/save/" + self._cleaned_url() + request_url = "https://web.archive.org/save/" + _cleaned_url(self.url) headers = {"User-Agent": self.user_agent} response = _get_response(request_url, params=None, headers=headers) self._archive_url = "https://" + _archive_url_parser(response.headers, self.url) @@ -317,7 +149,7 @@ class Url: """ if not url: - url = self._cleaned_url() + url = _cleaned_url(self.url) if not user_agent: user_agent = self.user_agent @@ -366,14 +198,15 @@ class Url: endpoint = "https://archive.org/wayback/available" headers = {"User-Agent": self.user_agent} - payload = {"url": "%s" % self._cleaned_url(), "timestamp": timestamp} + payload = {"url": "%s" % _cleaned_url(self.url), "timestamp": timestamp} response = _get_response(endpoint, params=payload, headers=headers) data = response.json() if not data["archived_snapshots"]: raise WaybackError( "Can not find archive for '%s' try later or use wayback.Url(url, user_agent).save() " - "to create a new archive." % self._cleaned_url() + "to create a new archive.\nAPI response:\n%s" + % (_cleaned_url(self.url), response.text) ) archive_url = data["archived_snapshots"]["closest"]["url"] archive_url = archive_url.replace( @@ -423,17 +256,17 @@ class Url: """ cdx = Cdx( - self._cleaned_url(), + _cleaned_url(self.url), user_agent=self.user_agent, start_timestamp=start_timestamp, end_timestamp=end_timestamp, ) i = 0 for _ in cdx.snapshots(): - i += 1 + i = i + 1 return i - def live_urls_picker(self, url): + def live_urls_finder(self, url): """ This method is used to check if supplied url is >= 400. @@ -465,9 +298,9 @@ class Url: url_list = [] if subdomain: - url = "*.%s/*" % self._cleaned_url() + url = "*.%s/*" % _cleaned_url(self.url) else: - url = "%s/*" % self._cleaned_url() + url = "%s/*" % _cleaned_url(self.url) cdx = Cdx( url, @@ -486,99 +319,7 @@ class Url: # Remove all deadURLs from url_list if alive=True if alive: with concurrent.futures.ThreadPoolExecutor() as executor: - executor.map(self.live_urls_picker, url_list) + executor.map(self.live_urls_finder, url_list) url_list = self._alive_url_list return url_list - - -class CdxSnapshot: - """ - This class helps to handle the Cdx Snapshots easily. 
- - What the raw data looks like: - org,archive)/ 20080126045828 http://github.com text/html 200 Q4YULN754FHV2U6Q5JUT6Q2P57WEWNNY 1415 - """ - - def __init__( - self, urlkey, timestamp, original, mimetype, statuscode, digest, length - ): - self.urlkey = urlkey # Useless - self.timestamp = timestamp - self.original = original - self.mimetype = mimetype - self.statuscode = statuscode - self.digest = digest - self.length = length - self.archive_url = "https://web.archive.org/web/%s/%s" % ( - self.timestamp, - self.original, - ) - - def __str__(self): - return self.archive_url - - -class Cdx: - """ - waybackpy Cdx class, Type : - - Cdx keys are : - urlkey - timestamp - original - mimetype - statuscode - digest - length - """ - - def __init__( - self, - url, - user_agent=default_user_agent, - start_timestamp=None, - end_timestamp=None, - ): - self.url = url - self.user_agent = str(user_agent) - self.start_timestamp = str(start_timestamp) if start_timestamp else None - self.end_timestamp = str(end_timestamp) if end_timestamp else None - - def snapshots(self): - """ - This function yeilds snapshots encapsulated - in CdxSnapshot for more usability. - """ - payload = {} - endpoint = "https://web.archive.org/cdx/search/cdx" - total_pages = _get_total_pages(self.url, self.user_agent) - headers = {"User-Agent": self.user_agent} - if self.start_timestamp: - payload["from"] = self.start_timestamp - if self.end_timestamp: - payload["to"] = self.end_timestamp - payload["url"] = self.url - - for i in range(total_pages): - payload["page"] = str(i) - res = _get_response(endpoint, params=payload, headers=headers) - text = res.text - if text.isspace() or len(text) <= 1 or not text: - break - snapshot_list = text.split("\n") - for snapshot in snapshot_list: - if len(snapshot) < 15: - continue - ( - urlkey, - timestamp, - original, - mimetype, - statuscode, - digest, - length, - ) = snapshot.split(" ") - yield CdxSnapshot( - urlkey, timestamp, original, mimetype, statuscode, digest, length - )
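
Reviewer note: a minimal usage sketch of the new CDX interface added in this diff, showing how Cdx and CdxSnapshot fit together. It assumes waybackpy is installed from this branch; the URL, user agent, filter, collapse and limit values below are illustrative only and are not part of the changeset.

from waybackpy.cdx import Cdx

# Illustrative query: successful (200) captures of a small site between 2019
# and 2021, collapsed by urlkey and capped at 100 snapshots.
cdx = Cdx(
    url="akamhy.github.io",
    user_agent="my-reviewer-script/0.1",  # hypothetical user agent string
    start_timestamp=2019,
    end_timestamp=2021,
    filters=["statuscode:200"],
    collapses=["urlkey"],
    limit=100,
)

# snapshots() yields CdxSnapshot objects; each exposes urlkey, timestamp,
# original, mimetype, statuscode, digest, length, datetime_timestamp and
# archive_url, and str(snapshot) returns the archive URL.
for snapshot in cdx.snapshots():
    print(snapshot.timestamp, snapshot.archive_url)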