move wayback_timestamp out of class, mark private functions
This commit is contained in:
parent
bec26c4bae
commit
10e918eebf
@ -16,7 +16,7 @@ else: # For python2.x
|
||||
default_UA = "waybackpy python package - https://github.com/akamhy/waybackpy"
|
||||
|
||||
|
||||
def archive_url_parser(header):
|
||||
def _archive_url_parser(header):
|
||||
"""Parse out the archive from header."""
|
||||
# Regex1
|
||||
arch = re.search(
|
||||
@ -36,34 +36,7 @@ def archive_url_parser(header):
|
||||
)
|
||||
|
||||
|
||||
class Url:
|
||||
"""waybackpy Url object"""
|
||||
|
||||
def __init__(self, url, user_agent=default_UA):
|
||||
self.url = url
|
||||
self.user_agent = user_agent
|
||||
self.url_check() # checks url validity on init.
|
||||
|
||||
def __repr__(self):
|
||||
return "waybackpy.Url(url=%s, user_agent=%s)" % (self.url, self.user_agent)
|
||||
|
||||
def __str__(self):
|
||||
return "%s" % self.clean_url()
|
||||
|
||||
def __len__(self):
|
||||
return len(self.clean_url())
|
||||
|
||||
def url_check(self):
|
||||
"""Check for common URL problems."""
|
||||
if "." not in self.url:
|
||||
raise URLError("'%s' is not a vaild URL." % self.url)
|
||||
return True
|
||||
|
||||
def clean_url(self):
|
||||
"""Fix the URL, if possible."""
|
||||
return str(self.url).strip().replace(" ", "_")
|
||||
|
||||
def wayback_timestamp(self, **kwargs):
|
||||
def _wayback_timestamp(**kwargs):
|
||||
"""Return a formatted timestamp."""
|
||||
return (
|
||||
str(kwargs["year"])
|
||||
@ -73,27 +46,54 @@ class Url:
|
||||
+ str(kwargs["minute"]).zfill(2)
|
||||
)
|
||||
|
||||
|
||||
class Url:
|
||||
"""waybackpy Url object"""
|
||||
|
||||
def __init__(self, url, user_agent=default_UA):
|
||||
self.url = url
|
||||
self.user_agent = user_agent
|
||||
self._url_check() # checks url validity on init.
|
||||
|
||||
def __repr__(self):
|
||||
return "waybackpy.Url(url=%s, user_agent=%s)" % (self.url, self.user_agent)
|
||||
|
||||
def __str__(self):
|
||||
return "%s" % self._clean_url()
|
||||
|
||||
def __len__(self):
|
||||
return len(self._clean_url())
|
||||
|
||||
def _url_check(self):
|
||||
"""Check for common URL problems."""
|
||||
if "." not in self.url:
|
||||
raise URLError("'%s' is not a vaild URL." % self.url)
|
||||
return True
|
||||
|
||||
def _clean_url(self):
|
||||
"""Fix the URL, if possible."""
|
||||
return str(self.url).strip().replace(" ", "_")
|
||||
|
||||
def save(self):
|
||||
"""Create a new Wayback Machine archive for this URL."""
|
||||
request_url = "https://web.archive.org/save/" + self.clean_url()
|
||||
request_url = "https://web.archive.org/save/" + self._clean_url()
|
||||
hdr = {"User-Agent": "%s" % self.user_agent} # nosec
|
||||
req = Request(request_url, headers=hdr) # nosec
|
||||
header = self.get_response(req).headers
|
||||
return "https://" + archive_url_parser(header)
|
||||
header = self._get_response(req).headers
|
||||
return "https://" + _archive_url_parser(header)
|
||||
|
||||
def get(self, url=None, user_agent=None, encoding=None):
|
||||
def _get(self, url=None, user_agent=None, encoding=None):
|
||||
"""Return the source code of the supplied URL.
|
||||
If encoding is not supplied, it is auto-detected from the response.
|
||||
"""
|
||||
|
||||
if not url:
|
||||
url = self.clean_url()
|
||||
url = self._clean_url()
|
||||
if not user_agent:
|
||||
user_agent = self.user_agent
|
||||
|
||||
hdr = {"User-Agent": "%s" % user_agent}
|
||||
req = Request(url, headers=hdr) # nosec
|
||||
response = self.get_response(req)
|
||||
response = self._get_response(req)
|
||||
if not encoding:
|
||||
try:
|
||||
encoding = response.headers["content-type"].split("charset=")[-1]
|
||||
@ -101,7 +101,7 @@ class Url:
|
||||
encoding = "UTF-8"
|
||||
return response.read().decode(encoding.replace("text/html", "UTF-8", 1))
|
||||
|
||||
def get_response(self, req):
|
||||
def _get_response(self, req):
|
||||
"""Get response for the supplied request."""
|
||||
try:
|
||||
response = urlopen(req) # nosec
|
||||
@ -123,21 +123,21 @@ class Url:
|
||||
day = kwargs.get("day", datetime.utcnow().strftime("%d"))
|
||||
hour = kwargs.get("hour", datetime.utcnow().strftime("%H"))
|
||||
minute = kwargs.get("minute", datetime.utcnow().strftime("%M"))
|
||||
timestamp = self.wayback_timestamp(
|
||||
timestamp = _wayback_timestamp(
|
||||
year=year, month=month, day=day, hour=hour, minute=minute
|
||||
)
|
||||
request_url = "https://archive.org/wayback/available?url=%s×tamp=%s" % (
|
||||
self.clean_url(),
|
||||
self._clean_url(),
|
||||
str(timestamp),
|
||||
)
|
||||
hdr = {"User-Agent": "%s" % self.user_agent}
|
||||
req = Request(request_url, headers=hdr) # nosec
|
||||
response = self.get_response(req)
|
||||
response = self._get_response(req)
|
||||
data = json.loads(response.read().decode("UTF-8"))
|
||||
if not data["archived_snapshots"]:
|
||||
raise WaybackError(
|
||||
"'%s' is not yet archived. Use wayback.Url(url, user_agent).save() "
|
||||
"to create a new archive." % self.clean_url()
|
||||
"to create a new archive." % self._clean_url()
|
||||
)
|
||||
archive_url = data["archived_snapshots"]["closest"]["url"]
|
||||
# wayback machine returns http sometimes, idk why? But they support https
|
||||
@ -163,9 +163,9 @@ class Url:
|
||||
hdr = {"User-Agent": "%s" % self.user_agent}
|
||||
request_url = (
|
||||
"https://web.archive.org/cdx/search/cdx?url=%s&output=json&fl=statuscode"
|
||||
% self.clean_url()
|
||||
% self._clean_url()
|
||||
)
|
||||
req = Request(request_url, headers=hdr) # nosec
|
||||
response = self.get_response(req)
|
||||
response = self._get_response(req)
|
||||
# Most efficient method to count number of archives (yet)
|
||||
return str(response.read()).count(",")
|
||||
|
Loading…
Reference in New Issue
Block a user