fix: CLI error and refactor code

This commit is contained in:
eggplants
2022-02-05 05:14:48 +09:00
parent c921d62e6c
commit 4df8364a1a
2 changed files with 87 additions and 109 deletions

View File

@@ -16,6 +16,52 @@ from .utils import DEFAULT_USER_AGENT
from .wrapper import Url from .wrapper import Url
def echo_availability_api(
    availability_api_instance: WaybackMachineAvailabilityAPI, json: bool
) -> None:
    """Echo the archive URL held by an availability API instance.

    Prints an "Archive URL:" header followed by the instance's
    ``archive_url``; when that attribute is falsy, an explanatory
    "NO ARCHIVE FOUND" notice is printed in its place.

    Args:
        availability_api_instance: Object whose ``archive_url`` and ``JSON``
            attributes are read.
        json: When truthy, also echo the instance's ``JSON`` attribute
            serialized with ``JSON.dumps``.
    """
    not_found_notice = (
        "NO ARCHIVE FOUND - The requested URL is probably "
        "not yet archived or if the URL was recently archived then it is "
        "not yet available via the Wayback Machine's availability API "
        "because of database lag and should be available after some time."
    )

    click.echo("Archive URL:")
    # The attribute is read again on the success path rather than cached,
    # mirroring how the value is looked up when it is present.
    message = (
        availability_api_instance.archive_url
        if availability_api_instance.archive_url
        else not_found_notice
    )
    click.echo(message)

    if not json:
        return
    click.echo("JSON response:")
    click.echo(JSON.dumps(availability_api_instance.JSON))
def save_urls_on_file(url_gen: Generator[str, None, None]) -> None:
    """Append every URL from ``url_gen`` to a text file and echo each one.

    The file is created in the current working directory and named
    ``<domain>-urls-<uid>.txt``. ``<domain>`` is the host matched in the first
    URL (``domain-unknown`` when the first URL does not match the http(s)
    pattern) and ``<uid>`` is a random six-character lowercase/digit suffix.
    When ``url_gen`` yields nothing, no file is created and a notice is
    echoed instead.

    Args:
        url_gen: Iterable of URL strings to save and print.
    """
    sys_random = random.SystemRandom()
    uid = "".join(
        sys_random.choice(string.ascii_lowercase + string.digits) for _ in range(6)
    )

    file_name = None
    out_file = None
    try:
        for url in url_gen:
            if out_file is None:
                # The file name depends on the first URL, so the file is
                # opened lazily here -- once, instead of once per URL.
                match = re.search(r"https?://([A-Za-z_0-9.-]+).*", url)
                domain = "domain-unknown" if match is None else match.group(1)
                file_name = f"{domain}-urls-{uid}.txt"
                file_path = os.path.join(os.getcwd(), file_name)
                # Line buffering keeps each URL visible on disk as soon as it
                # is written, which matters for long-running listings.
                out_file = open(file_path, "a", buffering=1)
            out_file.write(f"{url}\n")
            click.echo(url)
    finally:
        # Close the handle even if the generator or a write raises.
        if out_file is not None:
            out_file.close()

    if file_name is not None:
        click.echo(f"\n\n'{file_name}' saved in current working directory")
    else:
        click.echo("No known URLs found. Please try a diffrent input!")
