Create separate module for the 3 different APIs also CDX is now CLI supported.

This commit is contained in:
Akash Mahanty 2022-01-02 14:14:45 +05:30
parent a7b805292d
commit 4e68cd5743
25 changed files with 755 additions and 2337 deletions

View File

@ -1,42 +0,0 @@
# This workflow will install Python dependencies, run tests and lint with a variety of Python versions
# For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions
name: CI
on:
push:
branches: [ master ]
pull_request:
branches: [ master ]
jobs:
build:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ['3.8']
steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install flake8 pytest codecov pytest-cov
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
- name: Lint with flake8
run: |
# stop the build if there are Python syntax errors or undefined names
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
# exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
- name: Test with pytest
run: |
pytest --cov=waybackpy tests/
- name: Upload coverage to Codecov
run: |
bash <(curl -s https://codecov.io/bash) -t ${{ secrets.CODECOV_TOKEN }}

View File

@ -1,4 +0,0 @@
# File : .pep8speaks.yml
scanner:
diff_only: True # If True, errors caused by only the patch are shown

View File

@ -1,5 +0,0 @@
# autogenerated pyup.io config file
# see https://pyup.io/docs/configuration/ for all available options
schedule: ''
update: false

View File

@ -1,8 +0,0 @@
{
"checkRunSettings": {
"vulnerableCheckRunConclusionLevel": "failure"
},
"issueSettings": {
"minSeverityLevel": "LOW"
}
}

View File

@ -1,58 +0,0 @@
# Contributing to waybackpy
We love your input! We want to make contributing to this project as easy and transparent as possible, whether it's:
- Reporting a bug
- Discussing the current state of the code
- Submitting a fix
- Proposing new features
- Becoming a maintainer
## We Develop with Github
We use github to host code, to track issues and feature requests, as well as accept pull requests.
## We Use [Github Flow](https://guides.github.com/introduction/flow/index.html), So All Code Changes Happen Through Pull Requests
Pull requests are the best way to propose changes to the codebase (we use [Github Flow](https://guides.github.com/introduction/flow/index.html)). We actively welcome your pull requests:
1. Fork the repo and create your branch from `master`.
2. If you've added code that should be tested, add tests.
3. If you've changed APIs, update the documentation.
4. Ensure the test suite passes.
5. Make sure your code lints.
6. Issue that pull request!
## Any contributions you make will be under the MIT Software License
In short, when you submit code changes, your submissions are understood to be under the same [MIT License](https://github.com/akamhy/waybackpy/blob/master/LICENSE) that covers the project. Feel free to contact the maintainers if that's a concern.
## Report bugs using Github's [issues](https://github.com/akamhy/waybackpy/issues)
We use GitHub issues to track public bugs. Report a bug by [opening a new issue](https://github.com/akamhy/waybackpy/issues/new); it's that easy!
## Write bug reports with detail, background, and sample code
**Great Bug Reports** tend to have:
- A quick summary and/or background
- Steps to reproduce
- Be specific!
- Give sample code if you can.
- What you expected would happen
- What actually happens
- Notes (possibly including why you think this might be happening, or stuff you tried that didn't work)
People *love* thorough bug reports. I'm not even kidding.
## Use a Consistent Coding Style
* You can try running `flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics` for style unification.
## License
By contributing, you agree that your contributions will be licensed under its [MIT License](https://github.com/akamhy/waybackpy/blob/master/LICENSE).
## References
This document is forked from [this gist](https://gist.github.com/briandk/3d2e8b3ec8daf5a27a62) by [briandk](https://github.com/briandk) which was itself adapted from the open-source contribution guidelines for [Facebook's Draft](https://github.com/facebook/draft-js/blob/a9316a723f9e918afde44dea68b5f9f39b7d9b00/CONTRIBUTING.md)

View File

@ -6,17 +6,6 @@
</div> </div>
<p align="center">
<a href="https://pypi.org/project/waybackpy/"><img alt="pypi" src="https://img.shields.io/pypi/v/waybackpy.svg"></a>
<a href="https://github.com/akamhy/waybackpy/actions?query=workflow%3ACI"><img alt="Build Status" src="https://github.com/akamhy/waybackpy/workflows/CI/badge.svg"></a>
<a href="https://www.codacy.com/manual/akamhy/waybackpy?utm_source=github.com&amp;utm_medium=referral&amp;utm_content=akamhy/waybackpy&amp;utm_campaign=Badge_Grade"><img alt="Codacy Badge" src="https://api.codacy.com/project/badge/Grade/255459cede9341e39436ec8866d3fb65"></a>
<a href="https://codecov.io/gh/akamhy/waybackpy"><img alt="codecov" src="https://codecov.io/gh/akamhy/waybackpy/branch/master/graph/badge.svg"></a>
<a href="https://github.com/akamhy/waybackpy/blob/master/CONTRIBUTING.md"><img alt="Contributions Welcome" src="https://img.shields.io/static/v1.svg?label=Contributions&message=Welcome&color=0059b3&style=flat-square"></a>
<a href="https://pepy.tech/project/waybackpy?versions=2*&versions=1*&versions=3*"><img alt="Downloads" src="https://pepy.tech/badge/waybackpy/month"></a>
<a href="https://github.com/akamhy/waybackpy/commits/master"><img alt="GitHub lastest commit" src="https://img.shields.io/github/last-commit/akamhy/waybackpy?color=blue&style=flat-square"></a>
<a href="#"><img alt="PyPI - Python Version" src="https://img.shields.io/pypi/pyversions/waybackpy?style=flat-square"></a>
</p>
----------------------------------------------------------------------------------------------------------------------------------------------- -----------------------------------------------------------------------------------------------------------------------------------------------
### Installation ### Installation
@ -33,14 +22,14 @@ Install directly from GitHub:
pip install git+https://github.com/akamhy/waybackpy.git pip install git+https://github.com/akamhy/waybackpy.git
``` ```
### Supported Features ### Supported APIs
Wayback Machine has 3 client side APIs.
- Archive webpage - Save API
- Retrieve all archives of a webpage/domain - Availability API
- Retrieve archive close to a date or timestamp - CDX API
- Retrieve all archives which have a particular prefix
- Get source code of the archive easily All three of these can be accessed by waybackpy.
- CDX API support
### Usage ### Usage
@ -87,9 +76,6 @@ https://web.archive.org/web/20040415020811/http://en.wikipedia.org:80/wiki/Human
$ waybackpy --newest --url "https://en.wikipedia.org/wiki/Remote_sensing" --user_agent "my-unique-user-agent" $ waybackpy --newest --url "https://en.wikipedia.org/wiki/Remote_sensing" --user_agent "my-unique-user-agent"
https://web.archive.org/web/20201221130522/https://en.wikipedia.org/wiki/Remote_sensing https://web.archive.org/web/20201221130522/https://en.wikipedia.org/wiki/Remote_sensing
$ waybackpy --total --url "https://en.wikipedia.org/wiki/Linux_kernel" --user_agent "my-unique-user-agent"
1904
$ waybackpy --known_urls --url akamhy.github.io --user_agent "my-unique-user-agent" --file $ waybackpy --known_urls --url akamhy.github.io --user_agent "my-unique-user-agent" --file
https://akamhy.github.io https://akamhy.github.io
https://akamhy.github.io/assets/js/scale.fix.js https://akamhy.github.io/assets/js/scale.fix.js

View File

@ -1 +1,2 @@
requests>=2.24.0 click
requests

View File

@ -19,21 +19,18 @@ setup(
author=about["__author__"], author=about["__author__"],
author_email=about["__author_email__"], author_email=about["__author_email__"],
url=about["__url__"], url=about["__url__"],
download_url="https://github.com/akamhy/waybackpy/archive/2.4.4.tar.gz", download_url="https://github.com/akamhy/waybackpy/archive/3.0.0.tar.gz",
keywords=[ keywords=[
"Archive It",
"Archive Website", "Archive Website",
"Wayback Machine", "Wayback Machine",
"waybackurls",
"Internet Archive", "Internet Archive",
], ],
install_requires=["requests"], install_requires=["requests", "click"],
python_requires=">=3.4", python_requires=">=3.4",
classifiers=[ classifiers=[
"Development Status :: 5 - Production/Stable", "Development Status :: 4 - Beta",
"Intended Audience :: Developers", "Intended Audience :: Developers",
"Natural Language :: English", "Natural Language :: English",
"Topic :: Software Development :: Build Tools",
"License :: OSI Approved :: MIT License", "License :: OSI Approved :: MIT License",
"Programming Language :: Python", "Programming Language :: Python",
"Programming Language :: Python :: 3", "Programming Language :: Python :: 3",

View File

View File

@ -1,93 +0,0 @@
import pytest
from waybackpy.cdx import Cdx
from waybackpy.exceptions import WaybackError
def test_all_cdx():
url = "akamhy.github.io"
user_agent = "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, \
like Gecko) Chrome/45.0.2454.85 Safari/537.36"
cdx = Cdx(
url=url,
user_agent=user_agent,
start_timestamp=2017,
end_timestamp=2020,
filters=[
"statuscode:200",
"mimetype:text/html",
"timestamp:20201002182319",
"original:https://akamhy.github.io/",
],
gzip=False,
collapses=["timestamp:10", "digest"],
limit=50,
match_type="prefix",
)
snapshots = cdx.snapshots()
for snapshot in snapshots:
ans = snapshot.archive_url
assert "https://web.archive.org/web/20201002182319/https://akamhy.github.io/" == ans
url = "akahfjgjkmhy.gihthub.ip"
cdx = Cdx(
url=url,
user_agent=user_agent,
start_timestamp=None,
end_timestamp=None,
filters=[],
match_type=None,
gzip=True,
collapses=[],
limit=10,
)
snapshots = cdx.snapshots()
print(snapshots)
i = 0
for _ in snapshots:
i += 1
assert i == 0
url = "https://github.com/akamhy/waybackpy/*"
cdx = Cdx(url=url, user_agent=user_agent, limit=50)
snapshots = cdx.snapshots()
for snapshot in snapshots:
print(snapshot.archive_url)
url = "https://github.com/akamhy/waybackpy"
with pytest.raises(WaybackError):
cdx = Cdx(url=url, user_agent=user_agent, limit=50, filters=["ghddhfhj"])
snapshots = cdx.snapshots()
with pytest.raises(WaybackError):
cdx = Cdx(url=url, user_agent=user_agent, collapses=["timestamp", "ghdd:hfhj"])
snapshots = cdx.snapshots()
url = "https://github.com"
cdx = Cdx(url=url, user_agent=user_agent, limit=50)
snapshots = cdx.snapshots()
c = 0
for snapshot in snapshots:
c += 1
if c > 100:
break
url = "https://github.com/*"
cdx = Cdx(url=url, user_agent=user_agent, collapses=["timestamp"])
snapshots = cdx.snapshots()
c = 0
for snapshot in snapshots:
c += 1
if c > 30529: # deafult limit is 10k
break
url = "https://github.com/*"
cdx = Cdx(url=url, user_agent=user_agent)
c = 0
snapshots = cdx.snapshots()
for snapshot in snapshots:
c += 1
if c > 100529:
break

View File

@ -1,359 +0,0 @@
import sys
import os
import pytest
import random
import string
import argparse
import waybackpy.cli as cli
from waybackpy.wrapper import Url # noqa: E402
from waybackpy.__version__ import __version__
def test_save():
args = argparse.Namespace(
user_agent=None,
url="https://hfjfjfjfyu6r6rfjvj.fjhgjhfjgvjm",
total=False,
version=False,
file=False,
oldest=False,
save=True,
json=False,
archive_url=False,
newest=False,
near=False,
subdomain=False,
known_urls=False,
get=None,
)
reply = cli.args_handler(args)
assert "could happen because either your waybackpy" or "cannot be archived by wayback machine as it is a redirect" in str(reply)
def test_json():
args = argparse.Namespace(
user_agent=None,
url="https://pypi.org/user/akamhy/",
total=False,
version=False,
file=False,
oldest=False,
save=False,
json=True,
archive_url=False,
newest=False,
near=False,
subdomain=False,
known_urls=False,
get=None,
)
reply = cli.args_handler(args)
assert "archived_snapshots" in str(reply)
def test_archive_url():
args = argparse.Namespace(
user_agent=None,
url="https://pypi.org/user/akamhy/",
total=False,
version=False,
file=False,
oldest=False,
save=False,
json=False,
archive_url=True,
newest=False,
near=False,
subdomain=False,
known_urls=False,
get=None,
)
reply = cli.args_handler(args)
assert "https://web.archive.org/web/" in str(reply)
def test_oldest():
args = argparse.Namespace(
user_agent=None,
url="https://pypi.org/user/akamhy/",
total=False,
version=False,
file=False,
oldest=True,
save=False,
json=False,
archive_url=False,
newest=False,
near=False,
subdomain=False,
known_urls=False,
get=None,
)
reply = cli.args_handler(args)
assert "pypi.org/user/akamhy" in str(reply)
uid = "".join(
random.choice(string.ascii_lowercase + string.digits) for _ in range(6)
)
url = "https://pypi.org/yfvjvycyc667r67ed67r" + uid
args = argparse.Namespace(
user_agent=None,
url=url,
total=False,
version=False,
file=False,
oldest=True,
save=False,
json=False,
archive_url=False,
newest=False,
near=False,
subdomain=False,
known_urls=False,
get=None,
)
reply = cli.args_handler(args)
assert "Can not find archive for" in str(reply)
def test_newest():
args = argparse.Namespace(
user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/600.8.9 \
(KHTML, like Gecko) Version/8.0.8 Safari/600.8.9",
url="https://pypi.org/user/akamhy/",
total=False,
version=False,
file=False,
oldest=False,
save=False,
json=False,
archive_url=False,
newest=True,
near=False,
subdomain=False,
known_urls=False,
get=None,
)
reply = cli.args_handler(args)
assert "pypi.org/user/akamhy" in str(reply)
uid = "".join(
random.choice(string.ascii_lowercase + string.digits) for _ in range(6)
)
url = "https://pypi.org/yfvjvycyc667r67ed67r" + uid
args = argparse.Namespace(
user_agent=None,
url=url,
total=False,
version=False,
file=False,
oldest=False,
save=False,
json=False,
archive_url=False,
newest=True,
near=False,
subdomain=False,
known_urls=False,
get=None,
)
reply = cli.args_handler(args)
assert "Can not find archive for" in str(reply)
def test_total_archives():
args = argparse.Namespace(
user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/600.8.9 \
(KHTML, like Gecko) Version/8.0.8 Safari/600.8.9",
url="https://pypi.org/user/akamhy/",
total=True,
version=False,
file=False,
oldest=False,
save=False,
json=False,
archive_url=False,
newest=False,
near=False,
subdomain=False,
known_urls=False,
get=None,
)
reply = cli.args_handler(args)
assert isinstance(reply, int)
def test_known_urls():
args = argparse.Namespace(
user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/600.8.9 \
(KHTML, like Gecko) Version/8.0.8 Safari/600.8.9",
url="https://www.keybr.com",
total=False,
version=False,
file=True,
oldest=False,
save=False,
json=False,
archive_url=False,
newest=False,
near=False,
subdomain=False,
known_urls=True,
get=None,
)
reply = cli.args_handler(args)
assert "keybr" in str(reply)
def test_near():
args = argparse.Namespace(
user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/600.8.9 \
(KHTML, like Gecko) Version/8.0.8 Safari/600.8.9",
url="https://pypi.org/user/akamhy/",
total=False,
version=False,
file=False,
oldest=False,
save=False,
json=False,
archive_url=False,
newest=False,
near=True,
subdomain=False,
known_urls=False,
get=None,
year=2020,
month=7,
day=15,
hour=1,
minute=1,
)
reply = cli.args_handler(args)
assert "202007" in str(reply)
uid = "".join(
random.choice(string.ascii_lowercase + string.digits) for _ in range(6)
)
url = "https://pypi.org/yfvjvycyc667r67ed67r" + uid
args = argparse.Namespace(
user_agent=None,
url=url,
total=False,
version=False,
file=False,
oldest=False,
save=False,
json=False,
archive_url=False,
newest=False,
near=True,
subdomain=False,
known_urls=False,
get=None,
year=2020,
month=7,
day=15,
hour=1,
minute=1,
)
reply = cli.args_handler(args)
assert "Can not find archive for" in str(reply)
def test_get():
args = argparse.Namespace(
user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/600.8.9 \
(KHTML, like Gecko) Version/8.0.8 Safari/600.8.9",
url="https://github.com/akamhy",
total=False,
version=False,
file=False,
oldest=False,
save=False,
json=False,
archive_url=False,
newest=False,
near=False,
subdomain=False,
known_urls=False,
get="url",
)
reply = cli.args_handler(args)
assert "waybackpy" in str(reply)
args = argparse.Namespace(
user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/600.8.9 \
(KHTML, like Gecko) Version/8.0.8 Safari/600.8.9",
url="https://github.com/akamhy/waybackpy",
total=False,
version=False,
file=False,
oldest=False,
save=False,
json=False,
archive_url=False,
newest=False,
near=False,
subdomain=False,
known_urls=False,
get="oldest",
)
reply = cli.args_handler(args)
assert "waybackpy" in str(reply)
args = argparse.Namespace(
user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/600.8.9 \
(KHTML, like Gecko) Version/8.0.8 Safari/600.8.9",
url="https://akamhy.github.io/waybackpy/",
total=False,
version=False,
file=False,
oldest=False,
save=False,
json=False,
archive_url=False,
newest=False,
near=False,
subdomain=False,
known_urls=False,
get="newest",
)
reply = cli.args_handler(args)
assert "waybackpy" in str(reply)
args = argparse.Namespace(
user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/600.8.9 \
(KHTML, like Gecko) Version/8.0.8 Safari/600.8.9",
url="https://pypi.org/user/akamhy/",
total=False,
version=False,
file=False,
oldest=False,
save=False,
json=False,
archive_url=False,
newest=False,
near=False,
subdomain=False,
known_urls=False,
get="foobar",
)
reply = cli.args_handler(args)
assert "get the source code of the" in str(reply)
def test_args_handler():
args = argparse.Namespace(version=True)
reply = cli.args_handler(args)
assert ("waybackpy version %s" % (__version__)) == reply
args = argparse.Namespace(url=None, version=False)
reply = cli.args_handler(args)
assert ("waybackpy %s" % (__version__)) in str(reply)
def test_main():
# This also tests the parse_args method in cli.py
cli.main(["temp.py", "--version"])

View File

@ -1,40 +0,0 @@
import pytest
from waybackpy.snapshot import CdxSnapshot, datetime
def test_CdxSnapshot():
sample_input = "org,archive)/ 20080126045828 http://github.com text/html 200 Q4YULN754FHV2U6Q5JUT6Q2P57WEWNNY 1415"
prop_values = sample_input.split(" ")
properties = {}
(
properties["urlkey"],
properties["timestamp"],
properties["original"],
properties["mimetype"],
properties["statuscode"],
properties["digest"],
properties["length"],
) = prop_values
snapshot = CdxSnapshot(properties)
assert properties["urlkey"] == snapshot.urlkey
assert properties["timestamp"] == snapshot.timestamp
assert properties["original"] == snapshot.original
assert properties["mimetype"] == snapshot.mimetype
assert properties["statuscode"] == snapshot.statuscode
assert properties["digest"] == snapshot.digest
assert properties["length"] == snapshot.length
assert (
datetime.strptime(properties["timestamp"], "%Y%m%d%H%M%S")
== snapshot.datetime_timestamp
)
archive_url = (
"https://web.archive.org/web/"
+ properties["timestamp"]
+ "/"
+ properties["original"]
)
assert archive_url == snapshot.archive_url
assert sample_input == str(snapshot)

View File

@ -1,186 +0,0 @@
import pytest
import json
from waybackpy.utils import (
_cleaned_url,
_url_check,
_full_url,
URLError,
WaybackError,
_get_total_pages,
_archive_url_parser,
_wayback_timestamp,
_get_response,
_check_match_type,
_check_collapses,
_check_filters,
_timestamp_manager,
)
def test_timestamp_manager():
timestamp = True
data = {}
assert _timestamp_manager(timestamp, data)
data = """
{"archived_snapshots": {"closest": {"timestamp": "20210109155628", "available": true, "status": "200", "url": "http://web.archive.org/web/20210109155628/https://www.google.com/"}}, "url": "https://www.google.com/"}
"""
data = json.loads(data)
assert data["archived_snapshots"]["closest"]["timestamp"] == "20210109155628"
def test_check_filters():
filters = []
_check_filters(filters)
filters = ["statuscode:200", "timestamp:20215678901234", "original:https://url.com"]
_check_filters(filters)
with pytest.raises(WaybackError):
_check_filters("not-list")
def test_check_collapses():
collapses = []
_check_collapses(collapses)
collapses = ["timestamp:10"]
_check_collapses(collapses)
collapses = ["urlkey"]
_check_collapses(collapses)
collapses = "urlkey" # NOT LIST
with pytest.raises(WaybackError):
_check_collapses(collapses)
collapses = ["also illegal collapse"]
with pytest.raises(WaybackError):
_check_collapses(collapses)
def test_check_match_type():
assert _check_match_type(None, "url") is None
match_type = "exact"
url = "test_url"
assert _check_match_type(match_type, url) is None
url = "has * in it"
with pytest.raises(WaybackError):
_check_match_type("domain", url)
with pytest.raises(WaybackError):
_check_match_type("not a valid type", "url")
def test_cleaned_url():
test_url = " https://en.wikipedia.org/wiki/Network security "
answer = "https://en.wikipedia.org/wiki/Network%20security"
assert answer == _cleaned_url(test_url)
def test_url_check():
good_url = "https://akamhy.github.io"
assert _url_check(good_url) is None
bad_url = "https://github-com"
with pytest.raises(URLError):
_url_check(bad_url)
def test_full_url():
params = {}
endpoint = "https://web.archive.org/cdx/search/cdx"
assert endpoint == _full_url(endpoint, params)
params = {"a": "1"}
assert "https://web.archive.org/cdx/search/cdx?a=1" == _full_url(endpoint, params)
assert "https://web.archive.org/cdx/search/cdx?a=1" == _full_url(
endpoint + "?", params
)
params["b"] = 2
assert "https://web.archive.org/cdx/search/cdx?a=1&b=2" == _full_url(
endpoint + "?", params
)
params["c"] = "foo bar"
assert "https://web.archive.org/cdx/search/cdx?a=1&b=2&c=foo%20bar" == _full_url(
endpoint + "?", params
)
def test_get_total_pages():
user_agent = "Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko"
url = "github.com*"
assert 212890 <= _get_total_pages(url, user_agent)
url = "https://zenodo.org/record/4416138"
assert 2 >= _get_total_pages(url, user_agent)
def test_archive_url_parser():
perfect_header = """
{'Server': 'nginx/1.15.8', 'Date': 'Sat, 02 Jan 2021 09:40:25 GMT', 'Content-Type': 'text/html; charset=UTF-8', 'Transfer-Encoding': 'chunked', 'Connection': 'keep-alive', 'X-Archive-Orig-Server': 'nginx', 'X-Archive-Orig-Date': 'Sat, 02 Jan 2021 09:40:09 GMT', 'X-Archive-Orig-Transfer-Encoding': 'chunked', 'X-Archive-Orig-Connection': 'keep-alive', 'X-Archive-Orig-Vary': 'Accept-Encoding', 'X-Archive-Orig-Last-Modified': 'Fri, 01 Jan 2021 12:19:00 GMT', 'X-Archive-Orig-Strict-Transport-Security': 'max-age=31536000, max-age=0;', 'X-Archive-Guessed-Content-Type': 'text/html', 'X-Archive-Guessed-Charset': 'utf-8', 'Memento-Datetime': 'Sat, 02 Jan 2021 09:40:09 GMT', 'Link': '<https://www.scribbr.com/citing-sources/et-al/>; rel="original", <https://web.archive.org/web/timemap/link/https://www.scribbr.com/citing-sources/et-al/>; rel="timemap"; type="application/link-format", <https://web.archive.org/web/https://www.scribbr.com/citing-sources/et-al/>; rel="timegate", <https://web.archive.org/web/20200601082911/https://www.scribbr.com/citing-sources/et-al/>; rel="first memento"; datetime="Mon, 01 Jun 2020 08:29:11 GMT", <https://web.archive.org/web/20201126185327/https://www.scribbr.com/citing-sources/et-al/>; rel="prev memento"; datetime="Thu, 26 Nov 2020 18:53:27 GMT", <https://web.archive.org/web/20210102094009/https://www.scribbr.com/citing-sources/et-al/>; rel="memento"; datetime="Sat, 02 Jan 2021 09:40:09 GMT", <https://web.archive.org/web/20210102094009/https://www.scribbr.com/citing-sources/et-al/>; rel="last memento"; datetime="Sat, 02 Jan 2021 09:40:09 GMT"', 'Content-Security-Policy': "default-src 'self' 'unsafe-eval' 'unsafe-inline' data: blob: archive.org web.archive.org analytics.archive.org pragma.archivelab.org", 'X-Archive-Src': 'spn2-20210102092956-wwwb-spn20.us.archive.org-8001.warc.gz', 'Server-Timing': 'captures_list;dur=112.646325, exclusion.robots;dur=0.172010, exclusion.robots.policy;dur=0.158205, RedisCDXSource;dur=2.205932, esindex;dur=0.014647, LoadShardBlock;dur=82.205012, PetaboxLoader3.datanode;dur=70.750239, CDXLines.iter;dur=24.306278, load_resource;dur=26.520179', 'X-App-Server': 'wwwb-app200', 'X-ts': '200', 'X-location': 'All', 'X-Cache-Key': 'httpsweb.archive.org/web/20210102094009/https://www.scribbr.com/citing-sources/et-al/IN', 'X-RL': '0', 'X-Page-Cache': 'MISS', 'X-Archive-Screenname': '0', 'Content-Encoding': 'gzip'}
"""
archive = _archive_url_parser(
perfect_header, "https://www.scribbr.com/citing-sources/et-al/"
)
assert "web.archive.org/web/20210102094009" in archive
header = """
vhgvkjv
Content-Location: /web/20201126185327/https://www.scribbr.com/citing-sources/et-al
ghvjkbjmmcmhj
"""
archive = _archive_url_parser(
header, "https://www.scribbr.com/citing-sources/et-al/"
)
assert "20201126185327" in archive
header = """
hfjkfjfcjhmghmvjm
X-Cache-Key: https://web.archive.org/web/20171128185327/https://www.scribbr.com/citing-sources/et-al/US
yfu,u,gikgkikik
"""
archive = _archive_url_parser(
header, "https://www.scribbr.com/citing-sources/et-al/"
)
assert "20171128185327" in archive
# The below header should result in Exception
no_archive_header = """
{'Server': 'nginx/1.15.8', 'Date': 'Sat, 02 Jan 2021 09:42:45 GMT', 'Content-Type': 'text/html; charset=utf-8', 'Transfer-Encoding': 'chunked', 'Connection': 'keep-alive', 'Cache-Control': 'no-cache', 'X-App-Server': 'wwwb-app52', 'X-ts': '523', 'X-RL': '0', 'X-Page-Cache': 'MISS', 'X-Archive-Screenname': '0'}
"""
with pytest.raises(WaybackError):
_archive_url_parser(
no_archive_header, "https://www.scribbr.com/citing-sources/et-al/"
)
def test_wayback_timestamp():
ts = _wayback_timestamp(year=2020, month=1, day=2, hour=3, minute=4)
assert "202001020304" in str(ts)
def test_get_response():
endpoint = "https://www.google.com"
user_agent = (
"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:78.0) Gecko/20100101 Firefox/78.0"
)
headers = {"User-Agent": "%s" % user_agent}
response = _get_response(endpoint, params=None, headers=headers)
assert response.status_code == 200
endpoint = "http/wwhfhfvhvjhmom"
with pytest.raises(WaybackError):
_get_response(endpoint, params=None, headers=headers)
endpoint = "https://akamhy.github.io"
url, response = _get_response(
endpoint, params=None, headers=headers, return_full_url=True
)
assert endpoint == url

View File

@ -1,28 +0,0 @@
import pytest
from waybackpy.wrapper import Url
user_agent = "Mozilla/5.0 (Windows NT 6.2; rv:20.0) Gecko/20121202 Firefox/20.0"
def test_url_check():
"""No API Use"""
broken_url = "http://wwwgooglecom/"
with pytest.raises(Exception):
Url(broken_url, user_agent)
def test_near():
with pytest.raises(Exception):
NeverArchivedUrl = (
"https://ee_3n.wrihkeipef4edia.org/rwti5r_ki/Nertr6w_rork_rse7c_urity"
)
target = Url(NeverArchivedUrl, user_agent)
target.near(year=2010)
def test_json():
url = "github.com/akamhy/waybackpy"
target = Url(url, user_agent)
assert "archived_snapshots" in str(target.JSON)

View File

@ -1,50 +1,7 @@
# ┏┓┏┓┏┓━━━━━━━━━━┏━━┓━━━━━━━━━━┏┓━━┏━━━┓━━━━━ from .wrapper import Url
# ┃┃┃┃┃┃━━━━━━━━━━┃┏┓┃━━━━━━━━━━┃┃━━┃┏━┓┃━━━━━ from .cdx_api import WaybackMachineCDXServerAPI
# ┃┃┃┃┃┃┏━━┓━┏┓━┏┓┃┗┛┗┓┏━━┓━┏━━┓┃┃┏┓┃┗━┛┃┏┓━┏┓ from .save_api import WaybackMachineSaveAPI
# ┃┗┛┗┛┃┗━┓┃━┃┃━┃┃┃┏━┓┃┗━┓┃━┃┏━┛┃┗┛┛┃┏━━┛┃┃━┃┃ from .availability_api import WaybackMachineAvailabilityAPI
# ┗┓┏┓┏┛┃┗┛┗┓┃┗━┛┃┃┗━┛┃┃┗┛┗┓┃┗━┓┃┏┓┓┃┃━━━┃┗━┛┃
# ━┗┛┗┛━┗━━━┛┗━┓┏┛┗━━━┛┗━━━┛┗━━┛┗┛┗┛┗┛━━━┗━┓┏┛
# ━━━━━━━━━━━┏━┛┃━━━━━━━━━━━━━━━━━━━━━━━━┏━┛┃━
# ━━━━━━━━━━━┗━━┛━━━━━━━━━━━━━━━━━━━━━━━━┗━━┛━
"""
Waybackpy is a Python package & command-line program that interfaces with the Internet Archive's Wayback Machine API.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Archive webpage and retrieve archived URLs easily.
Usage:
>>> import waybackpy
>>> url = "https://en.wikipedia.org/wiki/Multivariable_calculus"
>>> user_agent = "Mozilla/5.0 (Windows NT 5.1; rv:40.0) Gecko/20100101 Firefox/40.0"
>>> wayback = waybackpy.Url(url, user_agent)
>>> archive = wayback.save()
>>> str(archive)
'https://web.archive.org/web/20210104173410/https://en.wikipedia.org/wiki/Multivariable_calculus'
>>> archive.timestamp
datetime.datetime(2021, 1, 4, 17, 35, 12, 691741)
>>> oldest_archive = wayback.oldest()
>>> str(oldest_archive)
'https://web.archive.org/web/20050422130129/http://en.wikipedia.org:80/wiki/Multivariable_calculus'
>>> archive_close_to_2010_feb = wayback.near(year=2010, month=2)
>>> str(archive_close_to_2010_feb)
'https://web.archive.org/web/20100215001541/http://en.wikipedia.org:80/wiki/Multivariable_calculus'
>>> str(wayback.newest())
'https://web.archive.org/web/20210104173410/https://en.wikipedia.org/wiki/Multivariable_calculus'
Full documentation @ <https://github.com/akamhy/waybackpy/wiki>.
:copyright: (c) 2020-2021 AKash Mahanty Et al.
:license: MIT
"""
from .wrapper import Url, Cdx
from .__version__ import ( from .__version__ import (
__title__, __title__,
__description__, __description__,

View File

@ -1,11 +1,11 @@
__title__ = "waybackpy" __title__ = "waybackpy"
__description__ = ( __description__ = (
"A Python package that interfaces with the Internet Archive's Wayback Machine API. " "Python package that interfaces with the Internet Archive's Wayback Machine APIs. "
"Archive pages and retrieve archived pages easily." "Archive pages and retrieve archived pages easily."
) )
__url__ = "https://akamhy.github.io/waybackpy/" __url__ = "https://akamhy.github.io/waybackpy/"
__version__ = "2.4.4" __version__ = "3.0.0"
__author__ = "akamhy" __author__ = "akamhy"
__author_email__ = "akamhy@yahoo.com" __author_email__ = "akamhy@yahoo.com"
__license__ = "MIT" __license__ = "MIT"
__copyright__ = "Copyright 2020-2021 Akash Mahanty et al." __copyright__ = "Copyright 2020-2022 Akash Mahanty et al."

View File

@ -0,0 +1,99 @@
import re
import time
import requests
from datetime import datetime
from .__version__ import __version__
from .utils import DEFAULT_USER_AGENT
def full_url(endpoint, params):
if not params:
return endpoint.strip()
full_url = endpoint if endpoint.endswith("?") else (endpoint + "?")
for key, val in params.items():
key = "filter" if key.startswith("filter") else key
key = "collapse" if key.startswith("collapse") else key
amp = "" if full_url.endswith("?") else "&"
full_url = (
full_url
+ amp
+ "{key}={val}".format(key=key, val=requests.utils.quote(str(val)))
)
return full_url
class WaybackMachineAvailabilityAPI:
def __init__(self, url, user_agent=DEFAULT_USER_AGENT):
self.url = str(url).strip().replace(" ", "%20")
self.user_agent = user_agent
self.headers = {"User-Agent": self.user_agent}
self.payload = {"url": "{url}".format(url=self.url)}
self.endpoint = "https://archive.org/wayback/available"
def unix_timestamp_to_wayback_timestamp(self, unix_timestamp):
return datetime.utcfromtimestamp(int(unix_timestamp)).strftime("%Y%m%d%H%M%S")
def json(self):
self.request_url = full_url(self.endpoint, self.payload)
self.response = requests.get(self.request_url, self.headers)
self.JSON = self.response.json()
return self.JSON
def timestamp(self):
if not self.JSON["archived_snapshots"] or not self.JSON:
return datetime.max
return datetime.strptime(
self.JSON["archived_snapshots"]["closest"]["timestamp"], "%Y%m%d%H%M%S"
)
@property
def archive_url(self):
data = self.JSON
if not data["archived_snapshots"]:
archive_url = None
else:
archive_url = data["archived_snapshots"]["closest"]["url"]
archive_url = archive_url.replace(
"http://web.archive.org/web/", "https://web.archive.org/web/", 1
)
return archive_url
def wayback_timestamp(self, **kwargs):
return "".join(
str(kwargs[key]).zfill(2)
for key in ["year", "month", "day", "hour", "minute"]
)
def oldest(self):
return self.near(year=1994)
def newest(self):
return self.near(unix_timestamp=int(time.time()))
def near(
self,
year=None,
month=None,
day=None,
hour=None,
minute=None,
unix_timestamp=None,
):
if unix_timestamp:
timestamp = self.unix_timestamp_to_wayback_timestamp(unix_timestamp)
else:
now = datetime.utcnow().timetuple()
timestamp = self.wayback_timestamp(
year=year if year else now.tm_year,
month=month if month else now.tm_mon,
day=day if day else now.tm_mday,
hour=hour if hour else now.tm_hour,
minute=minute if minute else now.tm_min,
)
self.payload["timestamp"] = timestamp
return self.json()

View File

@ -1,20 +1,17 @@
from .snapshot import CdxSnapshot
from .exceptions import WaybackError from .exceptions import WaybackError
from .utils import ( from .cdx_snapshot import CDXSnapshot
_get_total_pages, from .cdx_utils import (
_get_response, get_total_pages,
default_user_agent, get_response,
_check_filters, check_filters,
_check_collapses, check_collapses,
_check_match_type, check_match_type,
_add_payload,
) )
# TODO : Threading support for pagination API. It's designed for Threading. from .utils import DEFAULT_USER_AGENT
# TODO : Add get method here if type is Vaild HTML, SVG other but not - or warc. Test it.
class Cdx: class WaybackMachineCDXServerAPI:
def __init__( def __init__(
self, self,
url, url,
@ -27,87 +24,34 @@ class Cdx:
collapses=[], collapses=[],
limit=None, limit=None,
): ):
self.url = str(url).strip() self.url = str(url).strip().replace(" ", "%20")
self.user_agent = str(user_agent) if user_agent else default_user_agent self.user_agent = str(user_agent) if user_agent else DEFAULT_USER_AGENT
self.start_timestamp = str(start_timestamp) if start_timestamp else None self.start_timestamp = str(start_timestamp) if start_timestamp else None
self.end_timestamp = str(end_timestamp) if end_timestamp else None self.end_timestamp = str(end_timestamp) if end_timestamp else None
self.filters = filters self.filters = filters
_check_filters(self.filters) check_filters(self.filters)
self.match_type = str(match_type).strip() if match_type else None self.match_type = str(match_type).strip() if match_type else None
_check_match_type(self.match_type, self.url) check_match_type(self.match_type, self.url)
self.gzip = gzip if gzip else True self.gzip = gzip if gzip else True
self.collapses = collapses self.collapses = collapses
_check_collapses(self.collapses) check_collapses(self.collapses)
self.limit = limit if limit else 5000 self.limit = limit if limit else 5000
self.last_api_request_url = None self.last_api_request_url = None
self.use_page = False self.use_page = False
self.endpoint = "https://web.archive.org/cdx/search/cdx"
def cdx_api_manager(self, payload, headers, use_page=False): def cdx_api_manager(self, payload, headers, use_page=False):
"""Act as button, we can choose between the normal API and pagination API.
Parameters total_pages = get_total_pages(self.url, self.user_agent)
----------
self : waybackpy.cdx.Cdx
The instance itself
payload : dict
Get request parameters name value pairs
headers : dict
The headers for making the GET request.
use_page : bool
If True use pagination API else use normal resume key based API.
We have two options to get the snapshots, we use this
method to make a selection between pagination API and
the normal one with Resumption Key, sequential querying
of CDX data. For very large querying (for example domain query),
it may be useful to perform queries in parallel and also estimate
the total size of the query.
read more about the pagination API at:
https://web.archive.org/web/20201228063237/https://github.com/internetarchive/wayback/blob/master/wayback-cdx-server/README.md#pagination-api
if use_page is false if will use the normal sequential query API,
else use the pagination API.
two mutually exclusive cases possible:
1) pagination API is selected
a) get the total number of pages to read, using _get_total_pages()
b) then we use a for loop to get all the pages and yield the response text
2) normal sequential query API is selected.
a) get use showResumeKey=true to ask the API to add a query resumption key
at the bottom of response
b) check if the page has more than 3 lines, if not return the text
c) if it has atleast three lines, we check the second last line for zero length.
d) if the second last line has length zero than we assume that the last line contains
the resumption key, we set the resumeKey and remove the resumeKey from text
e) if the second line has non zero length we return the text as there will no resumption key
f) if we find the resumption key we set the "more" variable status to True which is always set
to False on each iteration. If more is not True the iteration stops and function returns.
"""
endpoint = "https://web.archive.org/cdx/search/cdx"
total_pages = _get_total_pages(self.url, self.user_agent)
# If we only have two or less pages of archives then we care for accuracy # If we only have two or less pages of archives then we care for accuracy
# pagination API can be lagged sometimes # pagination API can be lagged sometimes
if use_page == True and total_pages >= 2: if use_page == True and total_pages >= 2:
blank_pages = 0 blank_pages = 0
for i in range(total_pages): for i in range(total_pages):
payload["page"] = str(i) payload["page"] = str(i)
url, res = _get_response(
endpoint, params=payload, headers=headers, return_full_url=True url, res = get_response(
self.endpoint, params=payload, headers=headers, return_full_url=True
) )
self.last_api_request_url = url self.last_api_request_url = url
@ -131,8 +75,8 @@ class Cdx:
if resumeKey: if resumeKey:
payload["resumeKey"] = resumeKey payload["resumeKey"] = resumeKey
url, res = _get_response( url, res = get_response(
endpoint, params=payload, headers=headers, return_full_url=True self.endpoint, params=payload, headers=headers, return_full_url=True
) )
self.last_api_request_url = url self.last_api_request_url = url
@ -154,23 +98,35 @@ class Cdx:
yield text yield text
def add_payload(self, payload):
if self.start_timestamp:
payload["from"] = self.start_timestamp
if self.end_timestamp:
payload["to"] = self.end_timestamp
if self.gzip != True:
payload["gzip"] = "false"
if self.match_type:
payload["matchType"] = self.match_type
if self.filters and len(self.filters) > 0:
for i, f in enumerate(self.filters):
payload["filter" + str(i)] = f
if self.collapses and len(self.collapses) > 0:
for i, f in enumerate(self.collapses):
payload["collapse" + str(i)] = f
# Don't need to return anything as it's dictionary.
payload["url"] = self.url
def snapshots(self): def snapshots(self):
"""
This function yeilds snapshots encapsulated
in CdxSnapshot for increased usability.
All the get request values are set if the conditions match
And we use logic that if someone's only inputs don't have any
of [start_timestamp, end_timestamp] and don't use any collapses
then we use the pagination API as it returns archives starting
from the first archive and the recent most archive will be on
the last page.
"""
payload = {} payload = {}
headers = {"User-Agent": self.user_agent} headers = {"User-Agent": self.user_agent}
_add_payload(self, payload) self.add_payload(payload)
if not self.start_timestamp or self.end_timestamp: if not self.start_timestamp or self.end_timestamp:
self.use_page = True self.use_page = True
@ -226,4 +182,4 @@ class Cdx:
properties["length"], properties["length"],
) = prop_values ) = prop_values
yield CdxSnapshot(properties) yield CDXSnapshot(properties)

View File

@ -1,26 +1,8 @@
from datetime import datetime from datetime import datetime
class CdxSnapshot: class CDXSnapshot:
"""
This class encapsulates the snapshots for greater usability.
Raw Snapshot data looks like:
org,archive)/ 20080126045828 http://github.com text/html 200 Q4YULN754FHV2U6Q5JUT6Q2P57WEWNNY 1415
"""
def __init__(self, properties): def __init__(self, properties):
"""
Parameters
----------
self : waybackpy.snapshot.CdxSnapshot
The instance itself
properties : dict
Properties is a dict containg all of the 7 cdx snapshot properties.
"""
self.urlkey = properties["urlkey"] self.urlkey = properties["urlkey"]
self.timestamp = properties["timestamp"] self.timestamp = properties["timestamp"]
self.datetime_timestamp = datetime.strptime(self.timestamp, "%Y%m%d%H%M%S") self.datetime_timestamp = datetime.strptime(self.timestamp, "%Y%m%d%H%M%S")
@ -34,12 +16,6 @@ class CdxSnapshot:
) )
def __str__(self): def __str__(self):
"""Returns the Cdx snapshot line.
Output format:
org,archive)/ 20080126045828 http://github.com text/html 200 Q4YULN754FHV2U6Q5JUT6Q2P57WEWNNY 1415
"""
return "{urlkey} {timestamp} {original} {mimetype} {statuscode} {digest} {length}".format( return "{urlkey} {timestamp} {original} {mimetype} {statuscode} {digest} {length}".format(
urlkey=self.urlkey, urlkey=self.urlkey,
timestamp=self.timestamp, timestamp=self.timestamp,

154
waybackpy/cdx_utils.py Normal file
View File

@ -0,0 +1,154 @@
import re
import requests
from urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter
from .exceptions import WaybackError
def get_total_pages(url, user_agent):
request_url = (
"https://web.archive.org/cdx/search/cdx?url={url}&showNumPages=true".format(
url=url
)
)
headers = {"User-Agent": user_agent}
return int((requests.get(request_url, headers=headers).text).strip())
def full_url(endpoint, params):
if not params:
return endpoint
full_url = endpoint if endpoint.endswith("?") else (endpoint + "?")
for key, val in params.items():
key = "filter" if key.startswith("filter") else key
key = "collapse" if key.startswith("collapse") else key
amp = "" if full_url.endswith("?") else "&"
full_url = (
full_url
+ amp
+ "{key}={val}".format(key=key, val=requests.utils.quote(str(val)))
)
return full_url
def get_response(
endpoint,
params=None,
headers=None,
return_full_url=False,
retries=5,
backoff_factor=0.5,
no_raise_on_redirects=False,
):
s = requests.Session()
retries = Retry(
total=retries,
backoff_factor=backoff_factor,
status_forcelist=[500, 502, 503, 504],
)
s.mount("https://", HTTPAdapter(max_retries=retries))
# The URL with parameters required for the get request
url = full_url(endpoint, params)
try:
if not return_full_url:
return s.get(url, headers=headers)
return (url, s.get(url, headers=headers))
except Exception as e:
reason = str(e)
if no_raise_on_redirects:
if "Exceeded 30 redirects" in reason:
return
exc_message = "Error while retrieving {url}.\n{reason}".format(
url=url, reason=reason
)
exc = WaybackError(exc_message)
exc.__cause__ = e
raise exc
def check_filters(filters):
if not isinstance(filters, list):
raise WaybackError("filters must be a list.")
# [!]field:regex
for _filter in filters:
try:
match = re.search(
r"(\!?(?:urlkey|timestamp|original|mimetype|statuscode|digest|length)):(.*)",
_filter,
)
key = match.group(1)
val = match.group(2)
except Exception:
exc_message = (
"Filter '{_filter}' is not following the cdx filter syntax.".format(
_filter=_filter
)
)
raise WaybackError(exc_message)
def check_collapses(collapses):
if not isinstance(collapses, list):
raise WaybackError("collapses must be a list.")
if len(collapses) == 0:
return
for collapse in collapses:
try:
match = re.search(
r"(urlkey|timestamp|original|mimetype|statuscode|digest|length)(:?[0-9]{1,99})?",
collapse,
)
field = match.group(1)
N = None
if 2 == len(match.groups()):
N = match.group(2)
if N:
if not (field + N == collapse):
raise Exception
else:
if not (field == collapse):
raise Exception
except Exception:
exc_message = "collapse argument '{collapse}' is not following the cdx collapse syntax.".format(
collapse=collapse
)
raise WaybackError(exc_message)
def check_match_type(match_type, url):
if not match_type:
return
if "*" in url:
raise WaybackError("Can not use wildcard with match_type argument")
legal_match_type = ["exact", "prefix", "host", "domain"]
if match_type not in legal_match_type:
exc_message = "{match_type} is not an allowed match type.\nUse one from 'exact', 'prefix', 'host' or 'domain'".format(
match_type=match_type
)
raise WaybackError(exc_message)

View File

@ -1,334 +1,270 @@
import os import click
import re import json as JSON
import sys
import json
import random
import string
import argparse
from .wrapper import Url
from .exceptions import WaybackError
from .__version__ import __version__ from .__version__ import __version__
from .utils import DEFAULT_USER_AGENT
from .cdx_api import WaybackMachineCDXServerAPI
from .save_api import WaybackMachineSaveAPI
from .availability_api import WaybackMachineAvailabilityAPI
def _save(obj): @click.command()
try: @click.option(
return obj.save() "-u", "--url", help="URL on which Wayback machine operations are to be performed."
except Exception as err:
e = str(err)
m = re.search(r"Header:\n(.*)", e)
if m:
header = m.group(1)
if "No archive URL found in the API response" in e:
return (
"\n[waybackpy] Can not save/archive your link.\n[waybackpy] This "
"could happen because either your waybackpy ({version}) is likely out of "
"date or Wayback Machine is malfunctioning.\n[waybackpy] Visit "
"https://github.com/akamhy/waybackpy for the latest version of "
"waybackpy.\n[waybackpy] API response Header :\n{header}".format(
version=__version__, header=header
) )
@click.option(
"-ua",
"--user-agent",
"--user_agent",
default=DEFAULT_USER_AGENT,
help="User agent, default user agent is '%s' " % DEFAULT_USER_AGENT,
) )
if "URL cannot be archived by wayback machine as it is a redirect" in e: @click.option(
return ("URL cannot be archived by wayback machine as it is a redirect") "-v", "--version", is_flag=True, default=False, help="Print waybackpy version."
raise WaybackError(err)
def _archive_url(obj):
return obj.archive_url
def _json(obj):
return json.dumps(obj.JSON)
def no_archive_handler(e, obj):
m = re.search(r"archive\sfor\s\'(.*?)\'\stry", str(e))
if m:
url = m.group(1)
ua = obj.user_agent
if "github.com/akamhy/waybackpy" in ua:
ua = "YOUR_USER_AGENT_HERE"
return (
"\n[Waybackpy] Can not find archive for '{url}'.\n[Waybackpy] You can"
" save the URL using the following command:\n[Waybackpy] waybackpy --"
'user_agent "{user_agent}" --url "{url}" --save'.format(
url=url, user_agent=ua
) )
@click.option(
"-n",
"--newest",
"-au",
"--archive_url",
"--archive-url",
default=False,
is_flag=True,
help="Fetch the newest archive of the specified URL",
) )
raise WaybackError(e) @click.option(
"-o",
"--oldest",
default=False,
is_flag=True,
help="Fetch the oldest archive of the specified URL",
)
@click.option(
"-j",
"--json",
default=False,
is_flag=True,
help="Spit out the JSON data for availability_api commands.",
)
@click.option(
"-N", "--near", default=False, is_flag=True, help="Archive near specified time."
)
@click.option("-Y", "--year", type=click.IntRange(1994, 9999), help="Year in integer.")
@click.option("-M", "--month", type=click.IntRange(1, 12), help="Month in integer.")
@click.option("-D", "--day", type=click.IntRange(1, 31), help="Day in integer.")
@click.option("-H", "--hour", type=click.IntRange(0, 24), help="Hour in integer.")
@click.option("-MIN", "--minute", type=click.IntRange(0, 60), help="Minute in integer.")
@click.option(
"-s",
"--save",
default=False,
is_flag=True,
help="Save the specified URL's webpage and print the archive URL.",
)
@click.option(
"-h",
"--headers",
default=False,
is_flag=True,
help="Spit out the headers data for save_api commands.",
)
@click.option(
"-c",
"--cdx",
default=False,
is_flag=True,
help="Spit out the headers data for save_api commands.",
)
@click.option(
"-st",
"--start-timestamp",
"--start_timestamp",
)
@click.option(
"-et",
"--end-timestamp",
"--end_timestamp",
)
@click.option(
"-f",
"--filters",
multiple=True,
)
@click.option(
"-mt",
"--match-type",
"--match_type",
)
@click.option(
"-gz",
"--gzip",
)
@click.option(
"-c",
"--collapses",
multiple=True,
)
@click.option(
"-l",
"--limit",
)
@click.option(
"-cp",
"--cdx-print",
"--cdx_print",
multiple=True,
)
def main(
url,
user_agent,
version,
newest,
oldest,
json,
near,
year,
month,
day,
hour,
minute,
save,
headers,
cdx,
start_timestamp,
end_timestamp,
filters,
match_type,
gzip,
collapses,
limit,
cdx_print,
):
"""
waybackpy : Python package & CLI tool that interfaces the Wayback Machine API
def _oldest(obj): Released under the MIT License.
try: License @ https://github.com/akamhy/waybackpy/blob/master/LICENSE
return obj.oldest()
except Exception as e:
return no_archive_handler(e, obj)
Copyright (c) 2020 waybackpy contributors. Contributors list @
https://github.com/akamhy/waybackpy/graphs/contributors
def _newest(obj): https://github.com/akamhy/waybackpy
try:
return obj.newest()
except Exception as e:
return no_archive_handler(e, obj)
https://pypi.org/project/waybackpy
def _total_archives(obj): """
return obj.total_archives()
if version:
click.echo("waybackpy version %s" % __version__)
return
def _near(obj, args): if not url:
_near_args = {} click.echo("No URL detected. Please pass an URL.")
args_arr = [args.year, args.month, args.day, args.hour, args.minute] return
def echo_availability_api(availability_api_instance):
click.echo("Archive URL:")
if not availability_api_instance.archive_url:
archive_url = (
"NO ARCHIVE FOUND - The requested URL is probably "
+ "not yet archived or if the URL was recently archived then it is "
+ "not yet available via the Wayback Machine's availability API "
+ "because of database lag and should be available after some time."
)
else:
archive_url = availability_api_instance.archive_url
click.echo(archive_url)
if json:
click.echo("JSON response:")
click.echo(JSON.dumps(availability_api_instance.JSON))
availability_api = WaybackMachineAvailabilityAPI(url, user_agent=user_agent)
if oldest:
availability_api.oldest()
echo_availability_api(availability_api)
return
if newest:
availability_api.newest()
echo_availability_api(availability_api)
return
if near:
near_args = {}
keys = ["year", "month", "day", "hour", "minute"] keys = ["year", "month", "day", "hour", "minute"]
args_arr = [year, month, day, hour, minute]
for key, arg in zip(keys, args_arr): for key, arg in zip(keys, args_arr):
if arg: if arg:
_near_args[key] = arg near_args[key] = arg
availability_api.near(**near_args)
echo_availability_api(availability_api)
return
try: if save:
return obj.near(**_near_args) save_api = WaybackMachineSaveAPI(url, user_agent=user_agent)
except Exception as e: save_api.save()
return no_archive_handler(e, obj) click.echo("Archive URL:")
click.echo(save_api.archive_url)
click.echo("Cached save:")
click.echo(save_api.cached_save)
if headers:
click.echo("Save API headers:")
click.echo(save_api.headers)
return
if cdx:
filters = list(filters)
collapses = list(collapses)
cdx_print = list(cdx_print)
def _save_urls_on_file(url_gen): cdx_api = WaybackMachineCDXServerAPI(
domain = None url,
sys_random = random.SystemRandom() user_agent=user_agent,
uid = "".join( start_timestamp=start_timestamp,
sys_random.choice(string.ascii_lowercase + string.digits) for _ in range(6) end_timestamp=end_timestamp,
filters=filters,
match_type=match_type,
gzip=gzip,
collapses=collapses,
limit=limit,
) )
url_count = 0
for url in url_gen: snapshots = cdx_api.snapshots()
url_count += 1
if not domain:
m = re.search("https?://([A-Za-z_0-9.-]+).*", url)
domain = "domain-unknown" for snapshot in snapshots:
if len(cdx_print) == 0:
if m: click.echo(snapshot)
domain = m.group(1)
file_name = "{domain}-urls-{uid}.txt".format(domain=domain, uid=uid)
file_path = os.path.join(os.getcwd(), file_name)
if not os.path.isfile(file_path):
open(file_path, "w+").close()
with open(file_path, "a") as f:
f.write("{url}\n".format(url=url))
print(url)
if url_count > 0:
return "\n\n'{file_name}' saved in current working directory".format(
file_name=file_name
)
else: else:
return "No known URLs found. Please try a diffrent input!" output_string = ""
if "urlkey" or "url-key" or "url_key" in cdx_print:
output_string = output_string + snapshot.urlkey + " "
def _known_urls(obj, args): if "timestamp" or "time-stamp" or "time_stamp" in cdx_print:
""" output_string = output_string + snapshot.timestamp + " "
Known urls for a domain. if "original" in cdx_print:
""" output_string = output_string + snapshot.original + " "
if "original" in cdx_print:
subdomain = True if args.subdomain else False output_string = output_string + snapshot.original + " "
if "mimetype" or "mime-type" or "mime_type" in cdx_print:
url_gen = obj.known_urls(subdomain=subdomain) output_string = output_string + snapshot.mimetype + " "
if "statuscode" or "status-code" or "status_code" in cdx_print:
if args.file: output_string = output_string + snapshot.statuscode + " "
return _save_urls_on_file(url_gen) if "digest" in cdx_print:
else: output_string = output_string + snapshot.digest + " "
for url in url_gen: if "length" in cdx_print:
print(url) output_string = output_string + snapshot.length + " "
return "\n" if "archiveurl" or "archive-url" or "archive_url" in cdx_print:
output_string = output_string + snapshot.archive_url + " "
click.echo(output_string)
def _get(obj, args):
if args.get.lower() == "url":
return obj.get()
if args.get.lower() == "archive_url":
return obj.get(obj.archive_url)
if args.get.lower() == "oldest":
return obj.get(obj.oldest())
if args.get.lower() == "latest" or args.get.lower() == "newest":
return obj.get(obj.newest())
if args.get.lower() == "save":
return obj.get(obj.save())
return "Use get as \"--get 'source'\", 'source' can be one of the followings: \
\n1) url - get the source code of the url specified using --url/-u.\
\n2) archive_url - get the source code of the newest archive for the supplied url, alias of newest.\
\n3) oldest - get the source code of the oldest archive for the supplied url.\
\n4) newest - get the source code of the newest archive for the supplied url.\
\n5) save - Create a new archive and get the source code of this new archive for the supplied url."
def args_handler(args):
if args.version:
return "waybackpy version {version}".format(version=__version__)
if not args.url:
return "waybackpy {version} \nSee 'waybackpy --help' for help using this tool.".format(
version=__version__
)
obj = Url(args.url)
if args.user_agent:
obj = Url(args.url, args.user_agent)
if args.save:
output = _save(obj)
elif args.archive_url:
output = _archive_url(obj)
elif args.json:
output = _json(obj)
elif args.oldest:
output = _oldest(obj)
elif args.newest:
output = _newest(obj)
elif args.known_urls:
output = _known_urls(obj, args)
elif args.total:
output = _total_archives(obj)
elif args.near:
return _near(obj, args)
elif args.get:
output = _get(obj, args)
else:
output = (
"You only specified the URL. But you also need to specify the operation."
"\nSee 'waybackpy --help' for help using this tool."
)
return output
def add_requiredArgs(requiredArgs):
requiredArgs.add_argument(
"--url", "-u", help="URL on which Wayback machine operations would occur"
)
def add_userAgentArg(userAgentArg):
help_text = 'User agent, default user_agent is "waybackpy python package - https://github.com/akamhy/waybackpy"'
userAgentArg.add_argument("--user_agent", "-ua", help=help_text)
def add_saveArg(saveArg):
saveArg.add_argument(
"--save", "-s", action="store_true", help="Save the URL on the Wayback machine"
)
def add_auArg(auArg):
auArg.add_argument(
"--archive_url",
"-au",
action="store_true",
help="Get the latest archive URL, alias for --newest",
)
def add_jsonArg(jsonArg):
jsonArg.add_argument(
"--json",
"-j",
action="store_true",
help="JSON data of the availability API request",
)
def add_oldestArg(oldestArg):
oldestArg.add_argument(
"--oldest",
"-o",
action="store_true",
help="Oldest archive for the specified URL",
)
def add_newestArg(newestArg):
newestArg.add_argument(
"--newest",
"-n",
action="store_true",
help="Newest archive for the specified URL",
)
def add_totalArg(totalArg):
totalArg.add_argument(
"--total",
"-t",
action="store_true",
help="Total number of archives for the specified URL",
)
def add_getArg(getArg):
getArg.add_argument(
"--get",
"-g",
help="Prints the source code of the supplied url. Use '--get help' for extended usage",
)
def add_knownUrlArg(knownUrlArg):
knownUrlArg.add_argument(
"--known_urls", "-ku", action="store_true", help="URLs known for the domain."
)
help_text = "Use with '--known_urls' to include known URLs for subdomains."
knownUrlArg.add_argument("--subdomain", "-sub", action="store_true", help=help_text)
knownUrlArg.add_argument(
"--file",
"-f",
action="store_true",
help="Save the URLs in file at current directory.",
)
def add_nearArg(nearArg):
nearArg.add_argument(
"--near", "-N", action="store_true", help="Archive near specified time"
)
def add_nearArgs(nearArgs):
nearArgs.add_argument("--year", "-Y", type=int, help="Year in integer")
nearArgs.add_argument("--month", "-M", type=int, help="Month in integer")
nearArgs.add_argument("--day", "-D", type=int, help="Day in integer.")
nearArgs.add_argument("--hour", "-H", type=int, help="Hour in intege")
nearArgs.add_argument("--minute", "-MIN", type=int, help="Minute in integer")
def parse_args(argv):
parser = argparse.ArgumentParser()
add_requiredArgs(parser.add_argument_group("URL argument (required)"))
add_userAgentArg(parser.add_argument_group("User Agent"))
add_saveArg(parser.add_argument_group("Create new archive/save URL"))
add_auArg(parser.add_argument_group("Get the latest Archive"))
add_jsonArg(parser.add_argument_group("Get the JSON data"))
add_oldestArg(parser.add_argument_group("Oldest archive"))
add_newestArg(parser.add_argument_group("Newest archive"))
add_totalArg(parser.add_argument_group("Total number of archives"))
add_getArg(parser.add_argument_group("Get source code"))
add_knownUrlArg(
parser.add_argument_group(
"URLs known and archived to Wayback Machine for the site."
)
)
add_nearArg(parser.add_argument_group("Archive close to time specified"))
add_nearArgs(parser.add_argument_group("Arguments that are used only with --near"))
parser.add_argument(
"--version", "-v", action="store_true", help="Waybackpy version"
)
return parser.parse_args(argv[1:])
def main(argv=None):
argv = sys.argv if argv is None else argv
print(args_handler(parse_args(argv)))
if __name__ == "__main__": if __name__ == "__main__":
sys.exit(main(sys.argv)) main()

View File

@ -24,3 +24,15 @@ class URLError(Exception):
""" """
Raised when malformed URLs are passed as arguments. Raised when malformed URLs are passed as arguments.
""" """
class MaximumRetriesExceeded(WaybackError):
"""
MaximumRetriesExceeded
"""
class MaximumSaveRetriesExceeded(MaximumRetriesExceeded):
"""
MaximumSaveRetriesExceeded
"""

131
waybackpy/save_api.py Normal file
View File

@ -0,0 +1,131 @@
import re
import time
import requests
from datetime import datetime
from urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter
from .utils import DEFAULT_USER_AGENT
from .exceptions import MaximumSaveRetriesExceeded
class WaybackMachineSaveAPI:
"""
WaybackMachineSaveAPI class provides an interface for saving URLs on the
Wayback Machine.
"""
def __init__(self, url, user_agent=DEFAULT_USER_AGENT, max_tries=8):
self.url = str(url).strip().replace(" ", "%20")
self.request_url = "https://web.archive.org/save/" + self.url
self.user_agent = user_agent
self.request_headers = {"User-Agent": self.user_agent}
self.max_tries = max_tries
self.total_save_retries = 5
self.backoff_factor = 0.5
self.status_forcelist = [500, 502, 503, 504]
self._archive_url = None
self.instance_birth_time = datetime.utcnow()
@property
def archive_url(self):
if self._archive_url:
return self._archive_url
else:
return self.save()
def get_save_request_headers(self):
session = requests.Session()
retries = Retry(
total=self.total_save_retries,
backoff_factor=self.backoff_factor,
status_forcelist=self.status_forcelist,
)
session.mount("https://", HTTPAdapter(max_retries=retries))
self.response = session.get(self.request_url, headers=self.request_headers)
self.headers = self.response.headers
self.status_code = self.response.status_code
self.response_url = self.response.url
def archive_url_parser(self):
regex1 = r"Content-Location: (/web/[0-9]{14}/.*)"
match = re.search(regex1, str(self.headers))
if match:
return "https://web.archive.org" + match.group(1)
regex2 = r"rel=\"memento.*?(web\.archive\.org/web/[0-9]{14}/.*?)>"
match = re.search(regex2, str(self.headers))
if match:
return "https://" + match.group(1)
regex3 = r"X-Cache-Key:\shttps(.*)[A-Z]{2}"
match = re.search(regex3, str(self.headers))
if match:
return "https://" + match.group(1)
if self.response_url:
self.response_url = self.response_url.strip()
if "web.archive.org/web" in self.response_url:
regex = r"web\.archive\.org/web/(?:[0-9]*?)/(?:.*)$"
match = re.search(regex, self.response_url)
if match:
return "https://" + match.group(0)
def sleep(self, tries):
sleep_seconds = 5
if tries % 3 == 0:
sleep_seconds = 10
time.sleep(sleep_seconds)
def timestamp(self):
m = re.search(
r"https?://web.archive.org/web/([0-9]{14})/http", self._archive_url
)
string_timestamp = m.group(1)
timestamp = datetime.strptime(string_timestamp, "%Y%m%d%H%M%S")
timestamp_unixtime = time.mktime(timestamp.timetuple())
instance_birth_time_unixtime = time.mktime(self.instance_birth_time.timetuple())
if timestamp_unixtime < instance_birth_time_unixtime:
self.cached_save = True
else:
self.cached_save = False
return timestamp
def save(self):
saved_archive = None
tries = 0
while True:
tries += 1
if tries >= self.max_tries:
raise MaximumSaveRetriesExceeded(
"Tried %s times but failed to save and return the archive for %s.\nResponse URL:\n%s \nResponse Header:\n%s\n"
% (str(tries), self.url, self.response_url, str(self.headers)),
)
if not saved_archive:
if tries > 1:
self.sleep(tries)
self.get_save_request_headers()
saved_archive = self.archive_url_parser()
if not saved_archive:
continue
else:
self._archive_url = saved_archive
self.timestamp()
return saved_archive

View File

@ -1,564 +1,11 @@
import re
import time
import requests import requests
from datetime import datetime
from .exceptions import WaybackError, URLError, RedirectSaveError
from .__version__ import __version__ from .__version__ import __version__
from urllib3.util.retry import Retry DEFAULT_USER_AGENT = "waybackpy %s - https://github.com/akamhy/waybackpy" % __version__
from requests.adapters import HTTPAdapter
quote = requests.utils.quote
default_user_agent = "waybackpy python package - https://github.com/akamhy/waybackpy"
def _latest_version(package_name, headers): def latest_version(package_name, headers):
"""Returns the latest version of package_name.
Parameters
----------
package_name : str
The name of the python package
headers : dict
Headers that will be used while making get requests
Return type is str
Use API <https://pypi.org/pypi/> to get the latest version of
waybackpy, but can be used to get latest version of any package
on PyPi.
"""
request_url = "https://pypi.org/pypi/" + package_name + "/json" request_url = "https://pypi.org/pypi/" + package_name + "/json"
response = _get_response(request_url, headers=headers) response = requests.get(request_url, headers=headers)
data = response.json() data = response.json()
return data["info"]["version"] return data["info"]["version"]
def _unix_timestamp_to_wayback_timestamp(unix_timestamp):
"""Returns unix timestamp converted to datetime.datetime
Parameters
----------
unix_timestamp : str, int or float
Unix-timestamp that needs to be converted to datetime.datetime
Converts and returns input unix_timestamp to datetime.datetime object.
Does not matter if unix_timestamp is str, float or int.
"""
return datetime.utcfromtimestamp(int(unix_timestamp)).strftime("%Y%m%d%H%M%S")
def _add_payload(instance, payload):
"""Adds payload from instance that can be used to make get requests.
Parameters
----------
instance : waybackpy.cdx.Cdx
instance of the Cdx class
payload : dict
A dict onto which we need to add keys and values based on instance.
instance is object of Cdx class and it contains the data required to fill
the payload dictionary.
"""
if instance.start_timestamp:
payload["from"] = instance.start_timestamp
if instance.end_timestamp:
payload["to"] = instance.end_timestamp
if instance.gzip != True:
payload["gzip"] = "false"
if instance.match_type:
payload["matchType"] = instance.match_type
if instance.filters and len(instance.filters) > 0:
for i, f in enumerate(instance.filters):
payload["filter" + str(i)] = f
if instance.collapses and len(instance.collapses) > 0:
for i, f in enumerate(instance.collapses):
payload["collapse" + str(i)] = f
# Don't need to return anything as it's dictionary.
payload["url"] = instance.url
def _timestamp_manager(timestamp, data):
"""Returns the timestamp.
Parameters
----------
timestamp : datetime.datetime
datetime object
data : dict
A python dictionary, which is loaded JSON os the availability API.
Return type:
datetime.datetime
If timestamp is not None then sets the value to timestamp itself.
If timestamp is None the returns the value from the last fetched API data.
If not timestamp and can not read the archived_snapshots form data return datetime.max
"""
if timestamp:
return timestamp
if not data["archived_snapshots"]:
return datetime.max
return datetime.strptime(
data["archived_snapshots"]["closest"]["timestamp"], "%Y%m%d%H%M%S"
)
def _check_match_type(match_type, url):
"""Checks the validity of match_type parameter of the CDX GET requests.
Parameters
----------
match_type : list
list that may contain any or all from ["exact", "prefix", "host", "domain"]
See https://github.com/akamhy/waybackpy/wiki/Python-package-docs#url-match-scope
url : str
The URL used to create the waybackpy Url object.
If not vaild match_type raise Exception.
"""
if not match_type:
return
if "*" in url:
raise WaybackError("Can not use wildcard with match_type argument")
legal_match_type = ["exact", "prefix", "host", "domain"]
if match_type not in legal_match_type:
exc_message = "{match_type} is not an allowed match type.\nUse one from 'exact', 'prefix', 'host' or 'domain'".format(
match_type=match_type
)
raise WaybackError(exc_message)
def _check_collapses(collapses):
"""Checks the validity of collapse parameter of the CDX GET request.
One or more field or field:N to 'collapses=[]' where
field is one of (urlkey, timestamp, original, mimetype, statuscode,
digest and length) and N is the first N characters of field to test.
Parameters
----------
collapses : list
If not vaild collapses raise Exception.
"""
if not isinstance(collapses, list):
raise WaybackError("collapses must be a list.")
if len(collapses) == 0:
return
for collapse in collapses:
try:
match = re.search(
r"(urlkey|timestamp|original|mimetype|statuscode|digest|length)(:?[0-9]{1,99})?",
collapse,
)
field = match.group(1)
N = None
if 2 == len(match.groups()):
N = match.group(2)
if N:
if not (field + N == collapse):
raise Exception
else:
if not (field == collapse):
raise Exception
except Exception:
exc_message = "collapse argument '{collapse}' is not following the cdx collapse syntax.".format(
collapse=collapse
)
raise WaybackError(exc_message)
def _check_filters(filters):
"""Checks the validity of filter parameter of the CDX GET request.
Any number of filter params of the following form may be specified:
filters=["[!]field:regex"] may be specified..
Parameters
----------
filters : list
If not vaild filters raise Exception.
"""
if not isinstance(filters, list):
raise WaybackError("filters must be a list.")
# [!]field:regex
for _filter in filters:
try:
match = re.search(
r"(\!?(?:urlkey|timestamp|original|mimetype|statuscode|digest|length)):(.*)",
_filter,
)
key = match.group(1)
val = match.group(2)
except Exception:
exc_message = (
"Filter '{_filter}' is not following the cdx filter syntax.".format(
_filter=_filter
)
)
raise WaybackError(exc_message)
def _cleaned_url(url):
"""Sanatize the url
Remove and replace illegal whitespace characters from the URL.
"""
return str(url).strip().replace(" ", "%20")
def _url_check(url):
"""
Check for common URL problems.
What we are checking:
1) '.' in self.url, no url that ain't '.' in it.
If you known any others, please create a PR on the github repo.
"""
if "." not in url:
exc_message = "'{url}' is not a vaild URL.".format(url=url)
raise URLError(exc_message)
def _full_url(endpoint, params):
"""API endpoint + GET parameters = full_url
Parameters
----------
endpoint : str
The API endpoint
params : dict
Dictionary that has name-value pairs.
Return type is str
"""
if not params:
return endpoint
full_url = endpoint if endpoint.endswith("?") else (endpoint + "?")
for key, val in params.items():
key = "filter" if key.startswith("filter") else key
key = "collapse" if key.startswith("collapse") else key
amp = "" if full_url.endswith("?") else "&"
full_url = full_url + amp + "{key}={val}".format(key=key, val=quote(str(val)))
return full_url
def _get_total_pages(url, user_agent):
"""
If showNumPages is passed in cdx API, it returns
'number of archive pages'and each page has many archives.
This func returns number of pages of archives (type int).
"""
total_pages_url = (
"https://web.archive.org/cdx/search/cdx?url={url}&showNumPages=true".format(
url=url
)
)
headers = {"User-Agent": user_agent}
return int((_get_response(total_pages_url, headers=headers).text).strip())
def _archive_url_parser(
header, url, latest_version=__version__, instance=None, response=None
):
"""Returns the archive after parsing it from the response header.
Parameters
----------
header : str
The response header of WayBack Machine's Save API
url : str
The input url, the one used to created the Url object.
latest_version : str
The latest version of waybackpy (default is __version__)
instance : waybackpy.wrapper.Url
Instance of Url class
The wayback machine's save API doesn't
return JSON response, we are required
to read the header of the API response
and find the archive URL.
This method has some regular expressions
that are used to search for the archive url
in the response header of Save API.
Two cases are possible:
1) Either we find the archive url in
the header.
2) Or we didn't find the archive url in
API header.
If we found the archive URL we return it.
Return format:
web.archive.org/web/<TIMESTAMP>/<URL>
And if we couldn't find it, we raise
WaybackError with an error message.
"""
if "save redirected" in header and instance:
time.sleep(60) # makeup for archive time
now = datetime.utcnow().timetuple()
timestamp = _wayback_timestamp(
year=now.tm_year,
month=now.tm_mon,
day=now.tm_mday,
hour=now.tm_hour,
minute=now.tm_min,
)
return_str = "web.archive.org/web/{timestamp}/{url}".format(
timestamp=timestamp, url=url
)
url = "https://" + return_str
headers = {"User-Agent": instance.user_agent}
res = _get_response(url, headers=headers)
if res.status_code < 400:
return "web.archive.org/web/{timestamp}/{url}".format(
timestamp=timestamp, url=url
)
# Regex1
m = re.search(r"Content-Location: (/web/[0-9]{14}/.*)", str(header))
if m:
return "web.archive.org" + m.group(1)
# Regex2
m = re.search(
r"rel=\"memento.*?(web\.archive\.org/web/[0-9]{14}/.*?)>", str(header)
)
if m:
return m.group(1)
# Regex3
m = re.search(r"X-Cache-Key:\shttps(.*)[A-Z]{2}", str(header))
if m:
return m.group(1)
if response:
if response.url:
if "web.archive.org/web" in response.url:
m = re.search(
r"web\.archive\.org/web/(?:[0-9]*?)/(?:.*)$",
str(response.url).strip(),
)
if m:
return m.group(0)
if instance:
newest_archive = None
try:
newest_archive = instance.newest()
except WaybackError:
pass # We don't care as this is a save request
if newest_archive:
minutes_old = (
datetime.utcnow() - newest_archive.timestamp
).total_seconds() / 60.0
if minutes_old <= 30:
archive_url = newest_archive.archive_url
m = re.search(r"web\.archive\.org/web/[0-9]{14}/.*", archive_url)
if m:
instance.cached_save = True
return m.group(0)
if __version__ == latest_version:
exc_message = (
"No archive URL found in the API response. "
"If '{url}' can be accessed via your web browser then either "
"Wayback Machine is malfunctioning or it refused to archive your URL."
"\nHeader:\n{header}".format(url=url, header=header)
)
if "save redirected" == header.strip():
raise RedirectSaveError(
"URL cannot be archived by wayback machine as it is a redirect.\nHeader:\n{header}".format(
header=header
)
)
else:
exc_message = (
"No archive URL found in the API response. "
"If '{url}' can be accessed via your web browser then either "
"this version of waybackpy ({version}) is out of date or WayBack "
"Machine is malfunctioning. Visit 'https://github.com/akamhy/waybackpy' "
"for the latest version of waybackpy.\nHeader:\n{header}".format(
url=url, version=__version__, header=header
)
)
raise WaybackError(exc_message)
def _wayback_timestamp(**kwargs):
"""Returns a valid waybackpy timestamp.
The standard archive URL format is
https://web.archive.org/web/20191214041711/https://www.youtube.com
If we break it down in three parts:
1 ) The start (https://web.archive.org/web/)
2 ) timestamp (20191214041711)
3 ) https://www.youtube.com, the original URL
The near method of Url class in wrapper.py takes year, month, day, hour
and minute as arguments, their type is int.
This method takes those integers and converts it to
wayback machine timestamp and returns it.
zfill(2) adds 1 zero in front of single digit days, months hour etc.
Return type is string.
"""
return "".join(
str(kwargs[key]).zfill(2) for key in ["year", "month", "day", "hour", "minute"]
)
def _get_response(
endpoint,
params=None,
headers=None,
return_full_url=False,
retries=5,
backoff_factor=0.5,
no_raise_on_redirects=False,
):
"""Makes get requests.
Parameters
----------
endpoint : str
The API endpoint.
params : dict
The get request parameters. (default is None)
headers : dict
Headers for the get request. (default is None)
return_full_url : bool
Determines whether the call went full url returned along with the
response. (default is False)
retries : int
Maximum number of retries for the get request. (default is 5)
backoff_factor : float
The factor by which we determine the next retry time after wait.
https://urllib3.readthedocs.io/en/latest/reference/urllib3.util.html
(default is 0.5)
no_raise_on_redirects : bool
If maximum 30(default for requests) times redirected than instead of
exceptions return. (default is False)
To handle WaybackError:
from waybackpy.exceptions import WaybackError
try:
...
except WaybackError as e:
# handle it
"""
# From https://stackoverflow.com/a/35504626
# By https://stackoverflow.com/users/401467/datashaman
s = requests.Session()
retries = Retry(
total=retries,
backoff_factor=backoff_factor,
status_forcelist=[500, 502, 503, 504],
)
s.mount("https://", HTTPAdapter(max_retries=retries))
# The URL with parameters required for the get request
url = _full_url(endpoint, params)
try:
if not return_full_url:
return s.get(url, headers=headers)
return (url, s.get(url, headers=headers))
except Exception as e:
reason = str(e)
if no_raise_on_redirects:
if "Exceeded 30 redirects" in reason:
return
exc_message = "Error while retrieving {url}.\n{reason}".format(
url=url, reason=reason
)
exc = WaybackError(exc_message)
exc.__cause__ = e
raise exc

View File

@ -1,298 +1,27 @@
import re from .save_api import WaybackMachineSaveAPI
from datetime import datetime, timedelta from .availability_api import WaybackMachineAvailabilityAPI
from .cdx_api import WaybackMachineCDXServerAPI
from .utils import DEFAULT_USER_AGENT
from .exceptions import WaybackError from .exceptions import WaybackError
from .cdx import Cdx
from .utils import (
_archive_url_parser,
_wayback_timestamp,
_get_response,
default_user_agent,
_url_check,
_cleaned_url,
_timestamp_manager,
_unix_timestamp_to_wayback_timestamp,
_latest_version,
)
class Url: class Url:
""" def __init__(self, url, user_agent=DEFAULT_USER_AGENT):
Attributes
----------
url : str
The input URL, wayback machine API operations are performed
on this URL after sanatizing it.
user_agent : str
The user_agent used while making the GET requests to the
Wayback machine APIs
_archive_url : str
Caches the last fetched archive.
timestamp : datetime.datetime
timestamp of the archive URL as datetime object for
greater usability
_JSON : dict
Caches the last fetched availability API data
latest_version : str
The latest version of waybackpy on PyPi
cached_save : bool
Flag to check if WayBack machine returned a cached
archive instead of creating a new archive. WayBack
machine allows only one 1 archive for an URL in
30 minutes. If the archive returned by WayBack machine
is older than 3 minutes than this flag is set to True
Methods turned properties
----------
JSON : dict
JSON response of availability API as dictionary / loaded JSON
archive_url : str
Return the archive url, returns str
_timestamp : datetime.datetime
Sets the value of self.timestamp if still not set
Methods
-------
save()
Archives the URL on WayBack machine
get(url="", user_agent="", encoding="")
Gets the source of archive url, can also be used to get source
of any URL if passed into it.
near(year=None, month=None, day=None, hour=None, minute=None, unix_timestamp=None)
Wayback Machine can have many archives for a URL/webpage, sometimes we want
archive close to a specific time.
This method takes year, month, day, hour, minute and unix_timestamp as input.
oldest(year=1994)
The oldest archive of an URL.
newest()
The newest archive of an URL
total_archives(start_timestamp=None, end_timestamp=None)
total number of archives of an URL, the timeframe can be confined by
start_timestamp and end_timestamp
known_urls(subdomain=False, host=False, start_timestamp=None, end_timestamp=None, match_type="prefix")
Known URLs for an URL, subdomain, URL as prefix etc.
"""
def __init__(self, url, user_agent=default_user_agent):
self.url = url self.url = url
self.user_agent = str(user_agent) self.user_agent = str(user_agent)
_url_check(self.url) self.wayback_machine_availability_api = WaybackMachineAvailabilityAPI(
self._archive_url = None self.url, user_agent=self.user_agent
self.timestamp = None
self._JSON = None
self.latest_version = None
self.cached_save = False
def __repr__(self):
return "waybackpy.Url(url={url}, user_agent={user_agent})".format(
url=self.url, user_agent=self.user_agent
) )
def __str__(self):
if not self._archive_url:
self._archive_url = self.archive_url
return "{archive_url}".format(archive_url=self._archive_url)
def __len__(self):
"""Number of days between today and the date of archive based on the timestamp
len() of waybackpy.wrapper.Url should return
the number of days between today and the
archive timestamp.
Can be applied on return values of near and its
childs (e.g. oldest) and if applied on waybackpy.Url()
whithout using any functions, it just grabs
self._timestamp and def _timestamp gets it
from def JSON.
"""
td_max = timedelta(
days=999999999, hours=23, minutes=59, seconds=59, microseconds=999999
)
if not self.timestamp:
self.timestamp = self._timestamp
if self.timestamp == datetime.max:
return td_max.days
return (datetime.utcnow() - self.timestamp).days
@property
def JSON(self):
"""Returns JSON response of availability API as dictionary / loaded JSON
return type : dict
"""
# If user used the near method or any method that depends on near, we
# are certain that we have a loaded dictionary cached in self._JSON.
# Return the loaded JSON data.
if self._JSON:
return self._JSON
# If no cached data found, get data and return + cache it.
endpoint = "https://archive.org/wayback/available"
headers = {"User-Agent": self.user_agent}
payload = {"url": "{url}".format(url=_cleaned_url(self.url))}
response = _get_response(endpoint, params=payload, headers=headers)
self._JSON = response.json()
return self._JSON
@property
def archive_url(self):
"""Return the archive url.
return type : str
"""
if self._archive_url:
return self._archive_url
data = self.JSON
if not data["archived_snapshots"]:
archive_url = None
else:
archive_url = data["archived_snapshots"]["closest"]["url"]
archive_url = archive_url.replace(
"http://web.archive.org/web/", "https://web.archive.org/web/", 1
)
self._archive_url = archive_url
return archive_url
@property
def _timestamp(self):
"""Sets the value of self.timestamp if still not set.
Return type : datetime.datetime
"""
return _timestamp_manager(self.timestamp, self.JSON)
def save(self): def save(self):
"""Saves/Archive the URL. self.wayback_machine_save_api = WaybackMachineSaveAPI(
self.url, user_agent=self.user_agent
To save a webpage on WayBack machine we
need to send get request to https://web.archive.org/save/
And to get the archive URL we are required to read the
header of the API response.
_get_response() takes care of the get requests.
_archive_url_parser() parses the archive from the header.
return type : waybackpy.wrapper.Url
"""
request_url = "https://web.archive.org/save/" + _cleaned_url(self.url)
headers = {"User-Agent": self.user_agent}
response = _get_response(
request_url,
params=None,
headers=headers,
backoff_factor=2,
no_raise_on_redirects=True,
) )
self.archive_url = self.wayback_machine_save_api.archive_url
if not self.latest_version: self.timestamp = self.wayback_machine_save_api.timestamp()
self.latest_version = _latest_version("waybackpy", headers=headers) self.headers = self.wayback_machine_save_api.headers
if response:
res_headers = response.headers
else:
res_headers = "save redirected"
self._archive_url = "https://" + _archive_url_parser(
res_headers,
self.url,
latest_version=self.latest_version,
instance=self,
response=response,
)
if response.status_code == 509:
raise WaybackError(
"Can not save '{url}'. You have probably reached the limit of active "
"sessions. Try later.".format(
url=_cleaned_url(self.url), text=response.text
)
)
m = re.search(
r"https?://web.archive.org/web/([0-9]{14})/http", self._archive_url
)
str_ts = m.group(1)
ts = datetime.strptime(str_ts, "%Y%m%d%H%M%S")
now = datetime.utcnow()
total_seconds = int((now - ts).total_seconds())
if total_seconds > 60 * 3:
self.cached_save = True
self.timestamp = ts
return self return self
def get(self, url="", user_agent="", encoding=""):
"""GET the source of archive or any other URL.
url : str, waybackpy.wrapper.Url
The method will return the source code of
this URL instead of last fetched archive.
user_agent : str
The user_agent for GET request to API
encoding : str
If user is using any other encoding that
can't be detected by response.encoding
Return the source code of the last fetched
archive URL if no URL is passed to this method
else it returns the source code of url passed.
If encoding is not supplied, it is auto-detected
from the response itself by requests package.
"""
if not url and self._archive_url:
url = self._archive_url
elif not url and not self._archive_url:
url = _cleaned_url(self.url)
if not user_agent:
user_agent = self.user_agent
headers = {"User-Agent": str(user_agent)}
response = _get_response(str(url), params=None, headers=headers)
if not encoding:
try:
encoding = response.encoding
except AttributeError:
encoding = "UTF-8"
return response.content.decode(encoding.replace("text/html", "UTF-8", 1))
def near( def near(
self, self,
year=None, year=None,
@ -302,153 +31,45 @@ class Url:
minute=None, minute=None,
unix_timestamp=None, unix_timestamp=None,
): ):
"""
Parameters
----------
year : int self.wayback_machine_availability_api.near(
Archive close to year year=year,
month=month,
month : int day=day,
Archive close to month hour=hour,
minute=minute,
day : int unix_timestamp=unix_timestamp,
Archive close to day
hour : int
Archive close to hour
minute : int
Archive close to minute
unix_timestamp : str, float or int
Archive close to this unix_timestamp
Wayback Machine can have many archives of a webpage,
sometimes we want archive close to a specific time.
This method takes year, month, day, hour and minute as input.
The input type must be integer. Any non-supplied parameters
default to the current time.
We convert the input to a wayback machine timestamp using
_wayback_timestamp(), it returns a string.
We use the wayback machine's availability API
(https://archive.org/wayback/available)
to get the closest archive from the timestamp.
We set self._archive_url to the archive found, if any.
If archive found, we set self.timestamp to its timestamp.
We self._JSON to the response of the availability API.
And finally return self.
"""
if unix_timestamp:
timestamp = _unix_timestamp_to_wayback_timestamp(unix_timestamp)
else:
now = datetime.utcnow().timetuple()
timestamp = _wayback_timestamp(
year=year if year else now.tm_year,
month=month if month else now.tm_mon,
day=day if day else now.tm_mday,
hour=hour if hour else now.tm_hour,
minute=minute if minute else now.tm_min,
) )
self.set_availability_api_attrs()
endpoint = "https://archive.org/wayback/available"
headers = {"User-Agent": self.user_agent}
payload = {
"url": "{url}".format(url=_cleaned_url(self.url)),
"timestamp": timestamp,
}
response = _get_response(endpoint, params=payload, headers=headers)
data = response.json()
if not data["archived_snapshots"]:
raise WaybackError(
"Can not find archive for '{url}' try later or use wayback.Url(url, user_agent).save() "
"to create a new archive.\nAPI response:\n{text}".format(
url=_cleaned_url(self.url), text=response.text
)
)
archive_url = data["archived_snapshots"]["closest"]["url"]
archive_url = archive_url.replace(
"http://web.archive.org/web/", "https://web.archive.org/web/", 1
)
self._archive_url = archive_url
self.timestamp = datetime.strptime(
data["archived_snapshots"]["closest"]["timestamp"], "%Y%m%d%H%M%S"
)
self._JSON = data
return self return self
def oldest(self, year=1994): def oldest(self):
""" self.wayback_machine_availability_api.oldest()
Returns the earliest/oldest Wayback Machine archive for the webpage. self.set_availability_api_attrs()
return self
Wayback machine has started archiving the internet around 1997 and
therefore we can't have any archive older than 1997, we use 1994 as the
deafult year to look for the oldest archive.
We simply pass the year in near() and return it.
"""
return self.near(year=year)
def newest(self): def newest(self):
"""Return the newest Wayback Machine archive available. self.wayback_machine_availability_api.newest()
self.set_availability_api_attrs()
return self
We return the return value of self.near() as it deafults to current UTC time. def set_availability_api_attrs(self):
self.archive_url = self.wayback_machine_availability_api.archive_url
Due to Wayback Machine database lag, this may not always be the self.JSON = self.wayback_machine_availability_api.JSON
most recent archive. self.timestamp = self.wayback_machine_availability_api.timestamp()
return type : waybackpy.wrapper.Url
"""
return self.near()
def total_archives(self, start_timestamp=None, end_timestamp=None): def total_archives(self, start_timestamp=None, end_timestamp=None):
"""Returns the total number of archives for an URL cdx = WaybackMachineCDXServerAPI(
self.url,
Parameters
----------
start_timestamp : str
1 to 14 digit string of numbers, you are not required to
pass a full 14 digit timestamp.
end_timestamp : str
1 to 14 digit string of numbers, you are not required to
pass a full 14 digit timestamp.
return type : int
A webpage can have multiple archives on the wayback machine
If someone wants to count the total number of archives of a
webpage on wayback machine they can use this method.
Returns the total number of Wayback Machine archives for the URL.
"""
cdx = Cdx(
_cleaned_url(self.url),
user_agent=self.user_agent, user_agent=self.user_agent,
start_timestamp=start_timestamp, start_timestamp=start_timestamp,
end_timestamp=end_timestamp, end_timestamp=end_timestamp,
) )
# cdx.snapshots() is generator not list. count = 0
i = 0
for _ in cdx.snapshots(): for _ in cdx.snapshots():
i = i + 1 count = count + 1
return i return count
def known_urls( def known_urls(
self, self,
@ -458,45 +79,13 @@ class Url:
end_timestamp=None, end_timestamp=None,
match_type="prefix", match_type="prefix",
): ):
"""Yields known_urls URLs from the CDX API.
Parameters
----------
subdomain : bool
If True fetch subdomain URLs along with the host URLs.
host : bool
Only fetch host URLs.
start_timestamp : str
1 to 14 digit string of numbers, you are not required to
pass a full 14 digit timestamp.
end_timestamp : str
1 to 14 digit string of numbers, you are not required to
pass a full 14 digit timestamp.
match_type : str
One of (exact, prefix, host and domain)
return type : waybackpy.snapshot.CdxSnapshot
Yields list of URLs known to exist for given input.
Defaults to input URL as prefix.
Based on:
https://gist.github.com/mhmdiaa/adf6bff70142e5091792841d4b372050
By Mohammed Diaa (https://github.com/mhmdiaa)
"""
if subdomain: if subdomain:
match_type = "domain" match_type = "domain"
if host: if host:
match_type = "host" match_type = "host"
cdx = Cdx( cdx = WaybackMachineCDXServerAPI(
_cleaned_url(self.url), self.url,
user_agent=self.user_agent, user_agent=self.user_agent,
start_timestamp=start_timestamp, start_timestamp=start_timestamp,
end_timestamp=end_timestamp, end_timestamp=end_timestamp,