Compare commits

8 Commits

| Author | SHA1 | Date |
|---|---|---|
| | 7bb01df846 | |
| | 6142e0b353 | |
| | a65990aee3 | |
| | 259a024eb1 | |
| | 91402792e6 | |
| | eabf4dc046 | |
| | 5a7bd73565 | |
| | 4693dbf9c1 | |
README.md (10 lines changed)
````diff
@@ -33,6 +33,16 @@ Install directly from GitHub:
 pip install git+https://github.com/akamhy/waybackpy.git
 ```
+
+### Supported Features
+
+- Archive webpage
+- Retrieve all archives of a webpage/domain
+- Retrieve archive close to a date or timestamp
+- Retrieve all archives which have a particular prefix
+- Get source code of the archive easily
+- CDX API support
+
 
 ### Usage
 
 #### As a Python package
````
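The new "As a Python package" heading precedes the usage examples. A minimal sketch of the interface exercised elsewhere in this diff, assuming `Url` is importable from the package top level and using an illustrative URL and user agent:

```python
from waybackpy import Url  # assumed top-level export

user_agent = "my-tool/1.0"  # illustrative; any descriptive user agent works
target = Url("https://example.com", user_agent)

target.save()        # archive the webpage on the Wayback Machine
print(target.get())  # source of the archive (see the get() fallback below)
```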
setup.py (2 lines changed)
```diff
@@ -19,7 +19,7 @@ setup(
     author=about["__author__"],
     author_email=about["__author_email__"],
     url=about["__url__"],
-    download_url="https://github.com/akamhy/waybackpy/archive/2.4.0.tar.gz",
+    download_url="https://github.com/akamhy/waybackpy/archive/2.4.1.tar.gz",
     keywords=[
         "Archive It",
         "Archive Website",
```
```diff
@@ -26,7 +26,15 @@ def test_CdxSnapshot():
     assert properties["statuscode"] == snapshot.statuscode
     assert properties["digest"] == snapshot.digest
     assert properties["length"] == snapshot.length
-    assert datetime.strptime(properties["timestamp"], "%Y%m%d%H%M%S") == snapshot.datetime_timestamp
-    archive_url = "https://web.archive.org/web/" + properties["timestamp"] + "/" + properties["original"]
+    assert (
+        datetime.strptime(properties["timestamp"], "%Y%m%d%H%M%S")
+        == snapshot.datetime_timestamp
+    )
+    archive_url = (
+        "https://web.archive.org/web/"
+        + properties["timestamp"]
+        + "/"
+        + properties["original"]
+    )
     assert archive_url == snapshot.archive_url
-    assert archive_url == str(snapshot)
+    assert sample_input == str(snapshot)
```
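Besides the reflow, the test swaps its final assertion: `str(snapshot)` is now compared against the raw CDX input line (`sample_input`) rather than the archive URL, matching the `CdxSnapshot.__str__` change later in this diff.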
```diff
@@ -41,16 +41,16 @@ def test_save():
     with pytest.raises(Exception):
         url2 = "ha ha ha ha"
         Url(url2, user_agent)
-    url3 = "http://www.archive.is/faq.html"
+    # url3 = "http://www.archive.is/faq.html"
 
-    with pytest.raises(Exception):
-        target = Url(
-            url3,
-            "Mozilla/5.0 (Windows; U; Windows NT 6.0; en-US) "
-            "AppleWebKit/533.20.25 (KHTML, like Gecko) Version/5.0.4 "
-            "Safari/533.20.27",
-        )
-        target.save()
+    # with pytest.raises(Exception):
+    #     target = Url(
+    #         url3,
+    #         "Mozilla/5.0 (Windows; U; Windows NT 6.0; en-US) "
+    #         "AppleWebKit/533.20.25 (KHTML, like Gecko) Version/5.0.4 "
+    #         "Safari/533.20.27",
+    #     )
+    #     target.save()
 
 
 def test_near():
```
```diff
@@ -4,7 +4,7 @@ __description__ = (
     "Archive pages and retrieve archived pages easily."
 )
 __url__ = "https://akamhy.github.io/waybackpy/"
-__version__ = "2.4.0"
+__version__ = "2.4.1"
 __author__ = "akamhy"
 __author_email__ = "akamhy@yahoo.com"
 __license__ = "MIT"
```
```diff
@@ -17,27 +17,27 @@ class Cdx:
     def __init__(
         self,
         url,
-        user_agent=default_user_agent,
+        user_agent=None,
         start_timestamp=None,
         end_timestamp=None,
         filters=[],
         match_type=None,
-        gzip=True,
+        gzip=None,
         collapses=[],
-        limit=10000,
+        limit=None,
     ):
         self.url = str(url).strip()
-        self.user_agent = str(user_agent)
+        self.user_agent = str(user_agent) if user_agent else default_user_agent
         self.start_timestamp = str(start_timestamp) if start_timestamp else None
         self.end_timestamp = str(end_timestamp) if end_timestamp else None
         self.filters = filters
         _check_filters(self.filters)
         self.match_type = str(match_type).strip() if match_type else None
         _check_match_type(self.match_type, self.url)
-        self.gzip = gzip
+        self.gzip = gzip if gzip else True
         self.collapses = collapses
         _check_collapses(self.collapses)
-        self.limit = limit
+        self.limit = limit if limit else 5000
         self.last_api_request_url = None
         self.use_page = False
```
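Every optional `Cdx` argument now defaults to `None` and is resolved inside `__init__`, so callers can omit the user agent, gzip flag, and limit; note the effective default limit drops from 10000 to 5000. A hedged construction sketch (the module path is an assumption, since file names were not captured in this compare view):

```python
from waybackpy.cdx import Cdx  # assumed module path

# user_agent, gzip, and limit may now be omitted; the constructor falls
# back to default_user_agent, True, and 5000 respectively.
cdx = Cdx("akamhy.github.io", match_type="domain", collapses=["urlkey"])
```

One quirk worth noting: `gzip if gzip else True` coerces a falsy `gzip=False` back to `True`, so the flag can effectively only be truthy.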
```diff
@@ -83,11 +83,11 @@ class Cdx:
         """
 
         endpoint = "https://web.archive.org/cdx/search/cdx"
 
-        if use_page == True:
-
-            total_pages = _get_total_pages(self.url, self.user_agent)
-
+        total_pages = _get_total_pages(self.url, self.user_agent)
+        # If we only have two or less pages of archives then we care for accuracy
+        # pagination API can be lagged sometimes
+        if use_page == True and total_pages >= 2:
+            blank_pages = 0
             for i in range(total_pages):
                 payload["page"] = str(i)
                 url, res = _get_response(
@@ -95,8 +95,14 @@ class Cdx:
                 )
 
                 self.last_api_request_url = url
-                yield res.text
+                text = res.text
+                if len(text) == 0:
+                    blank_pages += 1
+
+                if blank_pages >= 2:
+                    break
+
+                yield text
         else:
 
             payload["showResumeKey"] = "true"
```
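The loop now counts blank responses and stops after two of them rather than requesting every page the pagination API reports, since that API can lag behind the actual data. The shape of the early exit as a self-contained sketch, with `get_page` standing in as a hypothetical fetcher:

```python
def fetch_pages(get_page, total_pages):
    """Yield page bodies, stopping once two blank pages have been seen."""
    blank_pages = 0
    for page in range(total_pages):
        text = get_page(page)  # hypothetical fetcher returning response text
        if len(text) == 0:
            blank_pages += 1
        if blank_pages >= 2:   # two blanks: assume the remaining pages are empty
            break
        yield text
```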
```diff
@@ -25,4 +25,12 @@ class CdxSnapshot:
         )
 
     def __str__(self):
-        return self.archive_url
+        return ("%s %s %s %s %s %s %s") % (
+            self.urlkey,
+            self.timestamp,
+            self.original,
+            self.mimetype,
+            self.statuscode,
+            self.digest,
+            self.length,
+        )
```
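With this change, `str(snapshot)` reproduces the full space-separated CDX record instead of just the archive URL, which is what the updated `test_CdxSnapshot` asserts against `sample_input`. Illustrative output (all field values made up):

```python
print(snapshot)
# com,example)/ 20201115223225 https://example.com/ text/html 200 ABC123XYZ 1234
```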
```diff
@@ -11,6 +11,10 @@ quote = requests.utils.quote
 default_user_agent = "waybackpy python package - https://github.com/akamhy/waybackpy"
 
 
+def _unix_ts_to_wayback_ts(unix_ts):
+    return datetime.utcfromtimestamp(int(unix_ts)).strftime("%Y%m%d%H%M%S")
+
+
 def _add_payload(self, payload):
     if self.start_timestamp:
         payload["from"] = self.start_timestamp
```
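The new helper converts a Unix epoch timestamp to the 14-digit `YYYYMMDDHHMMSS` form used by the Wayback Machine. A quick self-contained check (the function body is copied from the diff; the sample calls are illustrative):

```python
from datetime import datetime

def _unix_ts_to_wayback_ts(unix_ts):
    return datetime.utcfromtimestamp(int(unix_ts)).strftime("%Y%m%d%H%M%S")

print(_unix_ts_to_wayback_ts(0))           # 19700101000000 (the Unix epoch)
print(_unix_ts_to_wayback_ts(1605479545))  # 20201115223225 (2020-11-15 22:32:25 UTC)
```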
```diff
@@ -11,6 +11,7 @@ from .utils import (
     _url_check,
     _cleaned_url,
     _ts,
+    _unix_ts_to_wayback_ts,
 )
 
 
```
```diff
@@ -143,12 +144,17 @@ class Url:
 
     def get(self, url="", user_agent="", encoding=""):
         """
         Return the source code of the supplied URL.
         Return the source code of the last archived URL,
         if no URL is passed to this method.
 
         If encoding is not supplied, it is auto-detected
         from the response itself by requests package.
         """
 
-        if not url:
+        if not url and self._archive_url:
             url = self._archive_url
 
+        elif not url and not self._archive_url:
+            url = _cleaned_url(self.url)
+
         if not user_agent:
```
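The fallback order in `get()` is now: an explicit `url` argument wins; otherwise the last recorded archive URL is fetched; and only when no archive has been recorded yet does it fall back to the cleaned live URL. A hedged usage sketch with illustrative arguments:

```python
target = Url("https://example.com", "my-tool/1.0")

live_source = target.get()      # nothing archived on this object yet: live page
target.near(year=2020)          # records an archive close to 2020
archived_source = target.get()  # now returns that archive's source
```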
```diff
@@ -165,7 +171,15 @@ class Url:
 
         return response.content.decode(encoding.replace("text/html", "UTF-8", 1))
 
-    def near(self, year=None, month=None, day=None, hour=None, minute=None):
+    def near(
+        self,
+        year=None,
+        month=None,
+        day=None,
+        hour=None,
+        minute=None,
+        unix_timestamp=None,
+    ):
         """
         Wayback Machine can have many archives of a webpage,
         sometimes we want archive close to a specific time.
```
```diff
@@ -187,14 +201,18 @@ class Url:
 
         And finally return self.
         """
-        now = datetime.utcnow().timetuple()
-        timestamp = _wayback_timestamp(
-            year=year if year else now.tm_year,
-            month=month if month else now.tm_mon,
-            day=day if day else now.tm_mday,
-            hour=hour if hour else now.tm_hour,
-            minute=minute if minute else now.tm_min,
-        )
+        if unix_timestamp:
+            timestamp = _unix_ts_to_wayback_ts(unix_timestamp)
+        else:
+            now = datetime.utcnow().timetuple()
+            timestamp = _wayback_timestamp(
+                year=year if year else now.tm_year,
+                month=month if month else now.tm_mon,
+                day=day if day else now.tm_mday,
+                hour=hour if hour else now.tm_hour,
+                minute=minute if minute else now.tm_min,
+            )
 
         endpoint = "https://archive.org/wayback/available"
         headers = {"User-Agent": self.user_agent}
```
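When `unix_timestamp` is supplied it takes precedence; otherwise any omitted calendar fields are filled from the current UTC time, exactly as before. The two calls below therefore request roughly the same archive; the URL and user agent are illustrative:

```python
target = Url("https://example.com", "my-tool/1.0")

target.near(unix_timestamp=1605479545)                        # 2020-11-15 22:32:25 UTC
target.near(year=2020, month=11, day=15, hour=22, minute=32)  # same moment, field form
```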
```diff
@@ -298,9 +316,23 @@ class Url:
         url_list = []
 
         if subdomain:
-            cdx = Cdx(_cleaned_url(self.url), user_agent=self.user_agent, start_timestamp=start_timestamp, end_timestamp=end_timestamp, match_type="domain", collapses=["urlkey"])
+            cdx = Cdx(
+                _cleaned_url(self.url),
+                user_agent=self.user_agent,
+                start_timestamp=start_timestamp,
+                end_timestamp=end_timestamp,
+                match_type="domain",
+                collapses=["urlkey"],
+            )
         else:
-            cdx = Cdx(_cleaned_url(self.url), user_agent=self.user_agent, start_timestamp=start_timestamp, end_timestamp=end_timestamp, match_type="host", collapses=["urlkey"])
+            cdx = Cdx(
+                _cleaned_url(self.url),
+                user_agent=self.user_agent,
+                start_timestamp=start_timestamp,
+                end_timestamp=end_timestamp,
+                match_type="host",
+                collapses=["urlkey"],
+            )
 
         snapshots = cdx.snapshots()
```
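The two branches differ only in the CDX `match_type`: `"domain"` returns snapshots for the host and all of its subdomains, while `"host"` restricts results to the host itself; `collapses=["urlkey"]` deduplicates repeated captures of the same URL. Consuming the generator might look like this sketch (the attribute name is taken from the `CdxSnapshot` fields above):

```python
for snapshot in cdx.snapshots():
    url_list.append(snapshot.original)  # one entry per collapsed urlkey
```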