@click.command() @click.command()
@click.option( @click.option(
"-u", "--url", help="URL on which Wayback machine operations are to be performed." "-u", "--url", help="URL on which Wayback machine operations are to be performed."
@@ -30,9 +76,9 @@ from .wrapper import Url
@click.option("-v", "--version", is_flag=True, default=False, help="waybackpy version.") @click.option("-v", "--version", is_flag=True, default=False, help="waybackpy version.")
@click.option( @click.option(
"-l", "-l",
"--license",
"--show-license", "--show-license",
"--show_license", "--show_license",
"--license",
is_flag=True, is_flag=True,
default=False, default=False,
help="Show license of Waybackpy.", help="Show license of Waybackpy.",
@@ -130,10 +176,9 @@ from .wrapper import Url
) )
@click.option( @click.option(
"-f", "-f",
"--cdxfilter",
"--filter",
"--cdx-filter", "--cdx-filter",
"--cdx_filter", "--cdx_filter",
"--filter",
multiple=True, multiple=True,
help="Filter on a specific field or all the CDX fields.", help="Filter on a specific field or all the CDX fields.",
) )
@@ -222,26 +267,18 @@ def main( # pylint: disable=no-value-for-parameter
Released under the MIT License. Use the flag --license for license. Released under the MIT License. Use the flag --license for license.
""" """
if version: if version:
click.echo(f"waybackpy version {__version__}") click.echo(f"waybackpy version {__version__}")
return elif show_license:
if show_license:
click.echo( click.echo(
requests.get( requests.get(
url="https://raw.githubusercontent.com/akamhy/waybackpy/master/LICENSE" url="https://raw.githubusercontent.com/akamhy/waybackpy/master/LICENSE"
).text ).text
) )
return elif url is None:
if not url:
click.echo("No URL detected. Please provide an URL.") click.echo("No URL detected. Please provide an URL.")
return elif (
not version
if (
url
and not version
and not oldest and not oldest
and not newest and not newest
and not near and not near
@@ -253,39 +290,16 @@ def main( # pylint: disable=no-value-for-parameter
"Only URL passed, but did not specify what to do with the URL. " "Only URL passed, but did not specify what to do with the URL. "
"Use --help flag for help using waybackpy." "Use --help flag for help using waybackpy."
) )
return elif oldest:
availability_api = WaybackMachineAvailabilityAPI(url, user_agent=user_agent)
def echo_availability_api(
availability_api_instance: WaybackMachineAvailabilityAPI,
) -> None:
click.echo("Archive URL:")
if not availability_api_instance.archive_url:
archive_url = (
"NO ARCHIVE FOUND - The requested URL is probably "
+ "not yet archived or if the URL was recently archived then it is "
+ "not yet available via the Wayback Machine's availability API "
+ "because of database lag and should be available after some time."
)
else:
archive_url = availability_api_instance.archive_url
click.echo(archive_url)
if json:
click.echo("JSON response:")
click.echo(JSON.dumps(availability_api_instance.JSON))
availability_api = WaybackMachineAvailabilityAPI(url, user_agent=user_agent)
if oldest:
availability_api.oldest() availability_api.oldest()
echo_availability_api(availability_api) echo_availability_api(availability_api, json)
return elif newest:
availability_api = WaybackMachineAvailabilityAPI(url, user_agent=user_agent)
if newest:
availability_api.newest() availability_api.newest()
echo_availability_api(availability_api) echo_availability_api(availability_api, json)
return elif near:
availability_api = WaybackMachineAvailabilityAPI(url, user_agent=user_agent)
if near:
near_args = {} near_args = {}
keys = ["year", "month", "day", "hour", "minute"] keys = ["year", "month", "day", "hour", "minute"]
args_arr = [year, month, day, hour, minute] args_arr = [year, month, day, hour, minute]
@@ -293,10 +307,8 @@ def main( # pylint: disable=no-value-for-parameter
if arg: if arg:
near_args[key] = arg near_args[key] = arg
availability_api.near(**near_args) availability_api.near(**near_args)
echo_availability_api(availability_api) echo_availability_api(availability_api, json)
return elif save:
if save:
save_api = WaybackMachineSaveAPI(url, user_agent=user_agent) save_api = WaybackMachineSaveAPI(url, user_agent=user_agent)
save_api.save() save_api.save()
click.echo("Archive URL:") click.echo("Archive URL:")
@@ -306,43 +318,7 @@ def main( # pylint: disable=no-value-for-parameter
if headers: if headers:
click.echo("Save API headers:") click.echo("Save API headers:")
click.echo(save_api.headers) click.echo(save_api.headers)
return elif known_urls:
def save_urls_on_file(url_gen: Generator[str, None, None]) -> None:
domain = None
sys_random = random.SystemRandom()
uid = "".join(
sys_random.choice(string.ascii_lowercase + string.digits) for _ in range(6)
)
url_count = 0
file_name = None
for url in url_gen:
url_count += 1
if not domain:
match = re.search("https?://([A-Za-z_0-9.-]+).*", url)
domain = "domain-unknown"
if match is not None:
domain = match.group(1)
file_name = f"{domain}-urls-{uid}.txt"
file_path = os.path.join(os.getcwd(), file_name)
if not os.path.isfile(file_path):
open(file_path, "w+").close()
with open(file_path, "a") as f:
f.write(f"{url}\n")
click.echo(url)
if url_count > 0 or file_name is not None:
click.echo(f"\n\n'{file_name}' saved in current working directory")
else:
click.echo("No known URLs found. Please try a diffrent input!")
if known_urls:
wayback = Url(url, user_agent) wayback = Url(url, user_agent)
url_gen = wayback.known_urls(subdomain=subdomain) url_gen = wayback.known_urls(subdomain=subdomain)
@@ -351,8 +327,7 @@ def main( # pylint: disable=no-value-for-parameter
else: else:
for url in url_gen: for url in url_gen:
click.echo(url) click.echo(url)
elif cdx:
if cdx:
filters = list(cdx_filter) filters = list(cdx_filter)
collapses = list(collapse) collapses = list(collapse)
cdx_print = list(cdx_print) cdx_print = list(cdx_print)
@@ -375,35 +350,36 @@ def main( # pylint: disable=no-value-for-parameter
if len(cdx_print) == 0: if len(cdx_print) == 0:
click.echo(snapshot) click.echo(snapshot)
else: else:
output_string = "" output_string = []
if any(val in cdx_print for val in ["urlkey", "url-key", "url_key"]): if any(val in cdx_print for val in ["urlkey", "url-key", "url_key"]):
output_string = output_string + snapshot.urlkey + " " output_string.append(snapshot.urlkey)
if any( if any(
val in cdx_print val in cdx_print
for val in ["timestamp", "time-stamp", "time_stamp"] for val in ["timestamp", "time-stamp", "time_stamp"]
): ):
output_string = output_string + snapshot.timestamp + " " output_string.append(snapshot.timestamp)
if "original" in cdx_print: if "original" in cdx_print:
output_string = output_string + snapshot.original + " " output_string.append(snapshot.original)
if any( if any(
val in cdx_print for val in ["mimetype", "mime-type", "mime_type"] val in cdx_print for val in ["mimetype", "mime-type", "mime_type"]
): ):
output_string = output_string + snapshot.mimetype + " " output_string.append(snapshot.mimetype)
if any( if any(
val in cdx_print val in cdx_print
for val in ["statuscode", "status-code", "status_code"] for val in ["statuscode", "status-code", "status_code"]
): ):
output_string = output_string + snapshot.statuscode + " " output_string.append(snapshot.statuscode)
if "digest" in cdx_print: if "digest" in cdx_print:
output_string = output_string + snapshot.digest + " " output_string.append(snapshot.digest)
if "length" in cdx_print: if "length" in cdx_print:
output_string = output_string + snapshot.length + " " output_string.append(snapshot.length)
if any( if any(
val in cdx_print val in cdx_print
for val in ["archiveurl", "archive-url", "archive_url"] for val in ["archiveurl", "archive-url", "archive_url"]
): ):
output_string = output_string + snapshot.archive_url + " " output_string.append(snapshot.archive_url)
click.echo(output_string)
click.echo(" ".join(output_string))
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -19,7 +19,10 @@ class WaybackMachineSaveAPI(object):
""" """
def __init__( def __init__(
self, url: str, user_agent: str = DEFAULT_USER_AGENT, max_tries: int = 8 self,
url: str,
user_agent: str = DEFAULT_USER_AGENT,
max_tries: int = 8,
) -> None: ) -> None:
self.url = str(url).strip().replace(" ", "%20") self.url = str(url).strip().replace(" ", "%20")
self.request_url = "https://web.archive.org/save/" + self.url self.request_url = "https://web.archive.org/save/" + self.url
@@ -169,17 +172,16 @@ class WaybackMachineSaveAPI(object):
tries = 0 tries = 0
while True: while True:
if self.saved_archive is None: if tries >= 1:
if tries >= 1: self.sleep(tries)
self.sleep(tries)
self.get_save_request_headers() self.get_save_request_headers()
self.saved_archive = self.archive_url_parser() self.saved_archive = self.archive_url_parser()
if isinstance(self.saved_archive, str): if isinstance(self.saved_archive, str):
self._archive_url = self.saved_archive self._archive_url = self.saved_archive
self.timestamp() self.timestamp()
return self.saved_archive return self.saved_archive
tries += 1 tries += 1
if tries >= self.max_tries: if tries >= self.max_tries: