Compare commits

0.0.1..master

No commits in common. "0.0.1" and "master" have entirely different histories.

10 changed files with 84 additions and 24 deletions

View File

@@ -1,3 +1,5 @@
+ A video introducing this project is available [here](https://crawler.yt.lemnoslife.com/presentation).
# The algorithm:
To retrieve the most YouTube video ids in order to retrieve the most video captions, we need to retrieve the most YouTube channels.
@@ -9,6 +11,24 @@ A ready to be used by the end-user website instance of this project is hosted at
See more details on [the Wiki](https://gitea.lemnoslife.com/Benjamin_Loison/YouTube_captions_search_engine/wiki).
+ # The project structure:
+ - `main.cpp` contains the C++ multi-threaded algorithm performing the YouTube channels discovery. It is notably made of the following functions:
+   - `main` which takes into account the command line arguments, loads variables from files (`channels.txt`, `keys.txt`, `channels/` content) and starts the threads executing the `treatChannels` function
+   - `treatChannels` which gets a YouTube channel to treat, treats it in the `treatChannelOrVideo` function and compresses the retrieved data
+   - `treatChannelOrVideo` which, provided a YouTube channel id or a video id, treats this resource. In both cases it treats the comments left on this resource. In the case of a channel it also treats its `CHANNELS`, `COMMUNITY`, `PLAYLISTS` and `LIVE` tabs and downloads the captions of the channel videos.
+   - `markChannelAsRequiringTreatmentIfNeeded` which, provided a YouTube channel id, marks it as requiring treatment if it wasn't already treated
+   - `execute` which, provided a `yt-dlp` command, executes it in a shell
+   - `getJson` which, provided an API request, returns a JSON structure with its result. If the requested API is YouTube Data API v3 and a set of keys is provided (see `keys.txt` below), it rotates the keys as required (sketched below)
+ - `channels.txt` contains a starting set of channels, mostly the 100 most subscribed French channels
+ - `keys.txt` contains a set of YouTube Data API v3 keys (not provided) giving the ability to request this API (see the `--no-keys` command line argument in the section below for an alternative to filling it)
+ - `scripts/` contains Python scripts to:
+   - generate `channels.txt` as described above (`retrieveTop100SubscribersFrance.py`)
+   - remove channels being treated before a restart of the algorithm as described in [the `main` function documentation](https://gitea.lemnoslife.com/Benjamin_Loison/YouTube_captions_search_engine/src/commit/8dd89e6e881da0a905b6fa4b23775c4344dd0d9d/main.cpp#L126-L128) (`removeChannelsBeingTreated.py`)
+ - `website/` is a PHP website using WebSocket to allow the end-user to run requests on the retrieved dataset. When fetching the website, the end-user receives the interpreted `index.php`, which upon making a request interacts with `websocket.php`, which in the back-end dispatches the requests from the various end-users to `search.py` (which treats the actual end-user request on the compressed dataset), using `users/` for the inter-process communication.
+ Note that this project heavily relies on [YouTube operational API](https://github.com/Benjamin-Loison/YouTube-operational-API) [which was modified for this project](https://gitea.lemnoslife.com/Benjamin_Loison/YouTube_captions_search_engine/wiki/YouTube-operational-API-commits).
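The key rotation described for `getJson` above can be pictured with a minimal Python sketch (illustrative only; the real implementation is the C++ `getJson` in `main.cpp`, and the endpoint handling, error detection and variable names here are assumptions):

```python
import requests

keys = ['AIza...', 'AIza...']  # placeholder values standing in for keys.txt entries
keyIndex = 0

def getJson(request):
    """Return the JSON result of a YouTube Data API v3 request, rotating keys on errors."""
    global keyIndex
    for _ in range(len(keys)):
        url = f'https://www.googleapis.com/youtube/v3/{request}&key={keys[keyIndex]}'
        data = requests.get(url).json()
        if 'error' not in data:
            return data
        # Assumed behavior: on an error (typically quotaExceeded), switch to the next key and retry.
        keyIndex = (keyIndex + 1) % len(keys)
    raise RuntimeError('All provided keys are exhausted.')
```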
# Running the YouTube graph discovery algorithm:
Because of [the current compression mechanism](https://gitea.lemnoslife.com/Benjamin_Loison/YouTube_captions_search_engine/issues/30), Linux is the only known OS able to run this algorithm.

View File

@@ -27,7 +27,9 @@ void createDirectory(string path),
markChannelAsRequiringTreatmentIfNeeded(unsigned short threadId, string channelId),
execute(unsigned short threadId, string command, bool debug = true);
string getHttps(string url),
- join(vector<string> parts, string delimiter);
+ join(vector<string> parts, string delimiter),
+ escapeShellArgument(string shellArgument),
+ replaceAll(string str, const string& from, const string& to);
size_t writeCallback(void* contents, size_t size, size_t nmemb, void* userp);
bool doesFileExist(string filePath),
writeFile(unsigned short threadId, string filePath, string option, string toWrite);
@@ -242,7 +244,7 @@ void treatChannels(unsigned short threadId)
// As I haven't found any well-known library that easily compresses a directory, I have chosen to rely on the `zip` CLI.
// We specify no `debug`ging, as otherwise the zipping operation doesn't work as expected.
// As the zipping process isn't recursive, we can't just rely on `ls`, but are obliged to use `find`.
- execute(threadId, "cd " + channelToTreatDirectory + " && find | zip ../" + channelToTreat + ".zip -@");
+ execute(threadId, "cd " + escapeShellArgument(channelToTreatDirectory) + " && find | zip " + escapeShellArgument("../" + channelToTreat + ".zip") + " -@");
PRINT("Compression finished, started deleting initial directory...")
// Get rid of the uncompressed data.
@@ -681,7 +683,7 @@ void treatChannelOrVideo(unsigned short threadId, bool isIdAChannelId, string id
// The underscore in the `-o` argument is used to not end up with hidden files.
// We are obliged to put the video id after `--`, otherwise if the video id starts with `-` it is treated as an option.
string commandCommonPrefix = "yt-dlp --skip-download ",
- commandCommonPostfix = " -o '" + channelCaptionsToTreatDirectory + "_' -- " + videoId;
+ commandCommonPostfix = " -o " + escapeShellArgument(channelCaptionsToTreatDirectory + "_") + " -- " + escapeShellArgument(videoId);
string command = commandCommonPrefix + "--write-sub --sub-lang all,-live_chat" + commandCommonPostfix;
execute(threadId, command);
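To make the quoting change concrete, here is a small Python sketch that assembles the same kind of `yt-dlp` invocation (the directory and video id are hypothetical; `shlex.quote` plays the role of `escapeShellArgument`, and the trailing `--` keeps an id starting with `-` from being parsed as an option):

```python
import shlex

channelCaptionsToTreatDirectory = 'channels/UCxxxxxxxxxxxxxxxxxxxxxx/captions/'  # hypothetical
videoId = '-AbCdEfGhIj'  # hypothetical id starting with a dash
command = ('yt-dlp --skip-download --write-sub --sub-lang all,-live_chat -o '
           + shlex.quote(channelCaptionsToTreatDirectory + '_')
           + ' -- ' + shlex.quote(videoId))
print(command)
```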
@@ -929,3 +931,20 @@ size_t writeCallback(void* contents, size_t size, size_t nmemb, void* userp)
((string*)userp)->append((char*)contents, size * nmemb);
return size * nmemb;
}
+ // Source: https://stackoverflow.com/a/3669819
+ string escapeShellArgument(string shellArgument)
+ {
+     return "'" + replaceAll(shellArgument, "'", "'\\''") + "'";
+ }
+ string replaceAll(string str, const string& from, const string& to)
+ {
+     size_t start_pos = 0;
+     while((start_pos = str.find(from, start_pos)) != string::npos)
+     {
+         str.replace(start_pos, from.length(), to);
+         start_pos += to.length(); // Handles case where 'to' is a substring of 'from'
+     }
+     return str;
+ }
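For reference, the new helper wraps its argument in single quotes and rewrites every embedded `'` as `'\''`; a direct Python transcription (not part of the project) shows the resulting escaping:

```python
def replaceAll(s, old, new):
    # Same effect as the C++ helper above; Python's str.replace already replaces every occurrence.
    return s.replace(old, new)

def escapeShellArgument(shellArgument):
    return "'" + replaceAll(shellArgument, "'", "'\\''") + "'"

print(escapeShellArgument("it's"))  # prints 'it'\''s', which a POSIX shell reads back as: it's
```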

View File

@@ -1,6 +1,6 @@
#!/usr/bin/python3
- PREFIX = 'Comments per second: '
+ PREFIX = 'Channels per second: '
alreadyTreatedCommentsCount = 0
with open('nohup.out') as f:
@@ -8,5 +8,7 @@ with open('nohup.out') as f:
for line in lines:
if PREFIX in line:
alreadyTreatedCommentsCount += int(line.split(PREFIX)[-1])
+ #if 'UCsT0YIqwnpJCM-mx7-gSA4Q' in line:
+ # break
print(alreadyTreatedCommentsCount)

View File

@@ -1,5 +1,7 @@
#!/usr/bin/python3
+ # This algorithm should also take into account other features that we use to retrieve channels.
import os, requests, json, time, datetime
path = 'channels/'
@@ -7,23 +9,24 @@ path = 'channels/'
os.chdir(path)
def getTimestampFromDateString(dateString):
- return int(time.mktime(datetime.datetime.strptime(dateString, "%Y-%m-%dT%H:%M:%SZ").timetuple()))
+ return int(time.mktime(datetime.datetime.strptime(dateString, '%Y-%m-%dT%H:%M:%SZ').timetuple()))
for channelId in list(os.walk('.'))[1]:
channelId = channelId[2:]
#print(channelId)
- numberOfRequests = len(list(os.walk(channelId))[0][2])
+ numberOfRequests = len(list(os.walk(f'{channelId}/requests'))[0][2]) - 1
# Assume that the folder isn't empty (may not be the case, but it is most of the time).
- with open(f'{channelId}/{str(numberOfRequests - 1)}.json') as f:
- content = "\n".join(f.read().splitlines()[1:])
- data = json.loads(content)
+ filePath = f'{channelId}/requests/{str(numberOfRequests - 1)}.json'
+ with open(filePath) as f:
+ print(filePath)
+ #content = '\n'.join(f.read().splitlines()[1:])
+ data = json.load(f)#json.loads(content)
snippet = data['items'][-1]['snippet']
if 'topLevelComment' in snippet:
snippet = snippet['topLevelComment']['snippet']
latestTreatedCommentDate = snippet['publishedAt']
url = f'https://yt.lemnoslife.com/noKey/channels?part=snippet&id={channelId}'
- content = requests.get(url).text
- data = json.loads(content)
+ data = requests.get(url).json()
channelCreationDate = data['items'][0]['snippet']['publishedAt']
#print(channelCreationDate)
# Timing percentage not taking into account the non-uniform distribution of comments over time. Note that if the last request lists replies to a comment, the percentage might go back a bit, as replies are posted after the initial comment.
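Assuming the timing percentage mentioned in the last comment relates the latest treated comment date to the channel creation date and the current time (the exact formula is not shown in this hunk), it could look like the following sketch with hypothetical dates:

```python
import time, datetime

def getTimestampFromDateString(dateString):
    return int(time.mktime(datetime.datetime.strptime(dateString, '%Y-%m-%dT%H:%M:%SZ').timetuple()))

channelCreationTimestamp = getTimestampFromDateString('2010-05-03T12:00:00Z')       # hypothetical
latestTreatedCommentTimestamp = getTimestampFromDateString('2022-11-15T08:30:00Z')  # hypothetical
now = int(time.time())
percentage = 100 * (latestTreatedCommentTimestamp - channelCreationTimestamp) / (now - channelCreationTimestamp)
print(f'{percentage:.2f} %')
```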

View File

@@ -1,8 +1,8 @@
#!/usr/bin/python3
- import os, requests, json
- channelIds = next(os.walk('channels/'))[1]
+ import os, requests
+ channelIds = [channelId.replace('.zip', '') for channelId in next(os.walk('channels/'))[2]]
maxResults = 50
channelIdsChunks = [channelIds[i : i + maxResults] for i in range(0, len(channelIds), maxResults)]
@@ -11,8 +11,7 @@ mostSubscriberChannel = None
for channelIds in channelIdsChunks:
url = 'https://yt.lemnoslife.com/noKey/channels?part=statistics&id=' + ','.join(channelIds)
- content = requests.get(url).text
- data = json.loads(content)
+ data = requests.get(url).json()
items = data['items']
for item in items:
subscriberCount = int(item['statistics']['subscriberCount'])
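The rest of this script is not shown in the hunk; a self-contained sketch of the whole chunked lookup, with a running maximum that presumably fills `mostSubscriberChannel`, could read as follows (the continuation after the `subscriberCount` line is an assumption):

```python
import os, requests

channelIds = [channelId.replace('.zip', '') for channelId in next(os.walk('channels/'))[2]]
maxResults = 50  # the YouTube Data API v3 `channels` endpoint accepts up to 50 ids per call
channelIdsChunks = [channelIds[i : i + maxResults] for i in range(0, len(channelIds), maxResults)]
mostSubscriberCount = 0
mostSubscriberChannel = None
for chunk in channelIdsChunks:
    url = 'https://yt.lemnoslife.com/noKey/channels?part=statistics&id=' + ','.join(chunk)
    data = requests.get(url).json()
    for item in data['items']:
        subscriberCount = int(item['statistics']['subscriberCount'])
        # Assumed continuation: keep the channel with the highest subscriber count.
        if subscriberCount > mostSubscriberCount:
            mostSubscriberCount = subscriberCount
            mostSubscriberChannel = item['id']
print(mostSubscriberChannel, mostSubscriberCount)
```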

View File

@@ -1,4 +1,7 @@
+ #!/usr/bin/python3
# We can't proceed automatically by using the `requests` Python module because https://socialblade.com/youtube/top/country/fr/mostsubscribed is protected by CloudFlare.
+ # Note that `undetected-chromedriver` might be a workaround for this limitation.
with open('mostsubscribed.html') as f:
lines = f.read().splitlines()

View File

@@ -10,6 +10,11 @@ pathSearchMessageParts = sys.argv[2].split(' ')
pathSearch = pathSearchMessageParts[1]
message = ' '.join(pathSearchMessageParts[2:])
+ pathSearchRegex = re.compile(pathSearch)
+ messageRegex = re.compile(message)
+ isPathSearchAChannelId = re.fullmatch(r'[a-zA-Z0-9-_]{24}', pathSearch)
searchOnlyCaptions = pathSearchMessageParts[0] == 'search-only-captions'
clientFilePath = f'users/{clientId}.txt'
@@ -22,7 +27,7 @@ def write(s):
read = f.read()
# We are appending content, as we moved the in-file cursor.
if read != '':
- f.write("\n")
+ f.write('\n')
f.write(s)
f.flush()
fcntl.flock(f, fcntl.LOCK_UN)
@@ -33,23 +38,31 @@ def cleanCaption(caption):
return caption.replace('\n', ' ')
# As `zipgrep` doesn't support arguments to stop on first match for each file, we proceed manually to keep a good theoretical complexity.
- files = [file for file in os.listdir(path) if file.endswith('.zip')]
+ if isPathSearchAChannelId:
+     file = pathSearch + '.zip'
+     if os.path.isfile(path + file):
+         files = [file]
+     else:
+         write(f'progress:0 / 0')
+ else:
+     files = [file for file in os.listdir(path) if file.endswith('.zip')]
for fileIndex, file in enumerate(files):
- write(f'progress:{fileIndex + 1} / {len(files)}')
+ write(f'progress:{fileIndex} / {len(files)}')
zip = zipfile.ZipFile(path + file)
for fileInZip in zip.namelist():
endsWithVtt = fileInZip.endswith('.vtt')
if searchOnlyCaptions and not endsWithVtt:
continue
toWrite = f'{file}/{fileInZip}'
- if not bool(re.search(pathSearch, toWrite)):
+ if not bool(pathSearchRegex.search(toWrite)):
continue
with zip.open(fileInZip) as f:
if endsWithVtt:
content = f.read().decode('utf-8')
stringIOf = StringIO(content)
wholeCaption = ' '.join([cleanCaption(caption.text) for caption in webvtt.read_buffer(stringIOf)])
- messagePositions = [m.start() for m in re.finditer(message, wholeCaption)]
+ messagePositions = [m.start() for m in messageRegex.finditer(wholeCaption)]
if messagePositions != []:
timestamps = []
for messagePosition in messagePositions:
@@ -67,6 +80,7 @@ for fileIndex, file in enumerate(files):
if message in str(line):
write(toWrite)
break
+ write(f'progress:{fileIndex + 1} / {len(files)}')
with open(clientFilePath) as f:
while True:
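The caption branch above can be exercised on its own; a minimal sketch (assuming a local `.zip` archive produced by the crawler and a plain-text search term, both hypothetical here) parses each `.vtt` member with `webvtt.read_buffer` and reports the match positions:

```python
import re, zipfile
from io import StringIO
import webvtt

messageRegex = re.compile('hello')  # hypothetical search term
zip = zipfile.ZipFile('channels/UCxxxxxxxxxxxxxxxxxxxxxx.zip')  # hypothetical archive
for fileInZip in zip.namelist():
    if not fileInZip.endswith('.vtt'):
        continue
    with zip.open(fileInZip) as f:
        content = f.read().decode('utf-8')
        wholeCaption = ' '.join(caption.text.replace('\n', ' ') for caption in webvtt.read_buffer(StringIO(content)))
        messagePositions = [m.start() for m in messageRegex.finditer(wholeCaption)]
        if messagePositions:
            print(fileInZip, messagePositions)
```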

View File

@@ -27,7 +27,7 @@ class Client
posix_kill($this->pid, SIGTERM);
$clientFilePath = getClientFilePath($this->id);
if (file_exists($clientFilePath)) {
- $fp = fopen($clientFilePath, "r+");
+ $fp = fopen($clientFilePath, 'r+');
if (flock($fp, LOCK_EX, $WAIT_IF_LOCKED)) { // acquire an exclusive lock
unlink($clientFilePath); // delete file
flock($fp, LOCK_UN); // release the lock
@@ -92,8 +92,6 @@ class MyProcess implements MessageComponentInterface
public function onMessage(ConnectionInterface $from, $msg)
{
- // As we are going to use this argument in a shell command, we escape it.
- $msg = escapeshellarg($msg);
$client = $this->clients->offsetGet($from);
// If a previous request was received, we execute the new one with another client for simplicity; otherwise, with the current file deletion approach, we can't tell the worker `search.py` that we don't care about its execution anymore.
if ($client->pid !== null) {
@@ -105,6 +103,8 @@ class MyProcess implements MessageComponentInterface
$clientFilePath = getClientFilePath($clientId);
// Create the worker output file otherwise it would believe that we don't need this worker anymore.
file_put_contents($clientFilePath, '');
+ // As we are going to use this argument in a shell command, we escape it.
+ $msg = escapeshellarg($msg);
// Start the independent worker.
// Redirecting `stdout` is mandatory otherwise `exec` is blocking.
$client->pid = exec("./search.py $clientId $msg > /dev/null & echo $!");
@@ -114,7 +114,7 @@ class MyProcess implements MessageComponentInterface
// If the worker output file doesn't exist anymore, then it means that the worker has finished its work and acknowledged that `websocket.php` completely read its output.
if (file_exists($clientFilePath)) {
// `flock` requires `r`eading permission and we need `w`riting one due to `ftruncate` usage.
- $fp = fopen($clientFilePath, "r+");
+ $fp = fopen($clientFilePath, 'r+');
$read = null;
if (flock($fp, LOCK_EX, $WAIT_IF_LOCKED)) { // acquire an exclusive lock
// We assume that the temporary output is less than 1 MB long.
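The polling described in these last hunks (lock the client file, forward any new content, truncate it, stop once the worker has deleted the file) can be summarized with an illustrative Python analogue of the PHP logic; the function and callback names are assumptions, not project code:

```python
import fcntl, os, time

def pollClientFile(clientFilePath, forward):
    # Loop until the worker (`search.py`) deletes the file, which signals that it has finished.
    while os.path.exists(clientFilePath):
        try:
            with open(clientFilePath, 'r+') as f:
                fcntl.flock(f, fcntl.LOCK_EX)
                read = f.read()
                if read != '':
                    forward(read)   # e.g. push the chunk to the WebSocket client
                    f.truncate(0)   # counterpart of the `ftruncate` call mentioned above
                fcntl.flock(f, fcntl.LOCK_UN)
        except FileNotFoundError:
            break  # the worker deleted the file between the existence check and the open
        time.sleep(0.1)
```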