Update wrapper.py

This commit is contained in:
Akash
2020-07-17 20:31:35 +05:30
committed by GitHub
parent 9ac1e877c8
commit 0e64fe3b39

View File

@@ -1,149 +1,159 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import sys import sys
import json import json, re
from datetime import datetime from datetime import datetime
from waybackpy.exceptions import TooManyArchivingRequests, ArchivingNotAllowed, PageNotSaved, ArchiveNotFound, UrlNotFound, BadGateWay, InvalidUrl, WaybackUnavailable from waybackpy.exceptions import WaybackError
version = (3, 0) version = (3, 0)
cur_version = sys.version_info python_version = sys.version_info
if cur_version >= version: # If the python ver >= 3 if python_version >= version: # If the python ver >= 3
from urllib.request import Request, urlopen from urllib.request import Request, urlopen
from urllib.error import HTTPError, URLError from urllib.error import HTTPError, URLError
else: # For python2.x else: # For python2.x
from urllib2 import Request, urlopen, HTTPError, URLError from urllib2 import Request, urlopen, HTTPError, URLError
# Default User-Agent so the Wayback Machine can identify traffic from this package.
default_UA = "waybackpy python package - https://github.com/akamhy/waybackpy"


class Url():
    """Wrapper around a URL exposing Internet Archive Wayback Machine helpers.

    Supports archiving the URL now (``save``), fetching page text (``get``),
    locating the snapshot closest to a moment in time (``near`` / ``oldest`` /
    ``newest``) and counting existing snapshots (``total_archives``).

    Archive-side failures are surfaced as ``WaybackError``
    (from ``waybackpy.exceptions``, imported at module top).
    """

    def __init__(self, url, user_agent=default_UA):
        self.url = url
        self.user_agent = user_agent
        self.url_check()  # validate eagerly so a bad URL fails at construction

    def __repr__(self):
        return "waybackpy.Url(url=%s, user_agent=%s)" % (self.url, self.user_agent)

    def __str__(self):
        return "%s" % self.clean_url()

    def url_check(self):
        """Cheap sanity check: a usable URL must contain at least one dot.

        Raises URLError for anything that cannot be a host name; returns True
        otherwise.
        """
        if "." not in self.url:
            # BUG FIX: corrected the typo "vaild" in the error message.
            raise URLError("'%s' is not a valid url." % self.url)
        return True

    def clean_url(self):
        """Return the URL stripped of surrounding whitespace, spaces -> '_'."""
        return str(self.url).strip().replace(" ", "_")

    def wayback_timestamp(self, **kwargs):
        """Build a wayback timestamp string (YYYYMMDDhhmm) from keyword parts.

        Expects ``year``, ``month``, ``day``, ``hour`` and ``minute``; each
        non-year part is zero-padded to two digits.
        """
        return (
            str(kwargs["year"])
            + str(kwargs["month"]).zfill(2)
            + str(kwargs["day"]).zfill(2)
            + str(kwargs["hour"]).zfill(2)
            + str(kwargs["minute"]).zfill(2)
        )

    def handle_HTTPError(self, e):
        """Translate an HTTPError from archive.org into package-level errors.

        5xx (server trouble) and 429 (rate limited) become WaybackError;
        404 is re-raised unchanged. Any other code falls through and the
        method returns None so callers can apply their own fallback.
        """
        if e.code >= 500:
            raise WaybackError(e) from None
        elif e.code == 429:
            raise WaybackError(e) from None
        elif e.code == 404:
            # BUG FIX: was `raise HTTPError(e)`, an invalid constructor call
            # (HTTPError needs url/code/msg/hdrs/fp) that crashed with
            # TypeError. Re-raise the original error instead.
            raise e from None

    def save(self):
        """Ask the Wayback Machine to archive the URL right now.

        Returns the full https URL of the freshly created snapshot.
        Raises WaybackError when archiving fails or when no snapshot link
        can be found in the response headers.
        """
        request_url = "https://web.archive.org/save/" + self.clean_url()
        hdr = {'User-Agent': '%s' % self.user_agent}  # nosec
        req = Request(request_url, headers=hdr)  # nosec
        try:
            response = urlopen(req)  # nosec
        except HTTPError as e:
            if self.handle_HTTPError(e) is None:
                raise WaybackError(e)
        except URLError:
            # Transient network error: retry once before giving up.
            try:
                response = urlopen(req)  # nosec
            except URLError as e:
                # BUG FIX: was `raise HTTPError(e)` (invalid constructor
                # call); surface a package-level error instead.
                raise WaybackError(e)
        header = response.headers
        try:
            # The snapshot path is advertised in the memento rel of the
            # response's Link header; pull out "/web/<14-digit-ts>/...".
            arch = re.search(
                r"rel=\"memento.*?web\.archive\.org(/web/[0-9]{14}/.*?)>",
                str(header)).group(1)
        except AttributeError as e:
            # BUG FIX: a failed re.search returns None, so .group raises
            # AttributeError — the old `except KeyError` could never fire.
            raise WaybackError(e)
        return "https://web.archive.org" + arch

    def get(self, url=None, user_agent=None, encoding=None):
        """Fetch ``url`` (default: this object's cleaned URL) and return its text.

        The encoding defaults to the response's declared charset, falling
        back to UTF-8 when none is declared.
        """
        if not url:
            url = self.clean_url()
        if not user_agent:
            user_agent = self.user_agent
        hdr = {'User-Agent': '%s' % user_agent}
        req = Request(url, headers=hdr)  # nosec
        try:
            resp = urlopen(req)  # nosec
        except URLError:
            # Transient network error: retry once before giving up.
            try:
                resp = urlopen(req)  # nosec
            except URLError as e:
                # BUG FIX: was `raise HTTPError(e)` (invalid constructor call).
                raise WaybackError(e)
        if not encoding:
            try:
                encoding = resp.headers['content-type'].split('charset=')[-1]
            except AttributeError:
                encoding = "UTF-8"
        return resp.read().decode(encoding.replace("text/html", "UTF-8", 1))

    def near(self, **kwargs):
        """Return the snapshot URL closest to the requested moment.

        Accepts ``year``/``month``/``day``/``hour``/``minute`` keywords;
        missing parts default to the current UTC time. Raises WaybackError
        when the URL has never been archived.
        """
        year = kwargs.get("year", datetime.utcnow().strftime('%Y'))
        month = kwargs.get("month", datetime.utcnow().strftime('%m'))
        day = kwargs.get("day", datetime.utcnow().strftime('%d'))
        hour = kwargs.get("hour", datetime.utcnow().strftime('%H'))
        minute = kwargs.get("minute", datetime.utcnow().strftime('%M'))
        timestamp = self.wayback_timestamp(
            year=year, month=month, day=day, hour=hour, minute=minute)
        request_url = "https://archive.org/wayback/available?url=%s&timestamp=%s" % (
            self.clean_url(), str(timestamp))
        hdr = {'User-Agent': '%s' % self.user_agent}
        req = Request(request_url, headers=hdr)  # nosec
        try:
            response = urlopen(req)  # nosec
        except HTTPError as e:
            self.handle_HTTPError(e)
        data = json.loads(response.read().decode("UTF-8"))
        if not data["archived_snapshots"]:
            # BUG FIX: the message interpolated an undefined local `url`
            # (NameError); use the instance's cleaned URL instead.
            raise WaybackError("'%s' is not yet archived." % self.clean_url())
        archive_url = data["archived_snapshots"]["closest"]["url"]
        # The availability API sometimes returns an http link even though
        # web.archive.org supports https; upgrade the scheme.
        archive_url = archive_url.replace(
            "http://web.archive.org/web/", "https://web.archive.org/web/", 1)
        return archive_url

    def oldest(self, year=1994):
        """Return the oldest snapshot (searching forward from ``year``)."""
        return self.near(year=year)

    def newest(self):
        """Return the most recent snapshot (closest to the current time)."""
        return self.near()

    def total_archives(self):
        """Return the number of snapshots the CDX API lists for this URL.

        Each snapshot row contributes one comma to the JSON payload, so
        counting commas counts archives without parsing the full document.
        """
        hdr = {'User-Agent': '%s' % self.user_agent}
        request_url = (
            "https://web.archive.org/cdx/search/cdx?url=%s&output=json&fl=statuscode"
            % self.clean_url())
        req = Request(request_url, headers=hdr)  # nosec
        try:
            response = urlopen(req)  # nosec
        except HTTPError as e:
            self.handle_HTTPError(e)
        return str(response.read()).count(",")