docstrings

This commit is contained in:
Akash
2020-07-18 15:50:51 +05:30
committed by GitHub
parent e19ca42753
commit a7f6d628cc

View File

@@ -6,11 +6,7 @@ import json
from datetime import datetime from datetime import datetime
from waybackpy.exceptions import WaybackError from waybackpy.exceptions import WaybackError
version = (3, 0) if sys.version_info >= (3, 0): # If the python ver >= 3
python_version = sys.version_info
if python_version >= version: # If the python ver >= 3
from urllib.request import Request, urlopen from urllib.request import Request, urlopen
from urllib.error import HTTPError, URLError from urllib.error import HTTPError, URLError
else: # For python2.x else: # For python2.x
@@ -19,29 +15,38 @@ else: # For python2.x
default_UA = "waybackpy python package - https://github.com/akamhy/waybackpy" default_UA = "waybackpy python package - https://github.com/akamhy/waybackpy"
class Url(): class Url():
"""waybackpy Url object"""
def __init__(self, url, user_agent=default_UA): def __init__(self, url, user_agent=default_UA):
self.url = url self.url = url
self.user_agent = user_agent self.user_agent = user_agent
self.url_check() # checks url validity on init. self.url_check() # checks url validity on init.
def __repr__(self): def __repr__(self):
"""Representation of the object."""
return "waybackpy.Url(url=%s, user_agent=%s)" % (self.url, self.user_agent) return "waybackpy.Url(url=%s, user_agent=%s)" % (self.url, self.user_agent)
def __str__(self): def __str__(self):
"""String representation of the object."""
return "%s" % self.clean_url() return "%s" % self.clean_url()
def __len__(self):
"""Length of the URL."""
return len(self.clean_url())
def url_check(self): def url_check(self):
"""Check for common URL problems."""
if "." not in self.url: if "." not in self.url:
raise URLError("'%s' is not a vaild url." % self.url) raise URLError("'%s' is not a vaild url." % self.url)
return True return True
def clean_url(self): def clean_url(self):
"""Fix the URL, if possible."""
return str(self.url).strip().replace(" ","_") return str(self.url).strip().replace(" ","_")
def wayback_timestamp(self, **kwargs): def wayback_timestamp(self, **kwargs):
"""Return the formatted the timestamp."""
return ( return (
str(kwargs["year"]) str(kwargs["year"])
+ +
@@ -55,50 +60,39 @@ class Url():
) )
def handle_HTTPError(self, e): def handle_HTTPError(self, e):
if e.code >= 500: """Handle some common HTTPErrors."""
raise WaybackError(e)
if e.code == 429:
raise WaybackError(e)
if e.code == 404: if e.code == 404:
raise HTTPError(e) raise HTTPError(e)
if e.code >= 400:
raise WaybackError(e)
def save(self): def save(self):
"""Create a new archives for an URL on the Wayback Machine."""
request_url = ("https://web.archive.org/save/" + self.clean_url()) request_url = ("https://web.archive.org/save/" + self.clean_url())
hdr = { 'User-Agent' : '%s' % self.user_agent } #nosec hdr = { 'User-Agent' : '%s' % self.user_agent } #nosec
req = Request(request_url, headers=hdr) #nosec req = Request(request_url, headers=hdr) #nosec
try: try:
response = urlopen(req) #nosec response = urlopen(req, timeout=30) #nosec
except HTTPError as e: except Exception:
if self.handle_HTTPError(e) is None: try:
response = urlopen(req, timeout=300) #nosec
except Exception as e:
raise WaybackError(e) raise WaybackError(e)
except URLError:
try:
response = urlopen(req) #nosec
except URLError as e:
raise HTTPError(e)
header = response.headers header = response.headers
try: try:
arch = re.search(r"rel=\"memento.*?web\.archive\.org(/web/[0-9]{14}/.*?)>", str(header)).group(1) arch = re.search(r"rel=\"memento.*?web\.archive\.org(/web/[0-9]{14}/.*?)>", str(header)).group(1)
except KeyError as e: except KeyError as e:
raise WaybackError(e) raise WaybackError(e)
return "https://web.archive.org" + arch return "https://web.archive.org" + arch
def get(self, url=None, user_agent=None, encoding=None): def get(self, url=None, user_agent=None, encoding=None):
"""Returns the source code of the supplied URL. Auto detects the encoding if not supplied."""
if not url: if not url:
url = self.clean_url() url = self.clean_url()
if not user_agent: if not user_agent:
user_agent = self.user_agent user_agent = self.user_agent
hdr = { 'User-Agent' : '%s' % user_agent } hdr = { 'User-Agent' : '%s' % user_agent }
req = Request(url, headers=hdr) #nosec req = Request(url, headers=hdr) #nosec
try: try:
resp=urlopen(req) #nosec resp=urlopen(req) #nosec
except URLError: except URLError:
@@ -106,55 +100,54 @@ class Url():
resp=urlopen(req) #nosec resp=urlopen(req) #nosec
except URLError as e: except URLError as e:
raise HTTPError(e) raise HTTPError(e)
if not encoding: if not encoding:
try: try:
encoding= resp.headers['content-type'].split('charset=')[-1] encoding= resp.headers['content-type'].split('charset=')[-1]
except AttributeError: except AttributeError:
encoding = "UTF-8" encoding = "UTF-8"
return resp.read().decode(encoding.replace("text/html", "UTF-8", 1)) return resp.read().decode(encoding.replace("text/html", "UTF-8", 1))
def near(self, **kwargs): def near(self, **kwargs):
""" Returns the archived from Wayback Machine for an URL closest to the time supplied.
Supported params are year, month, day, hour and minute.
The non supplied parameters are default to the runtime time.
"""
year=kwargs.get("year", datetime.utcnow().strftime('%Y')) year=kwargs.get("year", datetime.utcnow().strftime('%Y'))
month=kwargs.get("month", datetime.utcnow().strftime('%m')) month=kwargs.get("month", datetime.utcnow().strftime('%m'))
day=kwargs.get("day", datetime.utcnow().strftime('%d')) day=kwargs.get("day", datetime.utcnow().strftime('%d'))
hour=kwargs.get("hour", datetime.utcnow().strftime('%H')) hour=kwargs.get("hour", datetime.utcnow().strftime('%H'))
minute=kwargs.get("minute", datetime.utcnow().strftime('%M')) minute=kwargs.get("minute", datetime.utcnow().strftime('%M'))
timestamp = self.wayback_timestamp(year=year,month=month,day=day,hour=hour,minute=minute) timestamp = self.wayback_timestamp(year=year,month=month,day=day,hour=hour,minute=minute)
request_url = "https://archive.org/wayback/available?url=%s&timestamp=%s" % (self.clean_url(), str(timestamp)) request_url = "https://archive.org/wayback/available?url=%s&timestamp=%s" % (self.clean_url(), str(timestamp))
hdr = { 'User-Agent' : '%s' % self.user_agent } hdr = { 'User-Agent' : '%s' % self.user_agent }
req = Request(request_url, headers=hdr) # nosec req = Request(request_url, headers=hdr) # nosec
try: try:
response = urlopen(req) #nosec response = urlopen(req) #nosec
except HTTPError as e: except Exception as e:
self.handle_HTTPError(e) self.handle_HTTPError(e)
data = json.loads(response.read().decode("UTF-8")) data = json.loads(response.read().decode("UTF-8"))
if not data["archived_snapshots"]: if not data["archived_snapshots"]:
raise WaybackError("'%s' is not yet archived." % url) raise WaybackError("'%s' is not yet archived." % url)
archive_url = (data["archived_snapshots"]["closest"]["url"]) archive_url = (data["archived_snapshots"]["closest"]["url"])
# wayback machine returns http sometimes, idk why? But they support https # wayback machine returns http sometimes, idk why? But they support https
archive_url = archive_url.replace("http://web.archive.org/web/","https://web.archive.org/web/",1) archive_url = archive_url.replace("http://web.archive.org/web/","https://web.archive.org/web/",1)
return archive_url return archive_url
def oldest(self, year=1994): def oldest(self, year=1994):
"""Returns the oldest archive from Wayback Machine for an URL."""
return self.near(year=year) return self.near(year=year)
def newest(self): def newest(self):
"""Returns the newest archive on Wayback Machine for an URL, sometimes you may not get the newest archive because wayback machine DB lag."""
return self.near() return self.near()
def total_archives(self): def total_archives(self):
"""Returns the total number of archives on Wayback Machine for an URL."""
hdr = { 'User-Agent' : '%s' % self.user_agent } hdr = { 'User-Agent' : '%s' % self.user_agent }
request_url = "https://web.archive.org/cdx/search/cdx?url=%s&output=json&fl=statuscode" % self.clean_url() request_url = "https://web.archive.org/cdx/search/cdx?url=%s&output=json&fl=statuscode" % self.clean_url()
req = Request(request_url, headers=hdr) # nosec req = Request(request_url, headers=hdr) # nosec
try: try:
response = urlopen(req) #nosec response = urlopen(req) #nosec
except HTTPError as e: except Exception as e:
self.handle_HTTPError(e) self.handle_HTTPError(e)
return str(response.read()).count(",") # Most efficient method to count number of archives (yet) return str(response.read()).count(",") # Most efficient method to count number of archives (yet)