Use Black for quick readability improvements
This commit is contained in:
parent
3ac7c7ab86
commit
fd74e62ff9
@ -9,19 +9,19 @@ from waybackpy.exceptions import WaybackError
|
|||||||
if sys.version_info >= (3, 0): # If the python ver >= 3
|
if sys.version_info >= (3, 0): # If the python ver >= 3
|
||||||
from urllib.request import Request, urlopen
|
from urllib.request import Request, urlopen
|
||||||
from urllib.error import URLError
|
from urllib.error import URLError
|
||||||
else: # For python2.x
|
else: # For python2.x
|
||||||
from urllib2 import Request, urlopen, URLError
|
from urllib2 import Request, urlopen, URLError
|
||||||
|
|
||||||
default_UA = "waybackpy python package - https://github.com/akamhy/waybackpy"
|
default_UA = "waybackpy python package - https://github.com/akamhy/waybackpy"
|
||||||
|
|
||||||
class Url():
|
|
||||||
"""waybackpy Url object"""
|
|
||||||
|
|
||||||
|
class Url:
|
||||||
|
"""waybackpy Url object"""
|
||||||
|
|
||||||
def __init__(self, url, user_agent=default_UA):
|
def __init__(self, url, user_agent=default_UA):
|
||||||
self.url = url
|
self.url = url
|
||||||
self.user_agent = user_agent
|
self.user_agent = user_agent
|
||||||
self.url_check() # checks url validity on init.
|
self.url_check() # checks url validity on init.
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
"""Representation of the object."""
|
"""Representation of the object."""
|
||||||
@ -43,41 +43,40 @@ class Url():
|
|||||||
|
|
||||||
def clean_url(self):
|
def clean_url(self):
|
||||||
"""Fix the URL, if possible."""
|
"""Fix the URL, if possible."""
|
||||||
return str(self.url).strip().replace(" ","_")
|
return str(self.url).strip().replace(" ", "_")
|
||||||
|
|
||||||
def wayback_timestamp(self, **kwargs):
|
def wayback_timestamp(self, **kwargs):
|
||||||
"""Return the formatted the timestamp."""
|
"""Return the formatted the timestamp."""
|
||||||
return (
|
return (
|
||||||
str(kwargs["year"])
|
str(kwargs["year"])
|
||||||
+
|
+ str(kwargs["month"]).zfill(2)
|
||||||
str(kwargs["month"]).zfill(2)
|
+ str(kwargs["day"]).zfill(2)
|
||||||
+
|
+ str(kwargs["hour"]).zfill(2)
|
||||||
str(kwargs["day"]).zfill(2)
|
+ str(kwargs["minute"]).zfill(2)
|
||||||
+
|
)
|
||||||
str(kwargs["hour"]).zfill(2)
|
|
||||||
+
|
|
||||||
str(kwargs["minute"]).zfill(2)
|
|
||||||
)
|
|
||||||
|
|
||||||
def save(self):
|
def save(self):
|
||||||
"""Create a new archives for an URL on the Wayback Machine."""
|
"""Create a new archives for an URL on the Wayback Machine."""
|
||||||
request_url = ("https://web.archive.org/save/" + self.clean_url())
|
request_url = "https://web.archive.org/save/" + self.clean_url()
|
||||||
hdr = { 'User-Agent' : '%s' % self.user_agent } #nosec
|
hdr = {"User-Agent": "%s" % self.user_agent} # nosec
|
||||||
req = Request(request_url, headers=hdr) #nosec
|
req = Request(request_url, headers=hdr) # nosec
|
||||||
header = self.get_response(req).headers
|
header = self.get_response(req).headers
|
||||||
|
|
||||||
def archive_url_parser(header):
|
def archive_url_parser(header):
|
||||||
"""Parse out the archive from header."""
|
"""Parse out the archive from header."""
|
||||||
#Regex1
|
# Regex1
|
||||||
arch = re.search(r"rel=\"memento.*?(web\.archive\.org/web/[0-9]{14}/.*?)>", str(header))
|
arch = re.search(
|
||||||
|
r"rel=\"memento.*?(web\.archive\.org/web/[0-9]{14}/.*?)>", str(header)
|
||||||
|
)
|
||||||
if arch:
|
if arch:
|
||||||
return arch.group(1)
|
return arch.group(1)
|
||||||
#Regex2
|
# Regex2
|
||||||
arch = re.search(r"X-Cache-Key:\shttps(.*)[A-Z]{2}", str(header))
|
arch = re.search(r"X-Cache-Key:\shttps(.*)[A-Z]{2}", str(header))
|
||||||
if arch:
|
if arch:
|
||||||
return arch.group(1)
|
return arch.group(1)
|
||||||
raise WaybackError(
|
raise WaybackError(
|
||||||
"No archive url found in the API response. Visit https://github.com/akamhy/waybackpy for latest version of waybackpy.\nHeader:\n%s" % str(header)
|
"No archive url found in the API response. Visit https://github.com/akamhy/waybackpy for latest version of waybackpy.\nHeader:\n%s"
|
||||||
|
% str(header)
|
||||||
)
|
)
|
||||||
|
|
||||||
return "https://" + archive_url_parser(header)
|
return "https://" + archive_url_parser(header)
|
||||||
@ -90,12 +89,12 @@ class Url():
|
|||||||
if not user_agent:
|
if not user_agent:
|
||||||
user_agent = self.user_agent
|
user_agent = self.user_agent
|
||||||
|
|
||||||
hdr = { 'User-Agent' : '%s' % user_agent }
|
hdr = {"User-Agent": "%s" % user_agent}
|
||||||
req = Request(url, headers=hdr) #nosec
|
req = Request(url, headers=hdr) # nosec
|
||||||
response = self.get_response(req)
|
response = self.get_response(req)
|
||||||
if not encoding:
|
if not encoding:
|
||||||
try:
|
try:
|
||||||
encoding= response.headers['content-type'].split('charset=')[-1]
|
encoding = response.headers["content-type"].split("charset=")[-1]
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
encoding = "UTF-8"
|
encoding = "UTF-8"
|
||||||
return response.read().decode(encoding.replace("text/html", "UTF-8", 1))
|
return response.read().decode(encoding.replace("text/html", "UTF-8", 1))
|
||||||
@ -103,10 +102,10 @@ class Url():
|
|||||||
def get_response(self, req):
|
def get_response(self, req):
|
||||||
"""Get response for the supplied request."""
|
"""Get response for the supplied request."""
|
||||||
try:
|
try:
|
||||||
response = urlopen(req) #nosec
|
response = urlopen(req) # nosec
|
||||||
except Exception:
|
except Exception:
|
||||||
try:
|
try:
|
||||||
response = urlopen(req) #nosec
|
response = urlopen(req) # nosec
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
raise WaybackError(e)
|
raise WaybackError(e)
|
||||||
return response
|
return response
|
||||||
@ -116,22 +115,32 @@ class Url():
|
|||||||
Supported params are year, month, day, hour and minute.
|
Supported params are year, month, day, hour and minute.
|
||||||
The non supplied parameters are default to the runtime time.
|
The non supplied parameters are default to the runtime time.
|
||||||
"""
|
"""
|
||||||
year=kwargs.get("year", datetime.utcnow().strftime('%Y'))
|
year = kwargs.get("year", datetime.utcnow().strftime("%Y"))
|
||||||
month=kwargs.get("month", datetime.utcnow().strftime('%m'))
|
month = kwargs.get("month", datetime.utcnow().strftime("%m"))
|
||||||
day=kwargs.get("day", datetime.utcnow().strftime('%d'))
|
day = kwargs.get("day", datetime.utcnow().strftime("%d"))
|
||||||
hour=kwargs.get("hour", datetime.utcnow().strftime('%H'))
|
hour = kwargs.get("hour", datetime.utcnow().strftime("%H"))
|
||||||
minute=kwargs.get("minute", datetime.utcnow().strftime('%M'))
|
minute = kwargs.get("minute", datetime.utcnow().strftime("%M"))
|
||||||
timestamp = self.wayback_timestamp(year=year,month=month,day=day,hour=hour,minute=minute)
|
timestamp = self.wayback_timestamp(
|
||||||
request_url = "https://archive.org/wayback/available?url=%s×tamp=%s" % (self.clean_url(), str(timestamp))
|
year=year, month=month, day=day, hour=hour, minute=minute
|
||||||
hdr = { 'User-Agent' : '%s' % self.user_agent }
|
)
|
||||||
req = Request(request_url, headers=hdr) # nosec
|
request_url = "https://archive.org/wayback/available?url=%s×tamp=%s" % (
|
||||||
|
self.clean_url(),
|
||||||
|
str(timestamp),
|
||||||
|
)
|
||||||
|
hdr = {"User-Agent": "%s" % self.user_agent}
|
||||||
|
req = Request(request_url, headers=hdr) # nosec
|
||||||
response = self.get_response(req)
|
response = self.get_response(req)
|
||||||
data = json.loads(response.read().decode("UTF-8"))
|
data = json.loads(response.read().decode("UTF-8"))
|
||||||
if not data["archived_snapshots"]:
|
if not data["archived_snapshots"]:
|
||||||
raise WaybackError("'%s' is not yet archived. Use wayback.Url(url, user_agent).save() to create a new archive." % self.clean_url())
|
raise WaybackError(
|
||||||
archive_url = (data["archived_snapshots"]["closest"]["url"])
|
"'%s' is not yet archived. Use wayback.Url(url, user_agent).save() to create a new archive."
|
||||||
|
% self.clean_url()
|
||||||
|
)
|
||||||
|
archive_url = data["archived_snapshots"]["closest"]["url"]
|
||||||
# wayback machine returns http sometimes, idk why? But they support https
|
# wayback machine returns http sometimes, idk why? But they support https
|
||||||
archive_url = archive_url.replace("http://web.archive.org/web/","https://web.archive.org/web/",1)
|
archive_url = archive_url.replace(
|
||||||
|
"http://web.archive.org/web/", "https://web.archive.org/web/", 1
|
||||||
|
)
|
||||||
return archive_url
|
return archive_url
|
||||||
|
|
||||||
def oldest(self, year=1994):
|
def oldest(self, year=1994):
|
||||||
@ -144,8 +153,13 @@ class Url():
|
|||||||
|
|
||||||
def total_archives(self):
|
def total_archives(self):
|
||||||
"""Returns the total number of archives on Wayback Machine for an URL."""
|
"""Returns the total number of archives on Wayback Machine for an URL."""
|
||||||
hdr = { 'User-Agent' : '%s' % self.user_agent }
|
hdr = {"User-Agent": "%s" % self.user_agent}
|
||||||
request_url = "https://web.archive.org/cdx/search/cdx?url=%s&output=json&fl=statuscode" % self.clean_url()
|
request_url = (
|
||||||
req = Request(request_url, headers=hdr) # nosec
|
"https://web.archive.org/cdx/search/cdx?url=%s&output=json&fl=statuscode"
|
||||||
|
% self.clean_url()
|
||||||
|
)
|
||||||
|
req = Request(request_url, headers=hdr) # nosec
|
||||||
response = self.get_response(req)
|
response = self.get_response(req)
|
||||||
return str(response.read()).count(",") # Most efficient method to count number of archives (yet)
|
return str(response.read()).count(
|
||||||
|
","
|
||||||
|
) # Most efficient method to count number of archives (yet)
|
||||||
|
Loading…
Reference in New Issue
Block a user