refactoring, try to reduce code complexity

Akash Mahanty 2021-01-04 00:14:38 +05:30
parent 62e5217b9e
commit 5dec4927cd
4 changed files with 158 additions and 155 deletions

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
import sys
import os
import pytest
@@ -286,7 +285,7 @@ def test_get():
alive=False,
subdomain=False,
known_urls=False,
get="BullShit",
get="foobar",
)
reply = cli.args_handler(args)
assert "get the source code of the" in str(reply)

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
import sys
import pytest
import random
@@ -12,14 +11,15 @@ import waybackpy.wrapper as waybackpy # noqa: E402
user_agent = "Mozilla/5.0 (Windows NT 6.2; rv:20.0) Gecko/20121202 Firefox/20.0"
def test_clean_url():
def test_cleaned_url():
"""No API use"""
test_url = " https://en.wikipedia.org/wiki/Network security "
answer = "https://en.wikipedia.org/wiki/Network_security"
target = waybackpy.Url(test_url, user_agent)
test_result = target._clean_url()
test_result = target._cleaned_url()
assert answer == test_result
def test_dunders():
"""No API use"""
url = "https://en.wikipedia.org/wiki/Network_security"
@@ -28,19 +28,23 @@ def test_dunders():
assert "waybackpy.Url(url=%s, user_agent=%s)" % (url, user_agent) == repr(target)
assert "en.wikipedia.org" in str(target)
def test_url_check():
"""No API Use"""
broken_url = "http://wwwgooglecom/"
with pytest.raises(Exception):
waybackpy.Url(broken_url, user_agent)
def test_archive_url_parser():
"""No API Use"""
perfect_header = """
{'Server': 'nginx/1.15.8', 'Date': 'Sat, 02 Jan 2021 09:40:25 GMT', 'Content-Type': 'text/html; charset=UTF-8', 'Transfer-Encoding': 'chunked', 'Connection': 'keep-alive', 'X-Archive-Orig-Server': 'nginx', 'X-Archive-Orig-Date': 'Sat, 02 Jan 2021 09:40:09 GMT', 'X-Archive-Orig-Transfer-Encoding': 'chunked', 'X-Archive-Orig-Connection': 'keep-alive', 'X-Archive-Orig-Vary': 'Accept-Encoding', 'X-Archive-Orig-Last-Modified': 'Fri, 01 Jan 2021 12:19:00 GMT', 'X-Archive-Orig-Strict-Transport-Security': 'max-age=31536000, max-age=0;', 'X-Archive-Guessed-Content-Type': 'text/html', 'X-Archive-Guessed-Charset': 'utf-8', 'Memento-Datetime': 'Sat, 02 Jan 2021 09:40:09 GMT', 'Link': '<https://www.scribbr.com/citing-sources/et-al/>; rel="original", <https://web.archive.org/web/timemap/link/https://www.scribbr.com/citing-sources/et-al/>; rel="timemap"; type="application/link-format", <https://web.archive.org/web/https://www.scribbr.com/citing-sources/et-al/>; rel="timegate", <https://web.archive.org/web/20200601082911/https://www.scribbr.com/citing-sources/et-al/>; rel="first memento"; datetime="Mon, 01 Jun 2020 08:29:11 GMT", <https://web.archive.org/web/20201126185327/https://www.scribbr.com/citing-sources/et-al/>; rel="prev memento"; datetime="Thu, 26 Nov 2020 18:53:27 GMT", <https://web.archive.org/web/20210102094009/https://www.scribbr.com/citing-sources/et-al/>; rel="memento"; datetime="Sat, 02 Jan 2021 09:40:09 GMT", <https://web.archive.org/web/20210102094009/https://www.scribbr.com/citing-sources/et-al/>; rel="last memento"; datetime="Sat, 02 Jan 2021 09:40:09 GMT"', 'Content-Security-Policy': "default-src 'self' 'unsafe-eval' 'unsafe-inline' data: blob: archive.org web.archive.org analytics.archive.org pragma.archivelab.org", 'X-Archive-Src': 'spn2-20210102092956-wwwb-spn20.us.archive.org-8001.warc.gz', 'Server-Timing': 'captures_list;dur=112.646325, exclusion.robots;dur=0.172010, exclusion.robots.policy;dur=0.158205, RedisCDXSource;dur=2.205932, esindex;dur=0.014647, LoadShardBlock;dur=82.205012, PetaboxLoader3.datanode;dur=70.750239, CDXLines.iter;dur=24.306278, load_resource;dur=26.520179', 'X-App-Server': 'wwwb-app200', 'X-ts': '200', 'X-location': 'All', 'X-Cache-Key': 'httpsweb.archive.org/web/20210102094009/https://www.scribbr.com/citing-sources/et-al/IN', 'X-RL': '0', 'X-Page-Cache': 'MISS', 'X-Archive-Screenname': '0', 'Content-Encoding': 'gzip'}
"""
archive = waybackpy._archive_url_parser(perfect_header)
archive = waybackpy._archive_url_parser(
perfect_header, "https://www.scribbr.com/citing-sources/et-al/"
)
assert "web.archive.org/web/20210102094009" in archive
# The below header should result in Exception
@@ -49,7 +53,9 @@ def test_archive_url_parser():
"""
with pytest.raises(Exception):
waybackpy._archive_url_parser(no_archive_header)
waybackpy._archive_url_parser(
no_archive_header, "https://www.scribbr.com/citing-sources/et-al/"
)
def test_save():
@@ -173,9 +179,11 @@ def test_get_response():
def test_total_archives():
target = waybackpy.Url(" https://google.com ", user_agent)
assert target.total_archives() > 500000
user_agent = (
"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:78.0) Gecko/20100101 Firefox/78.0"
)
target = waybackpy.Url(" https://outlook.com ", user_agent)
assert target.total_archives() > 80000
target = waybackpy.Url(
" https://gaha.e4i3n.m5iai3kip6ied.cima/gahh2718gs/ahkst63t7gad8 ", user_agent

View File

@@ -1,13 +1,12 @@
# -*- coding: utf-8 -*-
import sys
import os
import re
import argparse
import string
import sys
import random
import string
import argparse
from waybackpy.wrapper import Url
from waybackpy.__version__ import __version__
from waybackpy.exceptions import WaybackError
from waybackpy.__version__ import __version__
def _save(obj):
@@ -38,7 +37,7 @@ def _json(obj):
return obj.JSON
def handle_not_archived_error(e, obj):
def no_archive_handler(e, obj):
m = re.search(r"archive\sfor\s\'(.*?)\'\stry", str(e))
if m:
url = m.group(1)
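The renamed no_archive_handler pulls the original URL back out of the WaybackError message text. A minimal sketch of that extraction, reusing the error wording raised by near() in wrapper.py further down this page:

import re

error_text = "Can not find archive for 'https://example.com' try later"
m = re.search(r"archive\sfor\s\'(.*?)\'\stry", str(error_text))
assert m and m.group(1) == "https://example.com"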
@@ -57,14 +56,14 @@ def _oldest(obj):
try:
return obj.oldest()
except Exception as e:
return handle_not_archived_error(e, obj)
return no_archive_handler(e, obj)
def _newest(obj):
try:
return obj.newest()
except Exception as e:
return handle_not_archived_error(e, obj)
return no_archive_handler(e, obj)
def _total_archives(obj):
@@ -83,15 +82,15 @@ def _near(obj, args):
try:
return obj.near(**_near_args)
except Exception as e:
return handle_not_archived_error(e, obj)
return no_archive_handler(e, obj)
def _save_urls_on_file(input_list, live_url_count):
m = re.search("https?://([A-Za-z_0-9.-]+).*", input_list[0])
domain = "domain-unknown"
if m:
domain = m.group(1)
else:
domain = "domain-unknown"
uid = "".join(
random.choice(string.ascii_lowercase + string.digits) for _ in range(6)
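_save_urls_on_file now assigns the fallback domain up front instead of using an else branch. A quick sketch of the domain extraction and uid generation; the final file-name format is an assumption, since the hunk is cut off here:

import random
import re
import string

m = re.search("https?://([A-Za-z_0-9.-]+).*", "https://blog.example.com/post/1")
domain = "domain-unknown"
if m:
    domain = m.group(1)  # "blog.example.com"
uid = "".join(random.choice(string.ascii_lowercase + string.digits) for _ in range(6))
file_name = "%s-urls-%s.txt" % (domain, uid)  # assumed naming, not shown in this hunk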
@@ -106,44 +105,39 @@ def _save_urls_on_file(input_list, live_url_count):
def _known_urls(obj, args):
"""Abbreviations:
sd = subdomain
al = alive
"""
Known URLs for a domain.
"""
# sd = subdomain
sd = False
al = False
if args.subdomain:
sd = True
# al = alive
al = False
if args.alive:
al = True
url_list = obj.known_urls(alive=al, subdomain=sd)
total_urls = len(url_list)
if total_urls > 0:
text = _save_urls_on_file(url_list, total_urls)
else:
text = "No known URLs found. Please try a diffrent domain!"
return _save_urls_on_file(url_list, total_urls)
return text
return "No known URLs found. Please try a diffrent domain!"
def _get(obj, args):
if args.get.lower() == "url":
return obj.get()
if args.get.lower() == "archive_url":
return obj.get(obj.archive_url)
if args.get.lower() == "oldest":
return obj.get(obj.oldest())
if args.get.lower() == "latest" or args.get.lower() == "newest":
return obj.get(obj.newest())
if args.get.lower() == "save":
return obj.get(obj.save())
return "Use get as \"--get 'source'\", 'source' can be one of the followings: \
\n1) url - get the source code of the url specified using --url/-u.\
\n2) archive_url - get the source code of the newest archive for the supplied url, alias of newest.\
@@ -167,39 +161,48 @@ def args_handler(args):
obj = Url(args.url, args.user_agent)
if args.save:
return _save(obj)
if args.archive_url:
return _archive_url(obj)
if args.json:
return _json(obj)
if args.oldest:
return _oldest(obj)
if args.newest:
return _newest(obj)
if args.known_urls:
return _known_urls(obj, args)
if args.total:
return _total_archives(obj)
if args.near:
output = _save(obj)
elif args.archive_url:
output = _archive_url(obj)
elif args.json:
output = _json(obj)
elif args.oldest:
output = _oldest(obj)
elif args.newest:
output = _newest(obj)
elif args.known_urls:
output = _known_urls(obj, args)
elif args.total:
output = _total_archives(obj)
elif args.near:
return _near(obj, args)
if args.get:
return _get(obj, args)
return (
elif args.get:
output = _get(obj, args)
else:
output = (
"You only specified the URL. But you also need to specify the operation."
"\nSee 'waybackpy --help' for help using this tool."
)
return output
def add_requiredArgs(requiredArgs):
requiredArgs.add_argument(
"--url", "-u", help="URL on which Wayback machine operations would occur"
)
def add_userAgentArg(userAgentArg):
help_text = 'User agent, default user_agent is "waybackpy python package - https://github.com/akamhy/waybackpy"'
userAgentArg.add_argument("--user_agent", "-ua", help=help_text)
def add_saveArg(saveArg):
saveArg.add_argument(
"--save", "-s", action="store_true", help="Save the URL on the Wayback machine"
)
def add_auArg(auArg):
auArg.add_argument(
"--archive_url",
@@ -208,6 +211,7 @@ def add_auArg(auArg):
help="Get the latest archive URL, alias for --newest",
)
def add_jsonArg(jsonArg):
jsonArg.add_argument(
"--json",
@@ -216,6 +220,7 @@ def add_jsonArg(jsonArg):
help="JSON data of the availability API request",
)
def add_oldestArg(oldestArg):
oldestArg.add_argument(
"--oldest",
@@ -224,6 +229,7 @@ def add_oldestArg(oldestArg):
help="Oldest archive for the specified URL",
)
def add_newestArg(newestArg):
newestArg.add_argument(
"--newest",
@@ -232,6 +238,7 @@ def add_newestArg(newestArg):
help="Newest archive for the specified URL",
)
def add_totalArg(totalArg):
totalArg.add_argument(
"--total",
@@ -240,6 +247,7 @@ def add_totalArg(totalArg):
help="Total number of archives for the specified URL",
)
def add_getArg(getArg):
getArg.add_argument(
"--get",
@@ -247,6 +255,7 @@ def add_getArg(getArg):
help="Prints the source code of the supplied url. Use '--get help' for extended usage",
)
def add_knownUrlArg(knownUrlArg):
knownUrlArg.add_argument(
"--known_urls", "-ku", action="store_true", help="URLs known for the domain."
@@ -257,6 +266,12 @@ def add_knownUrlArg(knownUrlArg):
knownUrlArg.add_argument("--alive", "-a", action="store_true", help=help_text)
def add_nearArg(nearArg):
nearArg.add_argument(
"--near", "-N", action="store_true", help="Archive near specified time"
)
def add_nearArgs(nearArgs):
nearArgs.add_argument("--year", "-Y", type=int, help="Year in integer")
nearArgs.add_argument("--month", "-M", type=int, help="Month in integer")
@@ -264,64 +279,35 @@ def add_nearArgs(nearArgs):
nearArgs.add_argument("--hour", "-H", type=int, help="Hour in intege")
nearArgs.add_argument("--minute", "-MIN", type=int, help="Minute in integer")
def parse_args(argv):
parser = argparse.ArgumentParser()
requiredArgs = parser.add_argument_group("URL argument (required)")
requiredArgs.add_argument(
"--url", "-u", help="URL on which Wayback machine operations would occur"
add_requiredArgs(parser.add_argument_group("URL argument (required)"))
add_userAgentArg(parser.add_argument_group("User Agent"))
add_saveArg(parser.add_argument_group("Create new archive/save URL"))
add_auArg(parser.add_argument_group("Get the latest Archive"))
add_jsonArg(parser.add_argument_group("Get the JSON data"))
add_oldestArg(parser.add_argument_group("Oldest archive"))
add_newestArg(parser.add_argument_group("Newest archive"))
add_totalArg(parser.add_argument_group("Total number of archives"))
add_getArg(parser.add_argument_group("Get source code"))
add_knownUrlArg(
parser.add_argument_group(
"URLs known and archived to Waybcak Machine for the site."
)
)
userAgentArg = parser.add_argument_group("User Agent")
add_userAgentArg(userAgentArg)
saveArg = parser.add_argument_group("Create new archive/save URL")
add_saveArg(saveArg)
auArg = parser.add_argument_group("Get the latest Archive")
add_auArg(auArg)
jsonArg = parser.add_argument_group("Get the JSON data")
add_jsonArg(jsonArg)
oldestArg = parser.add_argument_group("Oldest archive")
add_oldestArg(oldestArg)
newestArg = parser.add_argument_group("Newest archive")
add_newestArg(newestArg)
totalArg = parser.add_argument_group("Total number of archives")
add_totalArg(totalArg)
getArg = parser.add_argument_group("Get source code")
add_getArg(getArg)
knownUrlArg = parser.add_argument_group(
"URLs known and archived to Waybcak Machine for the site."
)
add_knownUrlArg(knownUrlArg)
nearArg = parser.add_argument_group("Archive close to time specified")
nearArg.add_argument(
"--near", "-N", action="store_true", help="Archive near specified time"
)
#The following is adding supplementary args used with near.
nearArgs = parser.add_argument_group("Arguments that are used only with --near")
add_nearArgs(nearArgs)
add_nearArg(parser.add_argument_group("Archive close to time specified"))
add_nearArgs(parser.add_argument_group("Arguments that are used only with --near"))
parser.add_argument(
"--version", "-v", action="store_true", help="Waybackpy version"
)
return parser.parse_args(argv[1:])
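With each argument group factored into an add_* helper, parse_args is now a flat list of registrations. A usage sketch, assuming it is invoked the way main() passes sys.argv (argv[0] is the program name and is skipped):

args = parse_args(["waybackpy", "--url", "https://example.com", "--oldest"])
assert args.url == "https://example.com"
assert args.oldest is True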
def main(argv=None):
if argv is None:
argv = sys.argv
args = parse_args(argv)
output = args_handler(args)
print(output)
print(args_handler(parse_args(argv)))
if __name__ == "__main__":

View File

@@ -1,59 +1,62 @@
# -*- coding: utf-8 -*-
import re
from datetime import datetime, timedelta
from waybackpy.exceptions import WaybackError, URLError
from waybackpy.__version__ import __version__
import requests
import concurrent.futures
from datetime import datetime, timedelta
from waybackpy.__version__ import __version__
from waybackpy.exceptions import WaybackError, URLError
default_UA = "waybackpy python package - https://github.com/akamhy/waybackpy"
default_user_agent = "waybackpy python package - https://github.com/akamhy/waybackpy"
def _archive_url_parser(header):
def _archive_url_parser(header, url):
"""
The wayback machine's save API doesn't
return JSON response, we are required
to read the header of the API response
and look for the archive URL.
This method has some regexen (or regexes)
that search for archive url in header.
This method is used when you try to
save a webpage on wayback machine.
The wayback machine's save API doesn't
return JSON response, we are required
to read the header of the API response
and look for the archive URL.
Two cases are possible:
1) Either we find the archive url in
the header.
2) We didn't find the archive url in
2) Or we didn't find the archive url in
API header.
If we found the archive we return it.
If we found the archive URL we return it.
And if we couldn't find it we raise
WaybackError with a standard Error message.
And if we couldn't find it, we raise
WaybackError with an error message.
"""
# Regex1
arch = re.search(r"Content-Location: (/web/[0-9]{14}/.*)", str(header))
if arch:
return "web.archive.org" + arch.group(1)
m = re.search(r"Content-Location: (/web/[0-9]{14}/.*)", str(header))
if m:
return "web.archive.org" + m.group(1)
# Regex2
arch = re.search(
m = re.search(
r"rel=\"memento.*?(web\.archive\.org/web/[0-9]{14}/.*?)>", str(header)
)
if arch:
return arch.group(1)
if m:
return m.group(1)
# Regex3
arch = re.search(r"X-Cache-Key:\shttps(.*)[A-Z]{2}", str(header))
if arch:
return arch.group(1)
m = re.search(r"X-Cache-Key:\shttps(.*)[A-Z]{2}", str(header))
if m:
return m.group(1)
raise WaybackError(
"No archive URL found in the API response. "
"This version of waybackpy (%s) is likely out of date or WayBack Machine is malfunctioning. Visit "
"https://github.com/akamhy/waybackpy for the latest version "
"If '%s' can be accessed via your web browser then either "
"this version of waybackpy (%s) is out of date or WayBack Machine is malfunctioning. Visit "
"'https://github.com/akamhy/waybackpy' for the latest version "
"of waybackpy.\nHeader:\n%s" % (__version__, str(header))
)
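A minimal sketch of the happy path through the first regex, assuming a header carrying the Content-Location the save API returns:

minimal_header = "Content-Location: /web/20210102094009/https://www.scribbr.com/citing-sources/et-al/"
archive = _archive_url_parser(minimal_header, "https://www.scribbr.com/citing-sources/et-al/")
assert archive.startswith("web.archive.org/web/20210102094009")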
@@ -79,6 +82,7 @@ def _wayback_timestamp(**kwargs):
Return format is string.
"""
return "".join(
str(kwargs[key]).zfill(2) for key in ["year", "month", "day", "hour", "minute"]
)
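Each unit is zero-padded to two digits and concatenated, so for example:

assert _wayback_timestamp(year=2021, month=1, day=4, hour=0, minute=14) == "202101040014"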
@@ -104,26 +108,25 @@ def _get_response(endpoint, params=None, headers=None):
"""
try:
response = requests.get(endpoint, params=params, headers=headers)
return requests.get(endpoint, params=params, headers=headers)
except Exception:
try:
response = requests.get(endpoint, params=params, headers=headers) # nosec
return requests.get(endpoint, params=params, headers=headers)
except Exception as e:
exc = WaybackError("Error while retrieving %s" % endpoint)
exc.__cause__ = e
raise exc
return response
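_get_response still retries the GET once; on a second failure it raises WaybackError chained to the underlying exception. A sketch of what a caller sees, assuming an unreachable host:

try:
    _get_response("https://nonexistent.invalid")
except WaybackError as e:
    print(e)            # Error while retrieving https://nonexistent.invalid
    print(e.__cause__)  # the original requests exception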
class Url:
"""
waybackpy Url object <class 'waybackpy.wrapper.Url'>
waybackpy Url object, Type : <class 'waybackpy.wrapper.Url'>
"""
def __init__(self, url, user_agent=default_UA):
def __init__(self, url, user_agent=default_user_agent):
self.url = url
self.user_agent = user_agent
self._url_check() # checks url validity on init.
self._url_check()
self._archive_url = None
self.timestamp = None
self._JSON = None
@@ -144,6 +147,7 @@ class Url:
sets self._archive_url, we now set self._archive_url to self.archive_url
and return it.
"""
if not self._archive_url:
self._archive_url = self.archive_url
return "%s" % self._archive_url
@@ -159,8 +163,7 @@ class Url:
if self.timestamp == datetime.max:
return td_max.days
diff = datetime.utcnow() - self.timestamp
return diff.days
return (datetime.utcnow() - self.timestamp).days
def _url_check(self):
"""
@@ -170,6 +173,7 @@ class Url:
If you know any others, please create a PR on the github repo.
"""
if "." not in self.url:
raise URLError("'%s' is not a valid URL." % self.url)
@@ -184,7 +188,7 @@ class Url:
endpoint = "https://archive.org/wayback/available"
headers = {"User-Agent": "%s" % self.user_agent}
payload = {"url": "%s" % self._clean_url()}
payload = {"url": "%s" % self._cleaned_url()}
response = _get_response(endpoint, params=payload, headers=headers)
return response.json()
@@ -236,7 +240,7 @@ class Url:
self.timestamp = ts
return ts
def _clean_url(self):
def _cleaned_url(self):
"""
Remove newlines
replace " " with "_"
@@ -245,10 +249,10 @@ class Url:
def save(self):
"""Create a new Wayback Machine archive for this URL."""
request_url = "https://web.archive.org/save/" + self._clean_url()
request_url = "https://web.archive.org/save/" + self._cleaned_url()
headers = {"User-Agent": "%s" % self.user_agent}
response = _get_response(request_url, params=None, headers=headers)
self._archive_url = "https://" + _archive_url_parser(response.headers)
self._archive_url = "https://" + _archive_url_parser(response.headers, self.url)
self.timestamp = datetime.utcnow()
return self
@@ -258,7 +262,7 @@ class Url:
"""
if not url:
url = self._clean_url()
url = self._cleaned_url()
if not user_agent:
user_agent = self.user_agent
@@ -307,14 +311,14 @@ class Url:
endpoint = "https://archive.org/wayback/available"
headers = {"User-Agent": "%s" % self.user_agent}
payload = {"url": "%s" % self._clean_url(), "timestamp": timestamp}
payload = {"url": "%s" % self._cleaned_url(), "timestamp": timestamp}
response = _get_response(endpoint, params=payload, headers=headers)
data = response.json()
if not data["archived_snapshots"]:
raise WaybackError(
"Can not find archive for '%s' try later or use wayback.Url(url, user_agent).save() "
"to create a new archive." % self._clean_url()
"to create a new archive." % self._cleaned_url()
)
archive_url = data["archived_snapshots"]["closest"]["url"]
archive_url = archive_url.replace(
@@ -362,18 +366,24 @@ class Url:
Return type in integer.
"""
total_pages_url = (
"https://web.archive.org/cdx/search/cdx?url=%s&showNumPages=true"
% self._cleaned_url()
)
headers = {"User-Agent": "%s" % self.user_agent}
total_pages = int(
(_get_response(total_pages_url, headers=headers).text).strip()
)
endpoint = "https://web.archive.org/cdx/search/cdx"
headers = {
"User-Agent": "%s" % self.user_agent,
"output": "json",
"fl": "statuscode",
}
payload = {"url": "%s" % self._clean_url()}
response = _get_response(endpoint, params=payload, headers=headers)
# Most efficient method to count number of archives (yet)
return response.text.count(",")
archive_count = 0
for i in range(total_pages):
page_url = "https://web.archive.org/cdx/search/cdx?url=%s&page=%s" % (
self._cleaned_url(),
str(i),
)
count = str(_get_response(page_url, headers=headers).text).count("\n")
archive_count = archive_count + count
return archive_count
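total_archives now pages through the CDX API instead of counting commas in a single response: showNumPages=true yields the page count, then the newline-delimited rows of every page are summed. A usage sketch against the live API:

target = Url("https://example.com", default_user_agent)
print(target.total_archives())  # summed row count across all CDX pages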
def live_urls_picker(self, url):
"""
@@ -384,7 +394,7 @@ class Url:
try:
response_code = requests.get(url).status_code
except Exception:
return # we don't care if Exception
return # we don't care if Exception
# 200s are OK and 300s are usually redirects, if you don't want redirects replace 400 with 300
if response_code >= 400:
@@ -406,12 +416,12 @@ class Url:
if subdomain:
request_url = (
"https://web.archive.org/cdx/search/cdx?url=*.%s/*&output=json&fl=original&collapse=urlkey"
% self._clean_url()
% self._cleaned_url()
)
else:
request_url = (
"http://web.archive.org/cdx/search/cdx?url=%s/*&output=json&fl=original&collapse=urlkey"
% self._clean_url()
% self._cleaned_url()
)
headers = {"User-Agent": "%s" % self.user_agent}