now using requests lib as it handles errors nicely
This commit is contained in:
parent
ca51c14332
commit
60ee8b95a8
@ -1 +1 @@
|
|||||||
theme: jekyll-theme-cayman
|
theme: jekyll-theme-cayman
|
||||||
|
@ -1 +1 @@
|
|||||||
requests==2.24.0
|
requests>=2.24.0
|
||||||
|
@ -2,15 +2,13 @@
|
|||||||
import sys
|
import sys
|
||||||
import pytest
|
import pytest
|
||||||
import random
|
import random
|
||||||
|
import requests
|
||||||
|
|
||||||
sys.path.append("..")
|
sys.path.append("..")
|
||||||
|
|
||||||
import waybackpy.wrapper as waybackpy # noqa: E402
|
import waybackpy.wrapper as waybackpy # noqa: E402
|
||||||
|
from urllib.request import Request
|
||||||
|
|
||||||
|
|
||||||
from urllib.request import Request, urlopen
|
|
||||||
from urllib.error import URLError
|
|
||||||
|
|
||||||
|
|
||||||
user_agent = "Mozilla/5.0 (Windows NT 6.2; rv:20.0) Gecko/20121202 Firefox/20.0"
|
user_agent = "Mozilla/5.0 (Windows NT 6.2; rv:20.0) Gecko/20121202 Firefox/20.0"
|
||||||
|
|
||||||
@ -30,10 +28,11 @@ def test_dunders():
|
|||||||
assert "en.wikipedia.org" in str(target)
|
assert "en.wikipedia.org" in str(target)
|
||||||
|
|
||||||
def test_archive_url_parser():
|
def test_archive_url_parser():
|
||||||
request_url = "https://amazon.com"
|
endpoint = "https://amazon.com"
|
||||||
hdr = {"User-Agent": user_agent} # nosec
|
user_agent = "Mozilla/5.0 (Windows NT 6.2; rv:20.0) Gecko/20121202 Firefox/20.0"
|
||||||
req = Request(request_url, headers=hdr) # nosec
|
headers = {"User-Agent": "%s" % user_agent}
|
||||||
header = waybackpy._get_response(req).headers
|
response = waybackpy._get_response(endpoint, params=None, headers=headers)
|
||||||
|
header = response.headers
|
||||||
with pytest.raises(Exception):
|
with pytest.raises(Exception):
|
||||||
waybackpy._archive_url_parser(header)
|
waybackpy._archive_url_parser(header)
|
||||||
|
|
||||||
@ -158,13 +157,11 @@ def test_wayback_timestamp():
|
|||||||
|
|
||||||
|
|
||||||
def test_get_response():
|
def test_get_response():
|
||||||
hdr = {
|
endpoint = "https://www.google.com"
|
||||||
"User-Agent": "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:78.0) "
|
user_agent = "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:78.0) Gecko/20100101 Firefox/78.0"
|
||||||
"Gecko/20100101 Firefox/78.0"
|
headers = {"User-Agent": "%s" % user_agent}
|
||||||
}
|
response = waybackpy._get_response(endpoint, params=None, headers=headers)
|
||||||
req = Request("https://www.google.com", headers=hdr) # nosec
|
assert response.status_code == 200
|
||||||
response = waybackpy._get_response(req)
|
|
||||||
assert response.code == 200
|
|
||||||
|
|
||||||
|
|
||||||
def test_total_archives():
|
def test_total_archives():
|
||||||
|
@ -4,3 +4,8 @@ class WaybackError(Exception):
|
|||||||
"""
|
"""
|
||||||
Raised when Wayback Machine API Service is unreachable/down.
|
Raised when Wayback Machine API Service is unreachable/down.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
class URLError(Exception):
|
||||||
|
"""
|
||||||
|
Raised when malformed URLs are passed as arguments.
|
||||||
|
"""
|
||||||
|
@ -1,14 +1,11 @@
|
|||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
import re
|
import re
|
||||||
import json
|
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
from waybackpy.exceptions import WaybackError
|
from waybackpy.exceptions import WaybackError, URLError
|
||||||
from waybackpy.__version__ import __version__
|
from waybackpy.__version__ import __version__
|
||||||
from urllib.request import Request, urlopen
|
|
||||||
import requests
|
import requests
|
||||||
import concurrent.futures
|
import concurrent.futures
|
||||||
from urllib.error import URLError
|
|
||||||
|
|
||||||
|
|
||||||
default_UA = "waybackpy python package - https://github.com/akamhy/waybackpy"
|
default_UA = "waybackpy python package - https://github.com/akamhy/waybackpy"
|
||||||
@ -47,13 +44,14 @@ def _wayback_timestamp(**kwargs):
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def _get_response(req):
|
def _get_response(endpoint, params=None, headers=None):
|
||||||
"""Get response for the supplied request."""
|
"""Get response for the supplied request."""
|
||||||
|
|
||||||
try:
|
try:
|
||||||
response = urlopen(req) # nosec
|
response = requests.get(endpoint, params=params, headers=headers)
|
||||||
except Exception:
|
except Exception:
|
||||||
try:
|
try:
|
||||||
response = urlopen(req) # nosec
|
response = requests.get(endpoint, params=params, headers=headers) # nosec
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
exc = WaybackError("Error while retrieving %s" % req.full_url)
|
exc = WaybackError("Error while retrieving %s" % req.full_url)
|
||||||
exc.__cause__ = e
|
exc.__cause__ = e
|
||||||
@ -99,17 +97,11 @@ class Url:
|
|||||||
raise URLError("'%s' is not a vaild URL." % self.url)
|
raise URLError("'%s' is not a vaild URL." % self.url)
|
||||||
|
|
||||||
def _JSON(self):
|
def _JSON(self):
|
||||||
request_url = "https://archive.org/wayback/available?url=%s" % (
|
endpoint = "https://archive.org/wayback/available"
|
||||||
self._clean_url(),
|
headers = {"User-Agent": "%s" % self.user_agent}
|
||||||
)
|
payload = {"url": "%s" % self._clean_url()}
|
||||||
|
response = _get_response(endpoint, params=payload, headers=headers)
|
||||||
hdr = {"User-Agent": "%s" % self.user_agent}
|
return response.json()
|
||||||
req = Request(request_url, headers=hdr) # nosec
|
|
||||||
response = _get_response(req)
|
|
||||||
data_string = response.read().decode("UTF-8")
|
|
||||||
data = json.loads(data_string)
|
|
||||||
|
|
||||||
return data
|
|
||||||
|
|
||||||
def _archive_url(self):
|
def _archive_url(self):
|
||||||
"""Get URL of archive."""
|
"""Get URL of archive."""
|
||||||
@ -149,10 +141,9 @@ class Url:
|
|||||||
def save(self):
|
def save(self):
|
||||||
"""Create a new Wayback Machine archive for this URL."""
|
"""Create a new Wayback Machine archive for this URL."""
|
||||||
request_url = "https://web.archive.org/save/" + self._clean_url()
|
request_url = "https://web.archive.org/save/" + self._clean_url()
|
||||||
hdr = {"User-Agent": "%s" % self.user_agent} # nosec
|
headers = {"User-Agent": "%s" % self.user_agent}
|
||||||
req = Request(request_url, headers=hdr) # nosec
|
response = _get_response(request_url, params=None, headers=headers)
|
||||||
header = _get_response(req).headers
|
self.archive_url = "https://" + _archive_url_parser(response.headers)
|
||||||
self.archive_url = "https://" + _archive_url_parser(header)
|
|
||||||
self.timestamp = datetime.utcnow()
|
self.timestamp = datetime.utcnow()
|
||||||
return self
|
return self
|
||||||
|
|
||||||
@ -167,15 +158,16 @@ class Url:
|
|||||||
if not user_agent:
|
if not user_agent:
|
||||||
user_agent = self.user_agent
|
user_agent = self.user_agent
|
||||||
|
|
||||||
hdr = {"User-Agent": "%s" % user_agent}
|
headers = {"User-Agent": "%s" % self.user_agent}
|
||||||
req = Request(url, headers=hdr) # nosec
|
response = _get_response(url, params=None, headers=headers)
|
||||||
response = _get_response(req)
|
|
||||||
if not encoding:
|
if not encoding:
|
||||||
try:
|
try:
|
||||||
encoding = response.headers["content-type"].split("charset=")[-1]
|
encoding = response.encoding
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
encoding = "UTF-8"
|
encoding = "UTF-8"
|
||||||
return response.read().decode(encoding.replace("text/html", "UTF-8", 1))
|
|
||||||
|
return response.content.decode(encoding.replace("text/html", "UTF-8", 1))
|
||||||
|
|
||||||
def near(self, year=None, month=None, day=None, hour=None, minute=None):
|
def near(self, year=None, month=None, day=None, hour=None, minute=None):
|
||||||
""" Return the closest Wayback Machine archive to the time supplied.
|
""" Return the closest Wayback Machine archive to the time supplied.
|
||||||
@ -192,14 +184,13 @@ class Url:
|
|||||||
minute=minute if minute else now.tm_min,
|
minute=minute if minute else now.tm_min,
|
||||||
)
|
)
|
||||||
|
|
||||||
request_url = "https://archive.org/wayback/available?url=%s×tamp=%s" % (
|
|
||||||
self._clean_url(),
|
endpoint = "https://archive.org/wayback/available"
|
||||||
timestamp,
|
headers = {"User-Agent": "%s" % self.user_agent}
|
||||||
)
|
payload = {"url": "%s" % self._clean_url(), "timestamp" : timestamp}
|
||||||
hdr = {"User-Agent": "%s" % self.user_agent}
|
response = _get_response(endpoint, params=payload, headers=headers)
|
||||||
req = Request(request_url, headers=hdr) # nosec
|
print(response.text)
|
||||||
response = _get_response(req)
|
data = response.json()
|
||||||
data = json.loads(response.read().decode("UTF-8"))
|
|
||||||
if not data["archived_snapshots"]:
|
if not data["archived_snapshots"]:
|
||||||
raise WaybackError(
|
raise WaybackError(
|
||||||
"Can not find archive for '%s' try later or use wayback.Url(url, user_agent).save() "
|
"Can not find archive for '%s' try later or use wayback.Url(url, user_agent).save() "
|
||||||
@ -229,15 +220,14 @@ class Url:
|
|||||||
|
|
||||||
def total_archives(self):
|
def total_archives(self):
|
||||||
"""Returns the total number of Wayback Machine archives for this URL."""
|
"""Returns the total number of Wayback Machine archives for this URL."""
|
||||||
hdr = {"User-Agent": "%s" % self.user_agent}
|
|
||||||
request_url = (
|
endpoint = "https://web.archive.org/cdx/search/cdx"
|
||||||
"https://web.archive.org/cdx/search/cdx?url=%s&output=json&fl=statuscode"
|
headers = {"User-Agent": "%s" % self.user_agent, "output" : "json", "fl" : "statuscode"}
|
||||||
% self._clean_url()
|
payload = {"url": "%s" % self._clean_url()}
|
||||||
)
|
response = _get_response(endpoint, params=payload, headers=headers)
|
||||||
req = Request(request_url, headers=hdr) # nosec
|
|
||||||
response = _get_response(req)
|
|
||||||
# Most efficient method to count number of archives (yet)
|
# Most efficient method to count number of archives (yet)
|
||||||
return str(response.read()).count(",")
|
return response.text.count(",")
|
||||||
|
|
||||||
def pick_live_urls(self, url):
|
def pick_live_urls(self, url):
|
||||||
|
|
||||||
@ -255,9 +245,7 @@ class Url:
|
|||||||
def known_urls(self, alive=False, subdomain=False):
|
def known_urls(self, alive=False, subdomain=False):
|
||||||
"""Returns list of URLs known to exist for given domain name
|
"""Returns list of URLs known to exist for given domain name
|
||||||
because these URLs were crawled by WayBack Machine bots.
|
because these URLs were crawled by WayBack Machine bots.
|
||||||
|
|
||||||
Useful for pen-testers and others.
|
Useful for pen-testers and others.
|
||||||
|
|
||||||
Idea by Mohammed Diaa (https://github.com/mhmdiaa) from:
|
Idea by Mohammed Diaa (https://github.com/mhmdiaa) from:
|
||||||
https://gist.github.com/mhmdiaa/adf6bff70142e5091792841d4b372050
|
https://gist.github.com/mhmdiaa/adf6bff70142e5091792841d4b372050
|
||||||
"""
|
"""
|
||||||
@ -268,17 +256,14 @@ class Url:
|
|||||||
request_url = (
|
request_url = (
|
||||||
"https://web.archive.org/cdx/search/cdx?url=*.%s/*&output=json&fl=original&collapse=urlkey" % self._clean_url()
|
"https://web.archive.org/cdx/search/cdx?url=*.%s/*&output=json&fl=original&collapse=urlkey" % self._clean_url()
|
||||||
)
|
)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
request_url = (
|
request_url = (
|
||||||
"http://web.archive.org/cdx/search/cdx?url=%s/*&output=json&fl=original&collapse=urlkey" % self._clean_url()
|
"http://web.archive.org/cdx/search/cdx?url=%s/*&output=json&fl=original&collapse=urlkey" % self._clean_url()
|
||||||
)
|
)
|
||||||
|
|
||||||
hdr = {"User-Agent": "%s" % self.user_agent}
|
headers = {"User-Agent": "%s" % self.user_agent}
|
||||||
req = Request(request_url, headers=hdr) # nosec
|
response = _get_response(request_url, params=None, headers=headers)
|
||||||
response = _get_response(req)
|
data = response.json()
|
||||||
|
|
||||||
data = json.loads(response.read().decode("UTF-8"))
|
|
||||||
url_list = [y[0] for y in data if y[0] != "original"]
|
url_list = [y[0] for y in data if y[0] != "original"]
|
||||||
|
|
||||||
# Remove all deadURLs from url_list if alive=True
|
# Remove all deadURLs from url_list if alive=True
|
||||||
|
Loading…
Reference in New Issue
Block a user