From 4b218d35cb0b768e193cd479b1b0ae5b05e82eb9 Mon Sep 17 00:00:00 2001 From: Akash Mahanty Date: Fri, 18 Feb 2022 13:17:40 +0530 Subject: [PATCH] Cdx based oldest newest and near (#159) * implement oldest newest and near methods in the cdx interface class, now cli uses the cdx methods instead of availability api methods. * handle the closest parameter derivative methods more efficiently and also handle exceptions gracefully. * update test code --- tests/test_cdx_api.py | 86 ++++++++++++++++++++++++++++++++ tests/test_cdx_snapshot.py | 1 + tests/test_cli.py | 35 +------------ waybackpy/availability_api.py | 29 +++-------- waybackpy/cdx_api.py | 75 ++++++++++++++++++++++++++-- waybackpy/cdx_snapshot.py | 6 +++ waybackpy/cli.py | 94 +++++++++++++++++------------------ waybackpy/exceptions.py | 8 +++ waybackpy/utils.py | 20 ++++++++ 9 files changed, 248 insertions(+), 106 deletions(-) diff --git a/tests/test_cdx_api.py b/tests/test_cdx_api.py index b7f28c2..ba2db5a 100644 --- a/tests/test_cdx_api.py +++ b/tests/test_cdx_api.py @@ -1,4 +1,16 @@ +import random +import string + +import pytest + from waybackpy.cdx_api import WaybackMachineCDXServerAPI +from waybackpy.exceptions import NoCDXRecordFound + + +def rndstr(n: int) -> str: + return "".join( + random.choice(string.ascii_uppercase + string.digits) for _ in range(n) + ) def test_a() -> None: @@ -90,3 +102,77 @@ def test_d() -> None: count += 1 assert str(snapshot.archive_url).find("akamhy.github.io") assert count > 50 + + +def test_oldest() -> None: + user_agent = ( + "Mozilla/5.0 (MacBook Air; M1 Mac OS X 11_4) " + "AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/604.1" + ) + + cdx = WaybackMachineCDXServerAPI( + url="google.com", + user_agent=user_agent, + filters=["statuscode:200"], + ) + oldest = cdx.oldest() + assert "1998" in oldest.timestamp + assert "google" in oldest.urlkey + assert oldest.original.find("google.com") != -1 + assert oldest.archive_url.find("google.com") != -1 + + +def 
test_newest() -> None: + user_agent = ( + "Mozilla/5.0 (MacBook Air; M1 Mac OS X 11_4) " + "AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/604.1" + ) + + cdx = WaybackMachineCDXServerAPI( + url="google.com", + user_agent=user_agent, + filters=["statuscode:200"], + ) + newest = cdx.newest() + assert "google" in newest.urlkey + assert newest.original.find("google.com") != -1 + assert newest.archive_url.find("google.com") != -1 + + +def test_near() -> None: + user_agent = ( + "Mozilla/5.0 (MacBook Air; M1 Mac OS X 11_4) " + "AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/604.1" + ) + + cdx = WaybackMachineCDXServerAPI( + url="google.com", + user_agent=user_agent, + filters=["statuscode:200"], + ) + near = cdx.near(year=2010, month=10, day=10, hour=10, minute=10) + assert "2010101010" in near.timestamp + assert "google" in near.urlkey + assert near.original.find("google.com") != -1 + assert near.archive_url.find("google.com") != -1 + + near = cdx.near(wayback_machine_timestamp="201010101010") + assert "2010101010" in near.timestamp + assert "google" in near.urlkey + assert near.original.find("google.com") != -1 + assert near.archive_url.find("google.com") != -1 + + near = cdx.near(unix_timestamp=1286705410) + assert "2010101010" in near.timestamp + assert "google" in near.urlkey + assert near.original.find("google.com") != -1 + assert near.archive_url.find("google.com") != -1 + + with pytest.raises(NoCDXRecordFound): + dne_url = f"https://{rndstr(30)}.in" + cdx = WaybackMachineCDXServerAPI( + url=dne_url, + user_agent=user_agent, + filters=["statuscode:200"], + ) + cdx.near(unix_timestamp=1286705410) diff --git a/tests/test_cdx_snapshot.py b/tests/test_cdx_snapshot.py index a99977e..922da8a 100644 --- a/tests/test_cdx_snapshot.py +++ b/tests/test_cdx_snapshot.py @@ -41,3 +41,4 @@ def test_CDXSnapshot() -> None: ) assert archive_url == snapshot.archive_url assert sample_input == str(snapshot) + assert sample_input == repr(snapshot) diff 
--git a/tests/test_cli.py b/tests/test_cli.py index 5a8b033..09335ce 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -42,39 +42,6 @@ def test_near() -> None: ) -def test_json() -> None: - runner = CliRunner() - result = runner.invoke( - main, - [ - "--url", - " https://apple.com ", - "--near", - "--year", - "2010", - "--month", - "2", - "--day", - "8", - "--hour", - "12", - "--json", - ], - ) - assert result.exit_code == 0 - assert ( - result.output.find( - """Archive URL:\nhttps://web.archive.org/web/2010020812\ -5854/http://www.apple.com/\nJSON respons\ -e:\n{"url": "https://apple.com", "archived_snapshots": {"close\ -st": {"status": "200", "available": true, "url": "http://web.ar\ -chive.org/web/20100208125854/http://www.apple.com/", "timest\ -amp": "20100208125854"}}, "timestamp":""" - ) - != -1 - ) - - def test_newest() -> None: runner = CliRunner() result = runner.invoke(main, ["--url", " https://microsoft.com ", "--newest"]) @@ -145,7 +112,7 @@ def test_only_url() -> None: assert result.exit_code == 0 assert ( result.output - == "Only URL passed, but did not specify what to do with the URL. Use \ + == "NoCommandFound: Only URL passed, but did not specify what to do with the URL. 
Use \ --help flag for help using waybackpy.\n" ) diff --git a/waybackpy/availability_api.py b/waybackpy/availability_api.py index 63e4892..bb4a0f8 100644 --- a/waybackpy/availability_api.py +++ b/waybackpy/availability_api.py @@ -32,7 +32,11 @@ from .exceptions import ( ArchiveNotInAvailabilityAPIResponse, InvalidJSONInAvailabilityAPIResponse, ) -from .utils import DEFAULT_USER_AGENT +from .utils import ( + DEFAULT_USER_AGENT, + unix_timestamp_to_wayback_timestamp, + wayback_timestamp, +) ResponseJSON = Dict[str, Any] @@ -58,14 +62,6 @@ class WaybackMachineAvailabilityAPI: self.json: Optional[ResponseJSON] = None self.response: Optional[Response] = None - @staticmethod - def unix_timestamp_to_wayback_timestamp(unix_timestamp: int) -> str: - """ - Converts Unix time to Wayback Machine timestamp, Wayback Machine - timestamp format is yyyyMMddhhmmss. - """ - return datetime.utcfromtimestamp(int(unix_timestamp)).strftime("%Y%m%d%H%M%S") - def __repr__(self) -> str: """ Same as string representation, just return the archive URL as a string. @@ -194,17 +190,6 @@ class WaybackMachineAvailabilityAPI: ) return archive_url - @staticmethod - def wayback_timestamp(**kwargs: int) -> str: - """ - Prepends zero before the year, month, day, hour and minute so that they - are conformable with the YYYYMMDDhhmmss Wayback Machine timestamp format. - """ - return "".join( - str(kwargs[key]).zfill(2) - for key in ["year", "month", "day", "hour", "minute"] - ) - def oldest(self) -> "WaybackMachineAvailabilityAPI": """ Passes the date 1994-01-01 to near which should return the oldest archive @@ -245,10 +230,10 @@ class WaybackMachineAvailabilityAPI: finally returns the instance. 
""" if unix_timestamp: - timestamp = self.unix_timestamp_to_wayback_timestamp(unix_timestamp) + timestamp = unix_timestamp_to_wayback_timestamp(unix_timestamp) else: now = datetime.utcnow().timetuple() - timestamp = self.wayback_timestamp( + timestamp = wayback_timestamp( year=now.tm_year if year is None else year, month=now.tm_mon if month is None else month, day=now.tm_mday if day is None else day, diff --git a/waybackpy/cdx_api.py b/waybackpy/cdx_api.py index 7e24db3..6347ad6 100644 --- a/waybackpy/cdx_api.py +++ b/waybackpy/cdx_api.py @@ -9,7 +9,9 @@ the snapshots are yielded as instances of the CDXSnapshot class. """ -from typing import Dict, Generator, List, Optional, cast +import time +from datetime import datetime +from typing import Dict, Generator, List, Optional, Union, cast from .cdx_snapshot import CDXSnapshot from .cdx_utils import ( @@ -21,8 +23,12 @@ from .cdx_utils import ( get_response, get_total_pages, ) -from .exceptions import WaybackError -from .utils import DEFAULT_USER_AGENT +from .exceptions import NoCDXRecordFound, WaybackError +from .utils import ( + DEFAULT_USER_AGENT, + unix_timestamp_to_wayback_timestamp, + wayback_timestamp, +) class WaybackMachineCDXServerAPI: @@ -185,6 +191,69 @@ class WaybackMachineCDXServerAPI: payload["url"] = self.url + def near( + self, + year: Optional[int] = None, + month: Optional[int] = None, + day: Optional[int] = None, + hour: Optional[int] = None, + minute: Optional[int] = None, + unix_timestamp: Optional[int] = None, + wayback_machine_timestamp: Optional[Union[int, str]] = None, + ) -> CDXSnapshot: + """ + Fetch archive close to a datetime, it can only return + a single URL. If you want more do not use this method + instead use the class. 
+ """ + if unix_timestamp: + timestamp = unix_timestamp_to_wayback_timestamp(unix_timestamp) + elif wayback_machine_timestamp: + timestamp = str(wayback_machine_timestamp) + else: + now = datetime.utcnow().timetuple() + timestamp = wayback_timestamp( + year=now.tm_year if year is None else year, + month=now.tm_mon if month is None else month, + day=now.tm_mday if day is None else day, + hour=now.tm_hour if hour is None else hour, + minute=now.tm_min if minute is None else minute, + ) + self.closest = timestamp + self.sort = "closest" + self.limit = 1 + first_snapshot = None + for snapshot in self.snapshots(): + first_snapshot = snapshot + break + + if not first_snapshot: + raise NoCDXRecordFound( + "Wayback Machine's CDX server did not return any records " + + "for the query. The URL may not have any archives " + + " on the Wayback Machine or the URL may have been recently " + + "archived and is still not available on the CDX server." + ) + + return first_snapshot + + def newest(self) -> CDXSnapshot: + """ + Passes the current UNIX time to near() for retrieving the newest archive + from the availability API. + + Remember UNIX time is UTC and Wayback Machine is also UTC based. + """ + return self.near(unix_timestamp=int(time.time())) + + def oldest(self) -> CDXSnapshot: + """ + Passes the date 1994-01-01 to near which should return the oldest archive + because Wayback Machine was started in May, 1996 and it is assumed that + there would be no archive older than January 1, 1994. + """ + return self.near(year=1994, month=1, day=1) + def snapshots(self) -> Generator[CDXSnapshot, None, None]: """ This function yields the CDX data lines as snapshots. 
diff --git a/waybackpy/cdx_snapshot.py b/waybackpy/cdx_snapshot.py index 20218c1..c70e87b 100644 --- a/waybackpy/cdx_snapshot.py +++ b/waybackpy/cdx_snapshot.py @@ -73,6 +73,12 @@ class CDXSnapshot: f"https://web.archive.org/web/{self.timestamp}/{self.original}" ) + def __repr__(self) -> str: + """ + Same as __str__() + """ + return str(self) + def __str__(self) -> str: """ The string representation is same as the line returned by the diff --git a/waybackpy/cli.py b/waybackpy/cli.py index c805243..7342fd4 100644 --- a/waybackpy/cli.py +++ b/waybackpy/cli.py @@ -6,47 +6,48 @@ import os import random import re import string -from json import dumps -from typing import Any, Generator, List, Optional +from typing import Any, Dict, Generator, List, Optional import click import requests from . import __version__ -from .availability_api import WaybackMachineAvailabilityAPI from .cdx_api import WaybackMachineCDXServerAPI -from .exceptions import ArchiveNotInAvailabilityAPIResponse +from .exceptions import BlockedSiteError, NoCDXRecordFound from .save_api import WaybackMachineSaveAPI from .utils import DEFAULT_USER_AGENT from .wrapper import Url -def echo_availability_api( - availability_api_instance: WaybackMachineAvailabilityAPI, json: bool +def handle_cdx_closest_derivative_methods( + cdx_api: "WaybackMachineCDXServerAPI", + oldest: bool, + near: bool, + newest: bool, + near_args: Optional[Dict[str, int]] = None, ) -> None: """ - Output for method that use the availability API. - Near, oldest and newest output via this function. + Handles the closest parameter derivative methods. + + near, newest and oldest use the closest parameter with active + closest based sorting. 
""" try: - if availability_api_instance.archive_url: - archive_url = availability_api_instance.archive_url - except ArchiveNotInAvailabilityAPIResponse as error: - message = ( - "NO ARCHIVE FOUND - The requested URL is probably " - + "not yet archived or if the URL was recently archived then it is " - + "not yet available via the Wayback Machine's availability API " - + "because of database lag and should be available after some time." - ) - - click.echo(message + "\nJSON response:\n" + str(error), err=True) - return - - click.echo("Archive URL:") - click.echo(archive_url) - if json: - click.echo("JSON response:") - click.echo(dumps(availability_api_instance.json)) + if near: + if near_args: + archive_url = cdx_api.near(**near_args).archive_url + else: + archive_url = cdx_api.near().archive_url + elif newest: + archive_url = cdx_api.newest().archive_url + elif oldest: + archive_url = cdx_api.oldest().archive_url + click.echo("Archive URL:") + click.echo(archive_url) + except NoCDXRecordFound as exc: + click.echo(click.style("NoCDXRecordFound: ", fg="red") + str(exc), err=True) + except BlockedSiteError as exc: + click.echo(click.style("BlockedSiteError: ", fg="red") + str(exc), err=True) def handle_cdx(data: List[Any]) -> None: @@ -145,7 +146,8 @@ def save_urls_on_file(url_gen: Generator[str, None, None]) -> None: file_name = f"{domain}-urls-{uid}.txt" file_path = os.path.join(os.getcwd(), file_name) if not os.path.isfile(file_path): - open(file_path, "w+", encoding="utf-8").close() + with open(file_path, "w+", encoding="utf-8") as file: + file.close() with open(file_path, "a", encoding="utf-8") as file: file.write(f"{url}\n") @@ -199,13 +201,6 @@ def save_urls_on_file(url_gen: Generator[str, None, None]) -> None: is_flag=True, help="Retrieve the oldest archive of URL.", ) -@click.option( - "-j", - "--json", - default=False, - is_flag=True, - help="JSON data returned by the availability API.", -) @click.option( "-N", "--near", @@ -343,7 +338,6 @@ def main( # 
pylint: disable=no-value-for-parameter show_license: bool, newest: bool, oldest: bool, - json: bool, near: bool, save: bool, headers: bool, @@ -400,28 +394,32 @@ def main( # pylint: disable=no-value-for-parameter ).text ) elif url is None: - click.echo("No URL detected. Please provide an URL.", err=True) + click.echo( + click.style("NoURLDetected: ", fg="red") + + "No URL detected. " + + "Please provide an URL.", + err=True, + ) elif oldest: - availability_api = WaybackMachineAvailabilityAPI(url, user_agent=user_agent) - availability_api.oldest() - echo_availability_api(availability_api, json) + cdx_api = WaybackMachineCDXServerAPI(url, user_agent=user_agent) + handle_cdx_closest_derivative_methods(cdx_api, oldest, near, newest) elif newest: - availability_api = WaybackMachineAvailabilityAPI(url, user_agent=user_agent) - availability_api.newest() - echo_availability_api(availability_api, json) + cdx_api = WaybackMachineCDXServerAPI(url, user_agent=user_agent) + handle_cdx_closest_derivative_methods(cdx_api, oldest, near, newest) elif near: - availability_api = WaybackMachineAvailabilityAPI(url, user_agent=user_agent) + cdx_api = WaybackMachineCDXServerAPI(url, user_agent=user_agent) near_args = {} keys = ["year", "month", "day", "hour", "minute"] args_arr = [year, month, day, hour, minute] for key, arg in zip(keys, args_arr): if arg: near_args[key] = arg - availability_api.near(**near_args) - echo_availability_api(availability_api, json) + handle_cdx_closest_derivative_methods( + cdx_api, oldest, near, newest, near_args=near_args + ) elif save: save_api = WaybackMachineSaveAPI(url, user_agent=user_agent) @@ -463,9 +461,11 @@ def main( # pylint: disable=no-value-for-parameter handle_cdx(data) else: + click.echo( - "Only URL passed, but did not specify what to do with the URL. " - "Use --help flag for help using waybackpy.", + click.style("NoCommandFound: ", fg="red") + + "Only URL passed, but did not specify what to do with the URL. 
" + + "Use --help flag for help using waybackpy.", err=True, ) diff --git a/waybackpy/exceptions.py b/waybackpy/exceptions.py index fb6ad86..e74d180 100644 --- a/waybackpy/exceptions.py +++ b/waybackpy/exceptions.py @@ -16,6 +16,14 @@ class WaybackError(Exception): """ +class NoCDXRecordFound(WaybackError): + """ + No records returned by the CDX server for a query. + Raised when the user invokes near(), newest() or oldest() methods + and there are no archives. + """ + + class BlockedSiteError(WaybackError): """ Raised when the archives for website/URLs that was excluded from Wayback diff --git a/waybackpy/utils.py b/waybackpy/utils.py index 3890a8f..920f0f0 100644 --- a/waybackpy/utils.py +++ b/waybackpy/utils.py @@ -2,8 +2,28 @@ Utility functions and shared variables like DEFAULT_USER_AGENT are here. """ +from datetime import datetime + from . import __version__ DEFAULT_USER_AGENT: str = ( f"waybackpy {__version__} - https://github.com/akamhy/waybackpy" ) + + +def unix_timestamp_to_wayback_timestamp(unix_timestamp: int) -> str: + """ + Converts Unix time to Wayback Machine timestamp, Wayback Machine + timestamp format is yyyyMMddhhmmss. + """ + return datetime.utcfromtimestamp(int(unix_timestamp)).strftime("%Y%m%d%H%M%S") + + +def wayback_timestamp(**kwargs: int) -> str: + """ + Prepends zero before the year, month, day, hour and minute so that they + are conformable with the YYYYMMDDhhmmss Wayback Machine timestamp format. + """ + return "".join( + str(kwargs[key]).zfill(2) for key in ["year", "month", "day", "hour", "minute"] + )