now using requests lib as it handles errors nicely (#42)

* now using the requests library, as it handles errors more gracefully than urllib (a sketch of the new calling convention follows the commit metadata below)

* remove unused imports (urllib)

* FIX: replaced full_url with endpoint (no longer using urllib)

* LINT: found in waybackpy/wrapper.py:88, unnecessary else after return
Akash Mahanty 2020-12-13 15:44:37 +05:30 committed by GitHub
parent ca51c14332
commit bc3efc7d63
5 changed files with 59 additions and 74 deletions
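
In short, every Request()/urlopen() pair in the codebase becomes a single requests.get() call. A minimal sketch of the new calling convention, assuming nothing beyond this diff; the endpoint, payload, and User-Agent values are illustrative:

import requests

endpoint = "https://archive.org/wayback/available"    # illustrative endpoint
headers = {"User-Agent": "waybackpy python package"}  # illustrative UA
payload = {"url": "example.com"}

# One call replaces Request() + urlopen() + read().decode() + json.loads():
response = requests.get(endpoint, params=payload, headers=headers)
assert response.status_code == 200  # urllib's response.code becomes status_code
data = response.json()              # JSON decoding is built in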

requirements.txt

@@ -1 +1 @@
-requests==2.24.0
+requests>=2.24.0

tests/test_wrapper.py

@@ -2,16 +2,12 @@
 import sys
 import pytest
 import random
+import requests
 sys.path.append("..")
 import waybackpy.wrapper as waybackpy  # noqa: E402
-from urllib.request import Request, urlopen
-from urllib.error import URLError

 user_agent = "Mozilla/5.0 (Windows NT 6.2; rv:20.0) Gecko/20121202 Firefox/20.0"
@@ -30,10 +26,11 @@ def test_dunders():
     assert "en.wikipedia.org" in str(target)

 def test_archive_url_parser():
-    request_url = "https://amazon.com"
-    hdr = {"User-Agent": user_agent}  # nosec
-    req = Request(request_url, headers=hdr)  # nosec
-    header = waybackpy._get_response(req).headers
+    endpoint = "https://amazon.com"
+    user_agent = "Mozilla/5.0 (Windows NT 6.2; rv:20.0) Gecko/20121202 Firefox/20.0"
+    headers = {"User-Agent": "%s" % user_agent}
+    response = waybackpy._get_response(endpoint, params=None, headers=headers)
+    header = response.headers
     with pytest.raises(Exception):
         waybackpy._archive_url_parser(header)
@@ -158,13 +155,11 @@ def test_wayback_timestamp():
 def test_get_response():
-    hdr = {
-        "User-Agent": "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:78.0) "
-        "Gecko/20100101 Firefox/78.0"
-    }
-    req = Request("https://www.google.com", headers=hdr)  # nosec
-    response = waybackpy._get_response(req)
-    assert response.code == 200
+    endpoint = "https://www.google.com"
+    user_agent = "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:78.0) Gecko/20100101 Firefox/78.0"
+    headers = {"User-Agent": "%s" % user_agent}
+    response = waybackpy._get_response(endpoint, params=None, headers=headers)
+    assert response.status_code == 200

 def test_total_archives():

waybackpy/exceptions.py

@@ -4,3 +4,8 @@ class WaybackError(Exception):
     """
     Raised when Wayback Machine API Service is unreachable/down.
     """
+
+class URLError(Exception):
+    """
+    Raised when malformed URLs are passed as arguments.
+    """

waybackpy/wrapper.py

@@ -1,14 +1,11 @@
 # -*- coding: utf-8 -*-
 import re
-import json
 from datetime import datetime, timedelta
-from waybackpy.exceptions import WaybackError
+from waybackpy.exceptions import WaybackError, URLError
 from waybackpy.__version__ import __version__
-from urllib.request import Request, urlopen
+import requests
 import concurrent.futures
-from urllib.error import URLError

 default_UA = "waybackpy python package - https://github.com/akamhy/waybackpy"
@@ -47,15 +44,16 @@ def _wayback_timestamp(**kwargs):
     )

-def _get_response(req):
+def _get_response(endpoint, params=None, headers=None):
     """Get response for the supplied request."""
     try:
-        response = urlopen(req)  # nosec
+        response = requests.get(endpoint, params=params, headers=headers)
     except Exception:
         try:
-            response = urlopen(req)  # nosec
+            response = requests.get(endpoint, params=params, headers=headers)  # nosec
         except Exception as e:
-            exc = WaybackError("Error while retrieving %s" % req.full_url)
+            exc = WaybackError("Error while retrieving %s" % endpoint)
             exc.__cause__ = e
             raise exc
     return response
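
One behavioral difference worth noting: urlopen() raises HTTPError for 4xx/5xx status codes, while requests.get() returns normally and only raises on network-level failures, so WaybackError now fires only when the request itself fails twice. A minimal caller-side sketch; the endpoint, params, and User-Agent are illustrative:

from waybackpy.wrapper import _get_response
from waybackpy.exceptions import WaybackError

try:
    response = _get_response(
        "https://archive.org/wayback/available",  # illustrative endpoint
        params={"url": "example.com"},
        headers={"User-Agent": "my-agent"},
    )
    print(response.status_code)  # 4xx/5xx responses land here, not in except
except WaybackError as e:
    # Raised only after both attempts fail; the original exception
    # is preserved on e.__cause__ for debugging.
    print("request failed twice:", e)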
@@ -89,9 +87,9 @@ class Url:
         )
         if self.timestamp == datetime.max:
             return td_max.days
-        else:
-            diff = datetime.utcnow() - self.timestamp
-            return diff.days
+        diff = datetime.utcnow() - self.timestamp
+        return diff.days

     def _url_check(self):
         """Check for common URL problems."""
@@ -99,17 +97,11 @@ class Url:
         raise URLError("'%s' is not a valid URL." % self.url)

     def _JSON(self):
-        request_url = "https://archive.org/wayback/available?url=%s" % (
-            self._clean_url(),
-        )
-        hdr = {"User-Agent": "%s" % self.user_agent}
-        req = Request(request_url, headers=hdr)  # nosec
-        response = _get_response(req)
-        data_string = response.read().decode("UTF-8")
-        data = json.loads(data_string)
-        return data
+        endpoint = "https://archive.org/wayback/available"
+        headers = {"User-Agent": "%s" % self.user_agent}
+        payload = {"url": "%s" % self._clean_url()}
+        response = _get_response(endpoint, params=payload, headers=headers)
+        return response.json()

     def _archive_url(self):
         """Get URL of archive."""
@@ -149,10 +141,9 @@ class Url:
     def save(self):
         """Create a new Wayback Machine archive for this URL."""
         request_url = "https://web.archive.org/save/" + self._clean_url()
-        hdr = {"User-Agent": "%s" % self.user_agent}  # nosec
-        req = Request(request_url, headers=hdr)  # nosec
-        header = _get_response(req).headers
-        self.archive_url = "https://" + _archive_url_parser(header)
+        headers = {"User-Agent": "%s" % self.user_agent}
+        response = _get_response(request_url, params=None, headers=headers)
+        self.archive_url = "https://" + _archive_url_parser(response.headers)
         self.timestamp = datetime.utcnow()
         return self
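
Caller-facing behavior is unchanged. A usage sketch following the wayback.Url(url, user_agent).save() form quoted in an error message later in this diff; the URL and user agent are illustrative:

import waybackpy

url = waybackpy.Url("https://example.com", "my-user-agent")
url.save()              # archives the page and returns self
print(url.archive_url)  # parsed out of the Save API response headers
print(url.timestamp)    # set to datetime.utcnow() by save()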
@@ -167,15 +158,16 @@ class Url:
         if not user_agent:
             user_agent = self.user_agent
-        hdr = {"User-Agent": "%s" % user_agent}
-        req = Request(url, headers=hdr)  # nosec
-        response = _get_response(req)
+        headers = {"User-Agent": "%s" % user_agent}
+        response = _get_response(url, params=None, headers=headers)
         if not encoding:
             try:
-                encoding = response.headers["content-type"].split("charset=")[-1]
+                encoding = response.encoding
             except AttributeError:
                 encoding = "UTF-8"
-        return response.read().decode(encoding.replace("text/html", "UTF-8", 1))
+        return response.content.decode(encoding.replace("text/html", "UTF-8", 1))

     def near(self, year=None, month=None, day=None, hour=None, minute=None):
         """Return the closest Wayback Machine archive to the time supplied.
@@ -192,14 +184,13 @@ class Url:
             minute=minute if minute else now.tm_min,
         )
-        request_url = "https://archive.org/wayback/available?url=%s&timestamp=%s" % (
-            self._clean_url(),
-            timestamp,
-        )
-        hdr = {"User-Agent": "%s" % self.user_agent}
-        req = Request(request_url, headers=hdr)  # nosec
-        response = _get_response(req)
-        data = json.loads(response.read().decode("UTF-8"))
+        endpoint = "https://archive.org/wayback/available"
+        headers = {"User-Agent": "%s" % self.user_agent}
+        payload = {"url": "%s" % self._clean_url(), "timestamp": timestamp}
+        response = _get_response(endpoint, params=payload, headers=headers)
+        data = response.json()
         if not data["archived_snapshots"]:
             raise WaybackError(
                 "Can not find archive for '%s' try later or use wayback.Url(url, user_agent).save() "
@@ -229,15 +220,14 @@ class Url:
     def total_archives(self):
         """Returns the total number of Wayback Machine archives for this URL."""
-        hdr = {"User-Agent": "%s" % self.user_agent}
-        request_url = (
-            "https://web.archive.org/cdx/search/cdx?url=%s&output=json&fl=statuscode"
-            % self._clean_url()
-        )
-        req = Request(request_url, headers=hdr)  # nosec
-        response = _get_response(req)
+        endpoint = "https://web.archive.org/cdx/search/cdx"
+        headers = {"User-Agent": "%s" % self.user_agent}
+        payload = {"url": "%s" % self._clean_url(), "output": "json", "fl": "statuscode"}
+        response = _get_response(endpoint, params=payload, headers=headers)
         # Most efficient method to count number of archives (yet)
-        return str(response.read()).count(",")
+        return response.text.count(",")

     def pick_live_urls(self, url):
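
Why counting commas works: with fl=statuscode every CDX JSON row is a one-element list, so the only commas in the body are the ones separating rows, and the ["statuscode"] header row offsets the count by exactly one. An illustrative body:

body = '[["statuscode"], ["200"], ["301"], ["200"]]'
assert body.count(",") == 3  # three commas, three archived snapshots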
@@ -255,9 +245,7 @@ class Url:
     def known_urls(self, alive=False, subdomain=False):
         """Returns a list of URLs known to exist for the given domain name,
         because these URLs were crawled by Wayback Machine bots.
         Useful for pen-testers and others.
         Idea by Mohammed Diaa (https://github.com/mhmdiaa) from:
         https://gist.github.com/mhmdiaa/adf6bff70142e5091792841d4b372050
         """
@@ -268,17 +256,14 @@ class Url:
             request_url = (
                 "https://web.archive.org/cdx/search/cdx?url=*.%s/*&output=json&fl=original&collapse=urlkey" % self._clean_url()
             )
         else:
             request_url = (
                 "http://web.archive.org/cdx/search/cdx?url=%s/*&output=json&fl=original&collapse=urlkey" % self._clean_url()
             )
-        hdr = {"User-Agent": "%s" % self.user_agent}
-        req = Request(request_url, headers=hdr)  # nosec
-        response = _get_response(req)
-        data = json.loads(response.read().decode("UTF-8"))
+        headers = {"User-Agent": "%s" % self.user_agent}
+        response = _get_response(request_url, params=None, headers=headers)
+        data = response.json()
         url_list = [y[0] for y in data if y[0] != "original"]
         # Remove all deadURLs from url_list if alive=True