Compare commits

master..0.0.0

No commits in common. "master" and "0.0.0" have entirely different histories.

18 changed files with 131 additions and 2595 deletions

.gitignore

@@ -1,2 +0,0 @@
keys.txt
channels.txt

Makefile

@@ -1,4 +1,4 @@
.PHONY: main
main:
g++ main.cpp -g -std=c++17 -lcurl -lpthread -o youtubeCaptionsSearchEngine
g++ main.cpp -g -std=c++17 -lcurl -lpthread -o main

README.md

@@ -1,139 +1,19 @@
A video introducing this project is available [here](https://crawler.yt.lemnoslife.com/presentation).
As explained in the project proposal, the idea for retrieving all video ids is to start from a starting set of channels, list their videos using YouTube Data API v3 PlaylistItems: list, list the comments on their videos, and then restart the process, as the comment authors on videos from already known channels potentially reveal new channels.
# The algorithm:
For a given channel, there are two ways to list the comments users published on it:
1. As explained, the YouTube Data API v3 PlaylistItems: list endpoint enables us to list the channel videos up to 20,000 videos (so we will not treat and write down channels in this case) and the CommentThreads: list and Comments: list endpoints enable us to retrieve their comments
2. A simpler approach consists in using the YouTube Data API v3 CommentThreads: list endpoint with `allThreadsRelatedToChannelId`. The main upside of this method, in addition to being simpler, is that for channels with many videos we save much time by working 100 comments at a time instead of one video at a time, possibly without a single comment. Note that this approach doesn't list the videos themselves, so we don't retrieve some information. Note also that this approach doesn't work for some channels that have comments enabled on some videos but not channel-wide.
So when possible we proceed with 2. and use 1. as a fallback approach.
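A minimal sketch of this strategy, in Python for brevity (the real implementation is the C++ `treatChannelOrVideo` function in `main.cpp`); it assumes the `requests` module and the no-key service, the helper names are hypothetical, and pagination of the comment threads themselves is omitted:
```python
import requests

API = 'https://yt.lemnoslife.com/noKey/'  # no-key service, also used by this repository's scripts

def get_json(endpoint, params):
    return requests.get(API + endpoint, params=params).json()

def list_channel_comments(channel_id):
    # 2.: a single request treats up to 100 comments of the whole channel.
    data = get_json('commentThreads', {'part': 'snippet,replies', 'allThreadsRelatedToChannelId': channel_id, 'maxResults': 100})
    if data.get('error', {}).get('errors', [{}])[0].get('reason') != 'commentsDisabled':
        return [data]
    # 1. as a fallback: enumerate the `uploads` playlist, then request the comments of each video.
    pages, page_token = [], ''
    while True:
        playlist = get_json('playlistItems', {'part': 'contentDetails', 'playlistId': 'UU' + channel_id[2:], 'maxResults': 50, 'pageToken': page_token})
        for item in playlist['items']:
            pages.append(get_json('commentThreads', {'part': 'snippet,replies', 'videoId': item['contentDetails']['videoId'], 'maxResults': 100}))
        page_token = playlist.get('nextPageToken')
        if not page_token:
            break
    return pages
```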
To retrieve the most YouTube video ids in order to retrieve the most video captions, we need to retrieve the most YouTube channels.
So to discover the YouTube channels graph with a breadth-first search, we proceed as follows:
1. Provide a starting set of channels.
2. Given a channel, retrieve other channels thanks to its content by using [YouTube Data API v3](https://developers.google.com/youtube/v3) and [YouTube operational API](https://github.com/Benjamin-Loison/YouTube-operational-API), and then repeat 2. for each retrieved channel.
We can multi-thread this process by channel, or we can multi-thread per video of a given channel (losing the optimization of CommentThreads: list with `allThreadsRelatedToChannelId`). In any case we shouldn't do something hybrid in terms of multi-threading, as it would be too complex.
As we would like to proceed channel by channel, the question is **how much time does it take to retrieve all comments from the biggest YouTube channel? If the answer is a long period of time, then multi-threading per video of a given channel may make sense.** There are two possibilities following our methods:
1. Here the complexity is linear in the number of the channel's comments, more precisely this number divided by 100 - we could guess that the channel with the most subscribers ([T-Series](https://www.youtube.com/@tseries)) has the most comments
2. Here the complexity is linear in the number of videos - as far as I know [RoelVandePaar](https://www.youtube.com/@RoelVandePaar) has the most videos, [2,026,566 according to SocialBlade](https://socialblade.com/youtube/c/roelvandepaar). However, due to the 20,000-video limit of YouTube Data API v3 PlaylistItems: list, the actual limit is 20,000 [as far as I know](https://gitea.lemnoslife.com/Benjamin_Loison/YouTube_captions_search_engine/wiki#user-content-concerning-20-000-videos-limit-for-youtube-data-api-v3-playlistitems-list-endpoint).
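The breadth-first search described above boils down to the following sketch (Python for brevity), mirroring the `channelsToTreat`/`channelsAlreadyTreated` sets of `main.cpp`; `findRelatedChannels` is a hypothetical stand-in for all the discovery sources (comment authors, tabs, livestream chats):
```python
from collections import deque

def discover(startingChannels, findRelatedChannels):
    channelsToTreat = deque(startingChannels)  # ordered, to treat channels in the order we found them
    channelsAlreadyTreated = set()
    while channelsToTreat:
        channel = channelsToTreat.popleft()
        channelsAlreadyTreated.add(channel)
        for channelId in findRelatedChannels(channel):
            # Mirrors `markChannelAsRequiringTreatmentIfNeeded`.
            if channelId not in channelsAlreadyTreated and channelId not in channelsToTreat:
                channelsToTreat.append(channelId)
```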
A ready-to-use website instance of this project is hosted at: https://crawler.yt.lemnoslife.com
See more details on [the Wiki](https://gitea.lemnoslife.com/Benjamin_Loison/YouTube_captions_search_engine/wiki).
# The project structure:
- `main.cpp` contains the C++ multi-threaded algorithm proceeding to the YouTube channels discovery. It is notably made of the following functions:
  - `main` which parses the command line arguments, loads variables from files (`channels.txt`, `keys.txt`, `channels/` content) and starts the threads executing the `treatChannels` function
  - `treatChannels` which gets a YouTube channel to treat, treats it in the `treatChannelOrVideo` function and compresses the retrieved data
  - `treatChannelOrVideo` which, given a YouTube channel id or a video id, treats this resource. In both cases it treats the comments left on this resource. In the case of a channel it also treats its `CHANNELS`, `COMMUNITY`, `PLAYLISTS` and `LIVE` tabs and downloads the captions of the channel videos.
  - `markChannelAsRequiringTreatmentIfNeeded` which, given a YouTube channel id, marks it as requiring treatment if it wasn't already treated
  - `execute` which, given a `yt-dlp` command, executes it in a shell
  - `getJson` which, given an API request, returns its result as a JSON structure. In the case that the requested API is YouTube Data API v3 and a set of keys is provided (see `keys.txt` below), it rotates the keys as required (see the sketch after this list)
- `channels.txt` contains a starting set of channels, mostly the 100 most subscribed French channels
- `keys.txt` contains a set of YouTube Data API v3 keys (not provided) to be able to request this API (see below an alternative to filling it: the `--no-keys` command line argument)
- `scripts/` contains Python scripts to:
  - generate the `channels.txt` as described above (`retrieveTop100SubscribersFrance.py`)
  - remove channels being treated before a restart of the algorithm, as described in [the `main` function documentation](https://gitea.lemnoslife.com/Benjamin_Loison/YouTube_captions_search_engine/src/commit/8dd89e6e881da0a905b6fa4b23775c4344dd0d9d/main.cpp#L126-L128) (`removeChannelsBeingTreated.py`)
- `website/` is a PHP website using WebSockets to allow the end-user to make requests on the retrieved dataset. When fetching the website, the end-user receives the interpreted `index.php`, which upon making a request interacts with `websocket.php`, which in the back-end dispatches the requests of the various end-users to `search.py` (which treats the actual end-user request on the compressed dataset), using `users/` for the inter-process communication.
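As a rough illustration of the key rotation performed by `getJson`, here is a simplified Python rendition of the C++ logic (not the actual implementation; logging, error handling other than `quotaExceeded`, and the YouTube operational API case are omitted):
```python
import requests

keys = open('keys.txt').read().splitlines()  # YouTube Data API v3 keys, used one after the other

def getJson(url):
    # `url` is an endpoint with its parameters, e.g. 'commentThreads?part=snippet&maxResults=100'.
    data = requests.get(f'https://www.googleapis.com/youtube/v3/{url}&key={keys[0]}').json()
    if data.get('error', {}).get('errors', [{}])[0].get('reason') == 'quotaExceeded':
        # Move the exhausted key from the first slot to the last one, then retry the request.
        keys.append(keys.pop(0))
        return getJson(url)
    return data
```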
Note that this project heavily relies on [YouTube operational API](https://github.com/Benjamin-Loison/YouTube-operational-API) [which was modified for this project](https://gitea.lemnoslife.com/Benjamin_Loison/YouTube_captions_search_engine/wiki/YouTube-operational-API-commits).
# Running the YouTube graph discovery algorithm:
Because of [the current compression mechanism](https://gitea.lemnoslife.com/Benjamin_Loison/YouTube_captions_search_engine/issues/30), Linux is the only known OS able to run this algorithm.
To clone the repository, run:
```sh
git clone https://gitea.lemnoslife.com/Benjamin_Loison/YouTube_captions_search_engine
```
Move to the cloned repository by running:
```sh
cd YouTube_captions_search_engine/
```
To install this project's dependencies on an `apt`-based Linux distribution, make sure to have [`pip`](https://pip.pypa.io/en/stable/installation/) and run:
We have to proceed with a breadth-first search approach, as treating all *child* channels might take a time equivalent to treating the whole original tree.
```sh
sudo apt install nlohmann-json3-dev
pip install yt-dlp
```
To compile the YouTube graph discovery algorithm, run:
```sh
make
```
To see the command line arguments of the algorithm, run:
```sh
./youtubeCaptionsSearchEngine -h
```
To run the YouTube graph discovery algorithm, run:
```sh
./youtubeCaptionsSearchEngine
```
Unless you provide the argument `--youtube-operational-api-instance-url https://yt.lemnoslife.com`, you have [to host your own instance of the YouTube operational API](https://github.com/Benjamin-Loison/YouTube-operational-API/#install-your-own-instance-of-the-api).
Unless you provide the argument `--no-keys`, you have to provide at least one [YouTube Data API v3 key](https://developers.google.com/youtube/v3/getting-started) in `keys.txt`.
# Hosting the website enabling users to make requests:
Move to the `website/` folder by running:
```sh
cd website/
```
To install its dependencies, make sure to have [`composer`](https://getcomposer.org/doc/00-intro.md) installed and run:
```sh
sudo apt install nginx
composer install
pip install webvtt-py
```
Add the following to your Nginx configuration for this website:
```nginx
# Make the default webpage of your website be `index.php`.
index index.php;

# Allow end-users to retrieve the content of a file within a channel zip.
location /channels {
    rewrite ^(.*).zip$ /channels.php;
    rewrite ^(.*).zip/(.*).json$ /channels.php;
    rewrite ^(.*).zip/(.*).txt$ /channels.php;
    rewrite ^(.*).zip/(.*).vtt$ /channels.php;
    # Allow end-users to list `channels/` content.
    autoindex on;
}

# Prevent end-users from accessing other end-users' requests.
location /users {
    deny all;
}

# Configure the WebSocket endpoint.
location /websocket {
    # Switch off logging.
    access_log off;
    # Redirect all HTTP traffic to localhost.
    proxy_pass http://localhost:4430;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header Host $host;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    # WebSocket support (nginx 1.4).
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
    # Timeout extension; possibly keep this short if using a ping strategy.
    proxy_read_timeout 99999s;
}
```
Start the websocket worker by running:
```sh
php websockets.php
./main
```

channels.txt

@@ -98,13 +98,3 @@ UCfih6kPJCpzWmtCFtlpYK6A
UCdTyuXgmJkG_O8_75eqej-w
UCxXFx2jz8N02sNqv1VeDEGA
UCj8BKFCTH-mqRlYwcmX2xwg
UCsT0YIqwnpJCM-mx7-gSA4Q
UCAuUUnT6oDeKwE6v1NGQxug
UCy0uwqmXSHVOgqo3nrN4RCQ
UCawLcDd9clh27b1z55Gcawg
UC6bfT6U4WED5EyzymREvKlQ
UCINdSH_R15xft_ctNm50eGQ
UCVx2ZvskbDkHpLlYEQ9FULw
UCBcmi8nLrqfFluiexxjl7bg
UCBnZ16ahKA2DZ_T5W0FPUXg
UCf8w5m0YsRa8MHQ5bwSGmbw


@@ -1,6 +1,6 @@
#!/usr/bin/python3
PREFIX = 'Channels per second: '
PREFIX = 'Comments per second: '
alreadyTreatedCommentsCount = 0
with open('nohup.out') as f:
@@ -8,7 +8,5 @@ with open('nohup.out') as f:
for line in lines:
if PREFIX in line:
alreadyTreatedCommentsCount += int(line.split(PREFIX)[-1])
#if 'UCsT0YIqwnpJCM-mx7-gSA4Q' in line:
# break
print(alreadyTreatedCommentsCount)


@@ -1,7 +1,5 @@
#!/usr/bin/python3
# This algorithm should also take into account the other features that we use to retrieve channels.
import os, requests, json, time, datetime
path = 'channels/'
@@ -9,24 +7,23 @@ path = 'channels/'
os.chdir(path)
def getTimestampFromDateString(dateString):
return int(time.mktime(datetime.datetime.strptime(dateString, '%Y-%m-%dT%H:%M:%SZ').timetuple()))
return int(time.mktime(datetime.datetime.strptime(dateString, "%Y-%m-%dT%H:%M:%SZ").timetuple()))
for channelId in list(os.walk('.'))[1]:
channelId = channelId[2:]
#print(channelId)
numberOfRequests = len(list(os.walk(f'{channelId}/requests'))[0][2]) - 1
numberOfRequests = len(list(os.walk(channelId))[0][2])
# Assume that the folder isn't empty (may not be the case, but it is most of the time).
filePath = f'{channelId}/requests/{str(numberOfRequests - 1)}.json'
with open(filePath) as f:
print(filePath)
#content = '\n'.join(f.read().splitlines()[1:])
data = json.load(f)#json.loads(content)
with open(f'{channelId}/{str(numberOfRequests - 1)}.json') as f:
content = "\n".join(f.read().splitlines()[1:])
data = json.loads(content)
snippet = data['items'][-1]['snippet']
if 'topLevelComment' in snippet:
snippet = snippet['topLevelComment']['snippet']
latestTreatedCommentDate = snippet['publishedAt']
url = f'https://yt.lemnoslife.com/noKey/channels?part=snippet&id={channelId}'
data = requests.get(url).json()
content = requests.get(url).text
data = json.loads(content)
channelCreationDate = data['items'][0]['snippet']['publishedAt']
#print(channelCreationDate)
# The timing percentage doesn't take into account the non-uniform distribution of comments over time. Note that in the case that the last request lists replies to a comment, the percentage might go a bit backward, as replies are posted after the initial comment.


@@ -1,8 +1,8 @@
#!/usr/bin/python3
import os, requests
import os, requests, json
channelIds = [channelId.replace('.zip', '') for channelId in next(os.walk('channels/'))[2]]
channelIds = next(os.walk('channels/'))[1]
maxResults = 50
channelIdsChunks = [channelIds[i : i + maxResults] for i in range(0, len(channelIds), maxResults)]
@@ -11,7 +11,8 @@ mostSubscriberChannel = None
for channelIds in channelIdsChunks:
url = 'https://yt.lemnoslife.com/noKey/channels?part=statistics&id=' + ','.join(channelIds)
data = requests.get(url).json()
content = requests.get(url).text
data = json.loads(content)
items = data['items']
for item in items:
subscriberCount = int(item['statistics']['subscriberCount'])

main.cpp

@@ -12,84 +12,45 @@ using namespace std;
using namespace chrono;
using json = nlohmann::json;
// Concerning `retryOnCommentsDisabled`: for some channels `commentThreads` can return that they have disabled their comments while we can find comments on some of their videos, so we enumerate the channel videos and request the comments on each video.
// Concerning `returnErrorIfPlaylistNotFound`: it is used when trying to retrieve a channel's `uploads` playlist content, as it seems to not always work.
enum getJsonBehavior { normal, retryOnCommentsDisabled, returnErrorIfPlaylistNotFound };
set<string> setFromVector(vector<string> vec);
vector<string> getFileContent(string filePath);
json getJson(unsigned short threadId, string url, bool usingYouTubeDataApiV3, string channelId, getJsonBehavior behavior = normal);
json getJson(unsigned short threadId, string url, string directoryPath, getJsonBehavior behavior = normal);
void createDirectory(string path),
print(ostringstream* toPrint),
treatComment(unsigned short threadId, json comment, string channelId),
treatChannelOrVideo(unsigned short threadId, bool isIdAChannelId, string id, string channelToTreat),
treatChannelOrVideo(unsigned short threadId, bool isChannel, string id, string channelToTreat),
treatChannels(unsigned short threadId),
deleteDirectory(string path),
markChannelAsRequiringTreatmentIfNeeded(unsigned short threadId, string channelId),
execute(unsigned short threadId, string command, bool debug = true);
deleteDirectory(string path);
string getHttps(string url),
join(vector<string> parts, string delimiter),
escapeShellArgument(string shellArgument),
replaceAll(string str, const string& from, const string& to);
exec(string cmd);
size_t writeCallback(void* contents, size_t size, size_t nmemb, void* userp);
bool doesFileExist(string filePath),
writeFile(unsigned short threadId, string filePath, string option, string toWrite);
// Use macros to avoid having to repeat `threadId` in each function calling the `print` function.
#define THREAD_PRINT(threadId, x) { ostringstream toPrint; toPrint << threadId << ": " << x; print(&toPrint); }
#define PRINT(x) THREAD_PRINT(threadId, x)
#define PRINT(threadId, x) { ostringstream toPrint; toPrint << threadId << ": " << x; print(&toPrint); }
#define DEFAULT_THREAD_ID 0
#define MAIN_PRINT(x) THREAD_PRINT(DEFAULT_THREAD_ID, x)
#define MAIN_PRINT(x) PRINT(DEFAULT_THREAD_ID, x)
#define EXIT_WITH_ERROR(x) { PRINT(x); exit(EXIT_FAILURE); }
#define MAIN_EXIT_WITH_ERROR(x) { MAIN_PRINT(x); exit(EXIT_FAILURE); }
// Note that in the following a `channel` designates a `string` that is the channel id starting with `UC`.
// The only resources shared are:
// - standard streams
// - the ordered set of channels to treat and the unordered set of channels already treated
// - the ordered set of YouTube Data API v3 keys
mutex printMutex,
channelsAlreadyTreatedAndToTreatMutex,
quotaMutex;
// We use `set`s and `map`s for performance reasons.
set<string> channelsAlreadyTreated;
// Two `map`s to simulate a bidirectional map.
map<unsigned int, string> channelsToTreat;
map<string, unsigned int> channelsToTreatRev;
vector<string> youtubeDataApiV3keys;
// For statistics we count the number of:
// - channels found per second (`channelsFoundPerSecondCount`)
// - channels found (`channelsTreatedCountThreads`) and requests made (`requestsCountThreads`) for each channel once it is treated
unsigned int channelsFoundPerSecondCount = 0;
map<unsigned short, unsigned int> channelsTreatedCountThreads,
requestsCountThreads;
// Variables that can be overridden by command line arguments.
set<string> channelsAlreadyTreated,
channelsToTreat;
vector<string> keys;
unsigned int commentsCount = 0,
commentsPerSecondCount = 0,
requestsPerChannel = 0;
unsigned short THREADS_NUMBER = 1;
// Can be set to https://yt.lemnoslife.com, for instance, to use the official YouTube operational API instance.
string YOUTUBE_OPERATIONAL_API_INSTANCE_URL = "http://localhost/YouTube-operational-API";
bool USE_YT_LEMNOSLIFE_COM_NO_KEY_SERVICE = false;
// Constants written as `string` variables instead of macros to have `string` properties, even if we could use a meta-macro inlining them as `string`s.
string CHANNELS_DIRECTORY = "channels/",
STARTING_CHANNELS_SET_FILE_PATH = "channels.txt",
YOUTUBE_DATA_API_V3_KEYS_FILE_PATH = "keys.txt",
UNLISTED_VIDEOS_FILE_PATH = "unlistedVideos.txt",
CAPTIONS_DIRECTORY = "captions/",
DEBUG_DIRECTORY = "debug/",
YOUTUBE_APIS_REQUESTS_DIRECTORY = "requests/";
// The key usage is identical to the YouTube operational API no-key service: the daily quota of the first key is completely used before using the next one and so on, looping once the end of the ordered key set is reached.
string currentYouTubeDataAPIv3Key = "", // Will firstly be filled with `YOUTUBE_DATA_API_V3_KEYS_FILE_PATH` first line.
CURRENT_WORKING_DIRECTORY;
CHANNELS_FILE_PATH = "channels.txt",
KEYS_FILE_PATH = "keys.txt",
apiKey = ""; // Will firstly be filled with `KEYS_FILE_PATH` first line.
bool USE_YT_LEMNOSLIFE_COM_NO_KEY_SERVICE = false;
int main(int argc, char *argv[])
{
// Process the passed command line arguments.
for(unsigned short argvIndex = 1; argvIndex < argc; argvIndex++)
{
string argvStr = string(argv[argvIndex]);
@ -103,88 +64,49 @@ int main(int argc, char *argv[])
}
else if(argvStr == "-h" || argvStr == "--help")
{
MAIN_PRINT("Usage: " << argv[0] << " [--help/-h] [--no-keys] [--threads=N] [--youtube-operational-api-instance-url URL]")
exit(EXIT_SUCCESS);
}
// Contrary to `--threads=`, the separator between the command line argument label and its value is a space, not an equals sign.
else if(argvStr == "--youtube-operational-api-instance-url")
{
if(argvIndex < argc - 1)
{
YOUTUBE_OPERATIONAL_API_INSTANCE_URL = string(argv[argvIndex + 1]);
argvIndex++;
MAIN_PRINT("Usage: " << argv[0] << " [--help/-h] [--no-keys] [--threads=N]")
exit(0);
}
else
{
MAIN_EXIT_WITH_ERROR("YouTube operational API instance URL missing!")
}
}
else
{
MAIN_EXIT_WITH_ERROR("Unrecognized parameter " << argvStr)
MAIN_PRINT("Unrecognized parameter " << argvStr)
exit(1);
}
}
// The starting set should be written to `STARTING_CHANNELS_SET_FILE_PATH`.
// The starting set should be written to `CHANNELS_FILE_PATH`.
// To resume this algorithm after a shutdown, just restart it after having deleted, in `CHANNELS_DIRECTORY`, the folders of the last channels being treated.
// On a restart, `STARTING_CHANNELS_SET_FILE_PATH` is read and every channel not found in `CHANNELS_DIRECTORY` is added to `channelsToTreat*`, or to `channelsAlreadyTreated` otherwise, before continuing, as if `STARTING_CHANNELS_SET_FILE_PATH` contained a **treated** starting set.
vector<string> startingChannelsSet = getFileContent(STARTING_CHANNELS_SET_FILE_PATH);
for(unsigned int startingChannelsSetIndex = 0; startingChannelsSetIndex < startingChannelsSet.size(); startingChannelsSetIndex++)
{
string startingChannel = startingChannelsSet[startingChannelsSetIndex];
channelsToTreat[startingChannelsSetIndex] = startingChannel;
channelsToTreatRev[startingChannel] = startingChannelsSetIndex;
}
// On a restart, `CHANNELS_FILE_PATH` is read and every channel not found in `CHANNELS_DIRECTORY` is added to `channelsToTreat`, or to `channelsAlreadyTreated` otherwise, before continuing, as if `CHANNELS_FILE_PATH` contained a **treated** starting set.
vector<string> channelsVec = getFileContent(CHANNELS_FILE_PATH);
// Note that using `set`s makes the search faster but we lose the `channels.txt` lines order.
channelsToTreat = setFromVector(channelsVec);
// Load the YouTube Data API v3 keys stored in `YOUTUBE_DATA_API_V3_KEYS_FILE_PATH`.
youtubeDataApiV3keys = getFileContent(YOUTUBE_DATA_API_V3_KEYS_FILE_PATH);
currentYouTubeDataAPIv3Key = youtubeDataApiV3keys[0];
keys = getFileContent(KEYS_FILE_PATH);
apiKey = keys[0];
createDirectory(CHANNELS_DIRECTORY);
// Remove already treated channels from channels to treat.
for(const auto& entry : filesystem::directory_iterator(CHANNELS_DIRECTORY))
{
string fileName = entry.path().filename();
// Skip files such as `UNLISTED_VIDEOS_FILE_PATH`.
if (fileName.substr(0, 2) == "UC") {
string channelId = fileName.substr(0, fileName.length() - 4);
channelsToTreat.erase(channelsToTreatRev[channelId]);
channelsToTreatRev.erase(channelId);
string fileName = entry.path().filename(),
channelId = fileName.substr(0, fileName.length() - 4);
channelsToTreat.erase(channelId);
channelsAlreadyTreated.insert(channelId);
}
}
// Load the current working directory at runtime.
char cwd[PATH_MAX];
if (getcwd(cwd, sizeof(cwd)) != NULL) {
CURRENT_WORKING_DIRECTORY = string(cwd) + "/";
} else {
MAIN_EXIT_WITH_ERROR("`getcwd()` error");
}
// Print the number of:
// - channels to treat
// - channels already treated
MAIN_PRINT(channelsToTreat.size() << " channel(s) to treat")
MAIN_PRINT(channelsAlreadyTreated.size() << " channel(s) already treated")
// Start the `THREADS_NUMBER` threads.
// Note that there is an additional thread: the one running the `main` function, which continues with the code below this `for` loop.
vector<thread> threads;
for(unsigned short threadsIndex = 0; threadsIndex < THREADS_NUMBER; threadsIndex++)
{
threads.push_back(thread(treatChannels, threadsIndex + 1));
}
// Every second print the number of channels found during the last second.
// Note that if the same channel is found multiple times, the count will be incremented the same number of times.
while(true)
{
MAIN_PRINT("Channels treated per second: " << channelsFoundPerSecondCount)
channelsFoundPerSecondCount = 0;
MAIN_PRINT("Comments per second: " << commentsPerSecondCount)
commentsPerSecondCount = 0;
sleep(1);
}
@ -197,83 +119,60 @@ int main(int argc, char *argv[])
return 0;
}
// Function each thread loops in until the whole YouTube graph is completely treated.
void treatChannels(unsigned short threadId)
{
// For the moment we assume that we have never completely treated YouTube; otherwise we would have to pay attention to how to proceed if the starting set involves starvation for some threads.
while(true)
{
// As we're about to mark a channel as being treated, we need to make sure that no other thread is also modifying the set of channels we are working on.
channelsAlreadyTreatedAndToTreatMutex.lock();
if(channelsToTreat.empty())
{
channelsAlreadyTreatedAndToTreatMutex.unlock();
// Consumer thread waiting for a producer one to provide a channel to work on.
sleep(1);
continue;
}
// Treat channels in the order we found them in `STARTING_CHANNELS_SET_FILE_PATH` or discovered them.
string channelToTreat = channelsToTreat.begin()->second;
string channelToTreat = *channelsToTreat.begin();
// Print the channel id the thread is going to work on and remind the number of channels already treated and the number of channels to treat.
PRINT("Treating channel " << channelToTreat << " (treated: " << channelsAlreadyTreated.size() << ", to treat: " << channelsToTreat.size() << ")")
channelsTreatedCountThreads[threadId] = 0;
requestsCountThreads[threadId] = 0;
channelsToTreat.erase(channelsToTreatRev[channelToTreat]);
channelsToTreatRev.erase(channelToTreat);
PRINT(threadId, "Treating channel " << channelToTreat << " (treated: " << channelsAlreadyTreated.size() << ", to treat: " << channelsToTreat.size() << ")")
channelsToTreat.erase(channelToTreat);
channelsAlreadyTreated.insert(channelToTreat);
channelsAlreadyTreatedAndToTreatMutex.unlock();
// Create the directories in which we are going to store the requests we made to YouTube.
string channelToTreatDirectory = CHANNELS_DIRECTORY + channelToTreat + "/";
createDirectory(channelToTreatDirectory);
createDirectory(DEBUG_DIRECTORY);
createDirectory(channelToTreatDirectory + CAPTIONS_DIRECTORY);
createDirectory(channelToTreatDirectory + YOUTUBE_APIS_REQUESTS_DIRECTORY);
// Actually treat the given channel.
treatChannelOrVideo(threadId, true, channelToTreat, channelToTreat);
// Note that compressing the most-subscribed French channel took 4 minutes and 42 seconds.
PRINT("Starting compression...")
PRINT(threadId, "Starting compression...")
// As I haven't found any well-known library that easily compresses a directory, I have chosen to rely on the `zip` CLI.
// We specify no `debug`ging, as otherwise the zipping operation doesn't work as expected.
// As the zipping process isn't recursive, we can't just rely on `ls`; we are obliged to use `find`.
execute(threadId, "cd " + escapeShellArgument(channelToTreatDirectory) + " && find | zip " + escapeShellArgument("../" + channelToTreat + ".zip") + " -@");
exec("cd " + channelToTreatDirectory + " && ls | zip ../" + channelToTreat + ".zip -@");
PRINT("Compression finished, started deleting initial directory...")
// Get rid of the uncompressed data.
PRINT(threadId, "Compression finished, started deleting initial directory...")
deleteDirectory(channelToTreatDirectory);
PRINT("Deleting directory finished.")
PRINT(threadId, "Deleting directory finished.")
PRINT(channelsTreatedCountThreads[threadId] << " channels were found for this channel.")
PRINT(threadId, commentsCount << " comments were found for this channel.")
commentsCount = 0;
requestsPerChannel = 0;
}
// This `unlock` seems to be dead code currently as the algorithm doesn't support treating the whole YouTube graph.
channelsAlreadyTreatedAndToTreatMutex.unlock();
}
// We have to pay attention not to recursively call this function with another channel, otherwise we break the program's ability to halt at any top-level channel.
// Note that the `id` can be a channel id or a video id. We provide `channelToTreat` anyway, even if it's identical to `id`.
void treatChannelOrVideo(unsigned short threadId, bool isIdAChannelId, string id, string channelToTreat)
void treatChannelOrVideo(unsigned short threadId, bool isChannel, string id, string channelToTreat)
{
string pageToken = "";
// Treat all comments:
// - of a given channel thanks to YouTube Data API v3 CommentThreads: list endpoint and `allThreadsRelatedToChannelId` filter if the provided `id` is a channel id
// - of a given video thanks to YouTube Data API v3 CommentThreads: list endpoint and `videoId` filter otherwise (if the provided `id` is a video id)
while(true)
{
ostringstream toString;
toString << "commentThreads?part=snippet,replies&" << (isIdAChannelId ? "allThreadsRelatedToChannelId" : "videoId") << "=" << id << "&maxResults=100&pageToken=" << pageToken;
toString << "commentThreads?part=snippet,replies&" << (isChannel ? "allThreadsRelatedToChannelId" : "videoId") << "=" << id << "&maxResults=100&pageToken=" << pageToken;
string url = toString.str();
json data = getJson(threadId, url, true, channelToTreat, pageToken == "" ? normal : retryOnCommentsDisabled);
// This condition doesn't hold for non-existing channels.
bool doesRelyingOnCommentThreadsIsEnough = (!isIdAChannelId) || data["error"]["errors"][0]["reason"] != "commentsDisabled";
json data = getJson(threadId, url, channelToTreat, pageToken == "" ? normal : retryOnCommentsDisabled);
bool doesRelyingOnCommentThreadsIsEnough = (!isChannel) || data["error"]["errors"][0]["reason"] != "commentsDisabled";
if(doesRelyingOnCommentThreadsIsEnough)
{
json items = data["items"];
@ -284,14 +183,12 @@ void treatChannelOrVideo(unsigned short threadId, bool isIdAChannelId, string id
treatComment(threadId, comment, channelToTreat);
if(item.contains("replies"))
{
// If there are more than 5 replies, they need to be requested by using pagination with the YouTube Data API v3 Comments: list endpoint.
// In that case we delay the treatment of the first 5 retrieved replies in order not to treat them twice.
if(item["snippet"]["totalReplyCount"] > 5)
{
string pageToken = "";
while(true)
{
json data = getJson(threadId, "comments?part=snippet&parentId=" + commentId + "&maxResults=100&pageToken=" + pageToken, true, channelToTreat),
json data = getJson(threadId, "comments?part=snippet&parentId=" + commentId + "&maxResults=100&pageToken=" + pageToken, channelToTreat),
items = data["items"];
for(const auto& item : items)
{
@ -328,20 +225,12 @@ void treatChannelOrVideo(unsigned short threadId, bool isIdAChannelId, string id
}
else
{
PRINT("Comments disabled channel, treating differently...")
// As far as I know we can't retrieve all videos of a channel if it has more than 20,000 videos; in such a case the program stops to investigate this further.
json data = getJson(threadId, "channels?part=statistics&id=" + channelToTreat, true, channelToTreat),
items = data["items"];
if(items.empty())
{
PRINT("The provided channel doesn't exist, skipping it.");
break;
}
PRINT(threadId, "Comments disabled channel, treating differently...")
json data = getJson(threadId, "channels?part=statistics&id=" + channelToTreat, channelToTreat);
// YouTube Data API v3 Channels: list endpoint returns `videoCount` as a string and not an integer...
unsigned int videoCount = atoi(string(items[0]["statistics"]["videoCount"]).c_str());
PRINT("The channel has about " << videoCount << " videos.")
unsigned int videoCount = atoi(string(data["items"][0]["statistics"]["videoCount"]).c_str());
PRINT(threadId, "The channel has about " << videoCount << " videos.")
// `UC-3A9g4U1PpLaeAuD4jSP_w` has a `videoCount` of 2, while its `uploads` playlist contains 3 videos. So we use a strict inequality here.
// The `0 < videoCount` is an optimization to avoid making a request to YouTube Data API v3 PlaylistItems: list endpoint while we already know that no results will be returned. As many YouTube channels don't have videos, this optimization is implemented.
if(0 < videoCount && videoCount < 20000)
{
string playlistToTreat = "UU" + channelToTreat.substr(2),
@ -349,17 +238,17 @@ void treatChannelOrVideo(unsigned short threadId, bool isIdAChannelId, string id
while(true)
{
// `snippet` and `status` are unneeded `part`s here but may be interesting later, as we log them.
json data = getJson(threadId, "playlistItems?part=contentDetails,snippet,status&playlistId=" + playlistToTreat + "&maxResults=50&pageToken=" + pageToken, true, channelToTreat, returnErrorIfPlaylistNotFound);
json data = getJson(threadId, "playlistItems?part=snippet,contentDetails,status&playlistId=" + playlistToTreat + "&maxResults=50&pageToken=" + pageToken, channelToTreat, returnErrorIfPlaylistNotFound);
if(data.contains("error"))
{
// This is a sanity check that hasn't ever been violated.
EXIT_WITH_ERROR("Not listing comments on videos, as `playlistItems` hasn't found the `uploads` playlist!")
PRINT(threadId, "Not listing comments on videos, as `playlistItems` hasn't found the `uploads` playlist!")
exit(1);
}
json items = data["items"];
for(const auto& item : items)
{
string videoId = item["contentDetails"]["videoId"];
// To keep the same amount of logs for each regular channel, I comment the following `PRINT`.
// To keep the same amount of logs for each channel, I comment the following `PRINT`.
//PRINT("Treating video " << videoId)
treatChannelOrVideo(threadId, false, videoId, channelToTreat);
}
@ -376,358 +265,18 @@ void treatChannelOrVideo(unsigned short threadId, bool isIdAChannelId, string id
}
else if(videoCount == 0)
{
PRINT("Skip listing comments on videos, as they shouldn't be any according to `channels?part=statistics`.")
PRINT(threadId, "Skip listing comments on videos, as they shouldn't be any according to `channels?part=statistics`.")
break;
}
else //if(videoCount >= 20000)
{
EXIT_WITH_ERROR("The videos count of the channel exceeds the supported 20,000 limit!")
}
}
}
// If the provided `id` is a channel id, then we treat its tabs.
if(isIdAChannelId)
{
// Treat the `CHANNELS` tab.
string pageToken = "";
while(true)
{
json data = getJson(threadId, "channels?part=channels&id=" + id + (pageToken == "" ? "" : "&pageToken=" + pageToken), false, id),
// There is no need to verify that the channel exists, as we know it does thanks to the previous comments listing.
channelSections = data["items"][0]["channelSections"];
// We don't care about channel sections; we are only looking for channel ids.
for(const auto& channelSection : channelSections)
{
for(const auto& sectionChannel : channelSection["sectionChannels"])
{
string channelId = sectionChannel["channelId"];
markChannelAsRequiringTreatmentIfNeeded(threadId, channelId);
}
}
// There is a pagination mechanism only when there is a single channel section.
if(channelSections.size() == 1)
{
json channelSection = channelSections[0];
if(!channelSection["nextPageToken"].is_null())
{
pageToken = channelSection["nextPageToken"];
}
else
{
break;
}
}
else
{
break;
}
}
// Treat the `COMMUNITY` tab.
pageToken = "";
while(true)
{
// First we retrieve community post ids then we retrieve their comments and their replies.
json data = getJson(threadId, "channels?part=community&id=" + id + (pageToken == "" ? "" : "&pageToken=" + pageToken), false, id);
data = data["items"][0];
json posts = data["community"];
for(const auto& post : posts)
{
string postId = post["id"];
// As with livestream chats, comments can be filtered as `Top comments` or `Newest first`; from my experience `Top comments` hides some comments, so we use time ordering wherever possible.
json data = getJson(threadId, "community?part=snippet&id=" + postId + "&order=time", false, id);
string pageToken = data["items"][0]["snippet"]["comments"]["nextPageToken"];
while(pageToken != "")
{
json data = getJson(threadId, "commentThreads?part=snippet,replies&pageToken=" + pageToken, false, id),
items = data["items"];
for(const auto& item : items)
{
json snippet = item["snippet"]["topLevelComment"]["snippet"],
authorChannelId = snippet["authorChannelId"];
if(!authorChannelId["value"].is_null())
{
string channelId = authorChannelId["value"];
markChannelAsRequiringTreatmentIfNeeded(threadId, channelId);
}
// Contrary to YouTube Data API v3, for a given comment having replies we don't switch from the CommentThreads: list endpoint to the Comments: list endpoint; here we keep working with the YouTube operational API CommentThreads: list endpoint but change the page token.
string pageToken = snippet["nextPageToken"];
while(pageToken != "")
{
json data = getJson(threadId, "commentThreads?part=snippet,replies&pageToken=" + pageToken, false, id),
items = data["items"];
for(const auto& item : items)
{
string channelId = item["snippet"]["authorChannelId"]["value"];
markChannelAsRequiringTreatmentIfNeeded(threadId, channelId);
}
if(data.contains("nextPageToken"))
{
pageToken = data["nextPageToken"];
}
else
{
break;
}
}
}
if(data.contains("nextPageToken"))
{
pageToken = data["nextPageToken"];
}
else
{
break;
}
}
}
// See https://github.com/Benjamin-Loison/YouTube-operational-API/issues/49
if(data.contains("nextPageToken") && data["nextPageToken"] != "")
{
pageToken = data["nextPageToken"];
}
else
{
break;
}
}
// Treat the `PLAYLISTS` tab.
pageToken = "";
while(true)
{
json data = getJson(threadId, "channels?part=playlists&id=" + id + (pageToken == "" ? "" : "&pageToken=" + pageToken), false, id),
playlistSections = data["items"][0]["playlistSections"];
// We don't care about playlist sections; we are only looking for channel ids.
for(const auto& playlistSection : playlistSections)
{
for(const auto& playlist : playlistSection["playlists"])
{
string playlistId = playlist["id"];
// We exclude shows, as they don't indirectly contain any comments, at least for free ones.
if(playlistId.substr(0, 2) == "SC")
{
continue;
}
//PRINT(threadId, playlistId)
string pageToken = "";
while(true)
{
json data = getJson(threadId, "playlistItems?part=contentDetails,snippet,status&playlistId=" + playlistId + "&maxResults=50&pageToken=" + pageToken, true, id),
items = data["items"];
for(const auto& item : items)
{
json snippet = item["snippet"];
// This section is a bit out of the scope of the YouTube captions search engine goal, as we are just curious about the unlisted videos that we find, but in fact it's also a bit within the initial scope, as it enables us to treat unlisted content.
string privacyStatus = item["status"]["privacyStatus"];
// `5-CXVU8si3A` in `PLTYUE9O6WCrjQsnOm56rMMNmFy_A-SjUx` has its privacy status on `privacyStatusUnspecified` and is inaccessible.
// `GMiVi8xkEXA` in `PLTYUE9O6WCrgNpeSiryP8LYVX-7tOJ1f1` has its privacy status on `private`.
// Of course `commentThreads?videoId=` doesn't work for these videos (same result on YouTube UI).
// Under the hypothesis that the discovery algorithm never ends, we can't postpone the treatment of these unlisted videos, because we can find such unlisted videos at any point in time (before or after the given channel treatment).
// Maybe modifying this hypothesis would make sense; otherwise we have to treat them right away (note that, code architecture aside, there is no recursion problem, as documented on this function).
if(privacyStatus != "public" && privacyStatus != "private" && snippet["title"] != "Deleted video")
{
string videoId = snippet["resourceId"]["videoId"],
channelId = snippet["videoOwnerChannelId"];
PRINT("Found non public video (" << videoId << ") in: " << playlistId)
string channelUnlistedVideosFilePath = CHANNELS_DIRECTORY + UNLISTED_VIDEOS_FILE_PATH;
bool doesChannelUnlistedVideosFileExist = doesFileExist(channelUnlistedVideosFilePath);
writeFile(threadId, channelUnlistedVideosFilePath, !doesChannelUnlistedVideosFileExist ? "w" : "a", (!doesChannelUnlistedVideosFileExist ? "" : "\n") + channelId);
}
if(snippet.contains("videoOwnerChannelId"))
{
// There isn't any `videoOwnerChannelId` to retrieve for `5-CXVU8si3A` for instance.
string channelId = snippet["videoOwnerChannelId"];
// As we are already treating the given channel, verifying if it needs to be treated again is only a loss of time, so we skip the verification in this case.
if(channelId != id)
{
markChannelAsRequiringTreatmentIfNeeded(threadId, channelId);
}
}
}
if(data.contains("nextPageToken"))
{
pageToken = data["nextPageToken"];
}
else
{
break;
}
}
}
}
if(!data["nextPageToken"].is_null())
{
pageToken = data["nextPageToken"];
}
else
{
break;
}
}
// Treat the `LIVE` tab.
pageToken = "";
string playlistId = "UU" + id.substr(2);
vector<string> videoIds;
while(true)
{
// We verify, by batches of 50 videos, whether they are livestreams or not thanks to the YouTube Data API v3 PlaylistItems: list and Videos: list endpoints, as the PlaylistItems: list endpoint doesn't provide on its own whether or not a given video is a livestream.
json data = getJson(threadId, "playlistItems?part=contentDetails,snippet,status&playlistId=" + playlistId + "&maxResults=50&pageToken=" + pageToken, true, id, returnErrorIfPlaylistNotFound),
items = data["items"];
for(const auto& item : items)
{
string videoId = item["snippet"]["resourceId"]["videoId"];
videoIds.push_back(videoId);
}
bool hasNextPageToken = data.contains("nextPageToken");
if(videoIds.size() == 50 || !hasNextPageToken)
{
json data = getJson(threadId, "videos?part=contentDetails,id,liveStreamingDetails,localizations,player,snippet,statistics,status,topicDetails&id=" + join(videoIds, ","), true, id),
items = data["items"];
for(const auto& item : items)
{
if(item.contains("liveStreamingDetails"))
{
string videoId = item["id"];
//PRINT(videoId)
json liveStreamingDetails = item["liveStreamingDetails"];
// There are two possibilities for a livestream: either it's ended or not.
// If it's ended we can no longer use the YouTube Live Streaming API LiveChat/messages: list endpoint.
if(liveStreamingDetails.contains("activeLiveChatId"))
{
string activeLiveChatId = liveStreamingDetails["activeLiveChatId"];
json data = getJson(threadId, "liveChat/messages?part=snippet,authorDetails&liveChatId=" + activeLiveChatId, true, id),
items = data["items"];
for(const auto& item : items)
{
string channelId = item["snippet"]["authorChannelId"];
markChannelAsRequiringTreatmentIfNeeded(threadId, channelId);
}
}
else
{
// As the usual pagination mechanism isn't available for these ended livestreams, we proceed in an uncertain way, as follows, based on time pagination.
set<string> messageIds;
unsigned long long lastMessageTimestampRelativeMsec = 0;
while(true)
{
string time = to_string(lastMessageTimestampRelativeMsec);
json data = getJson(threadId, "liveChats?part=snippet&id=" + videoId + "&time=" + time, false, id),
snippet = data["items"][0]["snippet"];
if(snippet.empty())
{
break;
}
json firstMessage = snippet[0];
string firstMessageId = firstMessage["id"];
// We verify that we don't skip any message by verifying that the first message was already treated if we already treated some messages.
if(!messageIds.empty() && messageIds.find(firstMessageId) == messageIds.end())
{
// This sometimes happens, cf https://gitea.lemnoslife.com/Benjamin_Loison/YouTube_captions_search_engine/issues/39.
PRINT("The verification that we don't skip any message failed! Continuing anyway...")
}
for(const auto& message : snippet)
{
string messageId = message["id"];
if(messageIds.find(messageId) == messageIds.end())
{
messageIds.insert(messageId);
string channelId = message["authorChannelId"];
markChannelAsRequiringTreatmentIfNeeded(threadId, channelId);
}
}
json lastMessage = snippet.back();
// If there isn't any new message, then we stop retrieving.
if(lastMessageTimestampRelativeMsec == lastMessage["videoOffsetTimeMsec"])
{
break;
}
lastMessageTimestampRelativeMsec = lastMessage["videoOffsetTimeMsec"];
}
}
}
}
videoIds.clear();
}
if(hasNextPageToken)
{
pageToken = data["nextPageToken"];
}
else
{
break;
}
}
// Captions retrieval by relying on `yt-dlp` after having listed all video ids of the given channel.
string playlistToTreat = "UU" + channelToTreat.substr(2);
pageToken = "";
while(true)
{
json data = getJson(threadId, "playlistItems?part=snippet,contentDetails,status&playlistId=" + playlistToTreat + "&maxResults=50&pageToken=" + pageToken, true, channelToTreat, returnErrorIfPlaylistNotFound);
if(data.contains("error"))
{
// `UCFoBM1VginhMH7lR56GtVbQ` doesn't have videos and falls into this case, for instance.
PRINT("Not listing captions on videos, as `playlistItems` hasn't found the `uploads` playlist!")
break;
}
json items = data["items"];
for(const auto& item : items)
{
string videoId = item["contentDetails"]["videoId"];
// We could proceed as follows by verifying `!isIdAChannelId`, but as we don't know how to manage unlisted videos, we don't proceed this way.
//treatChannelOrVideo(threadId, false, videoId, channelToTreat);
string channelCaptionsToTreatDirectory = CHANNELS_DIRECTORY + channelToTreat + "/" + CAPTIONS_DIRECTORY + videoId + "/";
createDirectory(channelCaptionsToTreatDirectory);
// Firstly download all the captions that were not automatically generated.
// The underscore in the `-o` argument is used so as not to end up with hidden files.
// We are obliged to specify the video id after `--`, otherwise a video id starting with `-` would be considered as an argument.
string commandCommonPrefix = "yt-dlp --skip-download ",
commandCommonPostfix = " -o " + escapeShellArgument(channelCaptionsToTreatDirectory + "_") + " -- " + escapeShellArgument(videoId);
string command = commandCommonPrefix + "--write-sub --sub-lang all,-live_chat" + commandCommonPostfix;
execute(threadId, command);
// Secondly download the automatically generated captions.
command = commandCommonPrefix + "--write-auto-subs --sub-langs '.*orig' --sub-format ttml --convert-subs vtt" + commandCommonPostfix;
execute(threadId, command);
}
if(data.contains("nextPageToken"))
{
pageToken = data["nextPageToken"];
}
else
{
break;
PRINT(threadId, "The videos count of the channel exceeds the supported 20,000 limit!")
exit(1);
}
}
}
}
// This function verifies that the given channel hasn't already been treated.
void markChannelAsRequiringTreatmentIfNeeded(unsigned short threadId, string channelId)
{
channelsFoundPerSecondCount++;
channelsTreatedCountThreads[threadId]++;
// As other threads may be writing the sets we are reading, we need to make sure it's not the case to ensure consistency.
channelsAlreadyTreatedAndToTreatMutex.lock();
if(channelsAlreadyTreated.find(channelId) == channelsAlreadyTreated.end() && channelsToTreatRev.find(channelId) == channelsToTreatRev.end())
{
// It is unclear to me why `channelsToTreat.end()->first + 1` doesn't work here.
unsigned int channelsToTreatIndex = !channelsToTreat.empty() ? channelsToTreat.rbegin()->first + 1 : channelsAlreadyTreated.size();
channelsToTreat[channelsToTreatIndex] = channelId;
channelsToTreatRev[channelId] = channelsToTreatIndex;
channelsAlreadyTreatedAndToTreatMutex.unlock();
writeFile(threadId, STARTING_CHANNELS_SET_FILE_PATH, "a", "\n" + channelId);
}
else
{
channelsAlreadyTreatedAndToTreatMutex.unlock();
}
}
// Mark the comment author channel as requiring treatment if needed.
void treatComment(unsigned short threadId, json comment, string channelId)
{
json snippet = comment["snippet"];
@@ -735,49 +284,39 @@ void treatComment(unsigned short threadId, json comment, string channelId)
if(snippet.contains("authorChannelId"))
{
string channelId = snippet["authorChannelId"]["value"];
markChannelAsRequiringTreatmentIfNeeded(threadId, channelId);
channelsAlreadyTreatedAndToTreatMutex.lock();
if(channelsAlreadyTreated.find(channelId) == channelsAlreadyTreated.end() && channelsToTreat.find(channelId) == channelsToTreat.end())
{
channelsToTreat.insert(channelId);
channelsAlreadyTreatedAndToTreatMutex.unlock();
writeFile(threadId, CHANNELS_FILE_PATH, "a", "\n" + channelId);
}
else
{
channelsAlreadyTreatedAndToTreatMutex.unlock();
}
}
commentsCount++;
commentsPerSecondCount++;
}
// Join `parts` with the `delimiter`.
string join(vector<string> parts, string delimiter)
string exec(string cmd)
{
string result = "";
unsigned int partsSize = parts.size();
for(unsigned int partsIndex = 0; partsIndex < partsSize; partsIndex++)
array<char, 128> buffer;
string result;
unique_ptr<FILE, decltype(&pclose)> pipe(popen(cmd.c_str(), "r"), pclose);
if (!pipe)
{
result += parts[partsIndex];
if(partsIndex < partsSize - 1)
{
result += delimiter;
throw runtime_error("popen() failed!");
}
while (fgets(buffer.data(), buffer.size(), pipe.get()) != nullptr)
{
result += buffer.data();
}
return result;
}
// Execute a provided command as if it were run in a shell.
// This is mandatory as, as far as I know, there isn't a C++ API for `yt-dlp`.
void execute(unsigned short threadId, string command, bool debug)
{
// The debugging gives us confidence that `yt-dlp` is working as expected, cf https://gitea.lemnoslife.com/Benjamin_Loison/YouTube_captions_search_engine/issues/35#issuecomment-578.
if(debug)
{
ostringstream toString;
toString << threadId;
string initialCommand = command,
threadIdStr = toString.str(),
debugCommonFilePath = CURRENT_WORKING_DIRECTORY + DEBUG_DIRECTORY + threadIdStr,
debugOutFilePath = debugCommonFilePath + ".out",
debugErrFilePath = debugCommonFilePath + ".err";
command += " >> " + debugOutFilePath;
command += " 2>> " + debugErrFilePath;
writeFile(threadId, debugOutFilePath, "a", initialCommand + "\n");
writeFile(threadId, debugErrFilePath, "a", initialCommand + "\n");
}
system(command.c_str());
}
bool writeFile(unsigned short threadId, string filePath, string option, string toWrite)
{
FILE* file = fopen(filePath.c_str(), option.c_str());
@@ -789,7 +328,7 @@ bool writeFile(unsigned short threadId, string filePath, string option, string t
}
else
{
PRINT("writeFile error: " << strerror(errno))
PRINT(threadId, "writeFile error: " << strerror(errno))
}
return false;
}
@@ -800,20 +339,16 @@ bool doesFileExist(string filePath)
return stat(filePath.c_str(), &buffer) == 0;
}
// Create a directory in the case that it isn't already existing.
void createDirectory(string path)
{
mkdir(path.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
}
// Delete a directory even if it's not empty.
void deleteDirectory(string path)
{
filesystem::remove_all(path);
}
// Get date in `%d-%m-%Y %H-%M-%S.%MS` format.
// Return for instance `22-02-2023 00-43-24.602`.
string getDate()
{
auto t = time(nullptr);
@@ -827,7 +362,11 @@ string getDate()
return toString.str();
}
// Return the lines of the file at the given `filePath` as a vector.
set<string> setFromVector(vector<string> vec)
{
return set(vec.begin(), vec.end());
}
vector<string> getFileContent(string filePath)
{
vector<string> lines;
@@ -838,15 +377,10 @@ vector<string> getFileContent(string filePath)
return lines;
}
// Execute and return the result of a given request to a YouTube API.
json getJson(unsigned short threadId, string url, bool usingYoutubeDataApiv3, string channelId, getJsonBehavior behavior)
json getJson(unsigned short threadId, string url, string directoryPath, getJsonBehavior behavior)
{
// If using the official YouTube operational API instance's no-key service, we don't need to provide any YouTube Data API v3 key.
string finalUrl = usingYoutubeDataApiv3 ?
(USE_YT_LEMNOSLIFE_COM_NO_KEY_SERVICE ?
"https://yt.lemnoslife.com/noKey/" + url :
"https://www.googleapis.com/youtube/v3/" + url + "&key=" + currentYouTubeDataAPIv3Key) :
YOUTUBE_OPERATIONAL_API_INSTANCE_URL + "/" + url,
string finalUrl = USE_YT_LEMNOSLIFE_COM_NO_KEY_SERVICE ? "https://yt.lemnoslife.com/noKey/" + url :
"https://www.googleapis.com/youtube/v3/" + url + "&key=" + apiKey,
content = getHttps(finalUrl);
json data;
try
@@ -855,46 +389,35 @@ json getJson(unsigned short threadId, string url, bool usingYoutubeDataApiv3, st
}
catch (json::parse_error& ex)
{
// From experience this sometimes happens due to an empty `content`, but retrying just after solves the problem.
PRINT("Parse error for " << finalUrl << ", as got: " << content << " ! Retrying...")
return getJson(threadId, url, usingYoutubeDataApiv3, channelId);
PRINT(threadId, "Parse error for " << finalUrl << ", as got: " << content << " !")
exit(1);
}
if(data.contains("error"))
{
// The YouTube operational API shouldn't be returning any error; if it does, we stop the execution to investigate the problem.
if(!usingYoutubeDataApiv3)
{
EXIT_WITH_ERROR("Found error in JSON retrieved from YouTube operational API at URL: " << finalUrl << " for content: " << content << " !")
}
string reason = data["error"]["errors"][0]["reason"];
// Contrary to the YouTube operational API no-key service, we don't rotate keys in `YOUTUBE_DATA_API_V3_KEYS_FILE_PATH`, as we keep them in memory here, but we do rotate them in memory.
// Contrary to the YouTube operational API no-key service, we don't rotate keys in `KEYS_FILE_PATH`, as we keep them in memory here.
if(reason == "quotaExceeded")
{
quotaMutex.lock();
// Move the current exhausted YouTube Data API v3 key from the first slot to the last one.
youtubeDataApiV3keys.erase(youtubeDataApiV3keys.begin());
youtubeDataApiV3keys.push_back(currentYouTubeDataAPIv3Key);
PRINT("No more quota on " << currentYouTubeDataAPIv3Key << " switching to " << youtubeDataApiV3keys[0] << ".")
currentYouTubeDataAPIv3Key = youtubeDataApiV3keys[0];
keys.erase(keys.begin());
keys.push_back(apiKey);
PRINT(threadId, "No more quota on " << apiKey << " switching to " << keys[0] << ".")
apiKey = keys[0];
quotaMutex.unlock();
// We proceed again with the request so as not to return a temporary error due to our key management.
return getJson(threadId, url, true, channelId);
return getJson(threadId, url, directoryPath);
}
// Errors from YouTube Data API v3 are normal in some cases, for instance when we request something that doesn't exist, such as the comments of a channel that doesn't have any; but we have to make the request to know that it doesn't have any, which is why we proceed this way.
PRINT("Found error in JSON at URL: " << finalUrl << " for content: " << content << " !")
PRINT(threadId, "Found error in JSON at URL: " << finalUrl << " for content: " << content << " !")
if(reason != "commentsDisabled" || behavior == retryOnCommentsDisabled)
{
return reason == "playlistNotFound" && behavior == returnErrorIfPlaylistNotFound ? data : getJson(threadId, url, true, channelId);
return reason == "playlistNotFound" && behavior == returnErrorIfPlaylistNotFound ? data : getJson(threadId, url, directoryPath);
}
}
// Write the request URL and the retrieved content to logs.
ostringstream toString;
toString << CHANNELS_DIRECTORY << channelId << "/" << YOUTUBE_APIS_REQUESTS_DIRECTORY;
writeFile(threadId, toString.str() + "urls.txt", "a", url + " " + (usingYoutubeDataApiv3 ? "true" : "false") + "\n");
toString << requestsCountThreads[threadId]++ << ".json";
writeFile(threadId, toString.str(), "w", content);
toString << CHANNELS_DIRECTORY << directoryPath << "/" << requestsPerChannel << ".json";
requestsPerChannel++;
writeFile(threadId, toString.str(), "w", url + "\n" + content);
return data;
}
@@ -910,7 +433,6 @@ void print(ostringstream* toPrint)
}
// Is this function really multi-threading friendly? If not, we could consider executing `curl` using the command line.
// Retrieves content from a URL. Note that this function verifies the validity of the certificate in the case of HTTPS.
string getHttps(string url)
{
CURL* curl = curl_easy_init();
@@ -925,26 +447,8 @@ string getHttps(string url)
return got;
}
// Auxiliary function required by `getHttps` function.
size_t writeCallback(void* contents, size_t size, size_t nmemb, void* userp)
{
((string*)userp)->append((char*)contents, size * nmemb);
return size * nmemb;
}
// Source: https://stackoverflow.com/a/3669819
string escapeShellArgument(string shellArgument)
{
return "'" + replaceAll(shellArgument, "'", "'\\''") + "'";
}
string replaceAll(string str, const string& from, const string& to)
{
size_t start_pos = 0;
while((start_pos = str.find(from, start_pos)) != string::npos)
{
str.replace(start_pos, from.length(), to);
start_pos += to.length(); // Handles case where 'to' is a substring of 'from'
}
return str;
}

scripts/removeChannelsBeingTreated.py

@@ -14,7 +14,6 @@ with open('nohup.out') as f:
#print(line)
threadId = line.split(': ')[1]
channelId = line.split(infix)[1].split(' (')[0]
if threadId.isdigit() and channelId.startswith('UC') and len(channelId) == 24:
threads[threadId] = channelId
for threadId in threads:
channelId = threads[threadId]

scripts/retrieveTop100SubscribersFrance.py

@@ -1,7 +1,4 @@
#!/usr/bin/python3
# We can't proceed automatically by using the `requests` Python module because https://socialblade.com/youtube/top/country/fr/mostsubscribed is protected by CloudFlare.
# Note that `undetected-chromedriver` might be a workaround for this limitation.
with open('mostsubscribed.html') as f:
lines = f.read().splitlines()

website/channels.php

@@ -1,42 +0,0 @@
<?php
if (!function_exists('str_contains')) {
function str_contains($haystack, $needle)
{
return strpos($haystack, $needle) !== false;
}
}
if (!function_exists('str_ends_with')) {
function str_ends_with($haystack, $needle)
{
$length = strlen($needle);
return $length > 0 ? substr($haystack, -$length) === $needle : true;
}
}
function str_replace_first($needle, $replace, $haystack) {
$pos = strpos($haystack, $needle);
if ($pos !== false) {
$haystack = substr_replace($haystack, $replace, $pos, strlen($needle));
}
return $haystack;
}
$uri = $_SERVER['REQUEST_URI'];
$uri = str_replace('/channels/', '', $uri);
$prefix = '/mnt/HDD0/YouTube_captions_search_engine/channels/';
if (str_contains($uri, '/')) {
$uri = str_replace_first('/', '#', $uri);
$uri = $prefix . $uri;
if (str_ends_with($uri, '.json')) {
header('Content-Type: application/json; charset=UTF-8');
}
echo file_get_contents("zip://$uri");
} else {
$uri = $prefix . $uri;
header("Content-Type: application/zip");
echo readfile($uri);
}
?>
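For illustration, combined with the Nginx rewrites shown in the README, this lets an end-user retrieve a single file from within a channel archive; a sketch (the channel id is one from `channels.txt`, the member file name is hypothetical):
```python
import requests

# `channels.php` opens `<channelId>.zip` and serves the requested member file.
url = 'https://crawler.yt.lemnoslife.com/channels/UCAuUUnT6oDeKwE6v1NGQxug.zip/0.json'
print(requests.get(url).json())
```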

website/composer.json

@@ -1,5 +0,0 @@
{
"require": {
"cboden/ratchet": "^0.4.4"
}
}

website/composer.lock (generated)

File diff suppressed because it is too large

website/index.php

@@ -1,105 +0,0 @@
<?php
function echoUrl($url)
{
echo "<a href=\"$url\">$url</a>";
}
?>
See <?php echoUrl('https://gitea.lemnoslife.com/Benjamin_Loison/YouTube_captions_search_engine'); ?> for more information.<br/>
Access raw data with: <?php echoUrl('channels/'); ?>.<br/>
Access found channels with: <?php echoUrl('channels.txt'); ?>.
<form id="form">
<input type="text" autofocus id="search" size="23" placeholder="Your search regex"></input><br/>
<input type="text" autofocus id="path-search" size="23" placeholder="Your path regex (default: *empty*)"></input><br/>
<input type="submit" id="search" value="Search">
<input type="submit" id="search-only-captions" value="Search only captions">
</form>
Progress: <span id="progress"></span> channels
<ul id="channels">
</ul>
<script>
var firstRun = true;
var conn;
// Could parse the DOM instead of using the following variable.
var channels = [];
function createA(text, href) {
var a = document.createElement('a');
var textNode = document.createTextNode(text);
a.appendChild(textNode);
a.href = href;
return a;
}
function treatLine(line) {
console.log(line);
if (line.startsWith('progress:')) {
document.getElementById('progress').innerHTML = line.replace('progress:', '');
} else {
var channelsDom = document.getElementById('channels');
var timestamps = [];
const lineParts = line.split('|');
if (lineParts.length > 1) {
timestamps = lineParts.slice(1).map(linePart => parseInt(linePart));
line = lineParts[0];
}
const channelFileParts = line.split('/');
const channel = channelFileParts[0];
const channelFile = channelFileParts.slice(1).join('/');
const channelHref = `channels/${channel}`;
if (!channels.includes(channel)) {
channels.push(channel);
channelDom = document.createElement('li');
var a = createA(channel, channelHref);
channelDom.appendChild(a);
var channelFilesDom = document.createElement('ul');
channelDom.appendChild(channelFilesDom);
channelsDom.appendChild(channelDom);
}
var channelDom = channelsDom.lastChild;
var channelFilesDom = channelDom.lastChild;
var channelFileDom = document.createElement('li');
var a = createA(channelFile, `${channelHref}/${channelFile}`);
channelFileDom.appendChild(a);
const id = channelFileParts[2];
for(var timestampsIndex = 0; timestampsIndex < timestamps.length; timestampsIndex++) {
const space = document.createTextNode('\u00A0');
channelFileDom.appendChild(space);
const timestamp = timestamps[timestampsIndex];
var a = createA(`${timestamp} s`, `https://www.youtube.com/watch?v=${id}&t=${timestamp}`);
channelFileDom.appendChild(a);
}
channelFilesDom.appendChild(channelFileDom);
}
}
function search(event) {
// We don't want to refresh the webpage, which is the default behavior.
event.preventDefault();
const query = event.submitter.id + ' ' + document.getElementById('path-search').value + ' ' + document.getElementById('search-input').value;
if (firstRun) {
firstRun = false;
conn = new WebSocket('wss://crawler.yt.lemnoslife.com/websocket');
conn.onmessage = function(e) {
e.data.split('\n').forEach(treatLine);
};
// We can't directly proceed with `conn.send`, as the connection may not be established yet.
conn.onopen = function(e) { conn.send(query); };
} else {
// We assume at this point that the connection is established.
channels = [];
document.getElementById('channels').innerHTML = '';
conn.send(query);
}
}
var form = document.getElementById('form');
form.addEventListener('submit', search);
</script>
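The page above speaks a small line-based protocol over the WebSocket: the query is `<search|search-only-captions> <path regex> <search regex>`, and the server streams back either `progress:<done> / <total>` lines or `<channel zip>/<file in zip>` lines optionally suffixed with `|<timestamp>` parts. A hedged sketch of a standalone client (assuming the third-party `websockets` Python package):

```python
# A minimal sketch of a standalone client for the protocol implemented above
# (pip install websockets). The connection stays open as long as the server
# keeps streaming results.
import asyncio
import websockets

async def search(path_regex, message_regex):
    uri = 'wss://crawler.yt.lemnoslife.com/websocket'
    async with websockets.connect(uri) as conn:
        # Same query format as the form above: submitter id, path regex, search regex.
        await conn.send(f'search {path_regex} {message_regex}')
        async for message in conn:
            for line in message.split('\n'):
                if line.startswith('progress:'):
                    print('progress:', line.removeprefix('progress:'))
                else:
                    channel_file, *timestamps = line.split('|')
                    print(channel_file, timestamps)

asyncio.run(search('', 'hello world'))
```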

View File

@ -1,96 +0,0 @@
#!/usr/bin/python3
import sys, time, fcntl, os, zipfile, webvtt, re
from io import StringIO
path = '/mnt/HDD0/YouTube_captions_search_engine/channels/'
clientId = sys.argv[1]
pathSearchMessageParts = sys.argv[2].split(' ')
pathSearch = pathSearchMessageParts[1]
message = ' '.join(pathSearchMessageParts[2:])
pathSearchRegex = re.compile(pathSearch)
messageRegex = re.compile(message)
isPathSearchAChannelId = re.fullmatch(r'[a-zA-Z0-9-_]{24}', pathSearch)
searchOnlyCaptions = pathSearchMessageParts[0] == 'search-only-captions'
clientFilePath = f'users/{clientId}.txt'
def write(s):
with open(clientFilePath, 'r+') as f:
try:
fcntl.flock(f, fcntl.LOCK_EX)
# If the output file is empty, then it means that `websocket.php` read it. In any case we don't wait for it; we just append what we want to output.
read = f.read()
# We are appending content, as we moved the in-file cursor.
if read != '':
f.write('\n')
f.write(s)
f.flush()
fcntl.flock(f, fcntl.LOCK_UN)
except Exception as e:
sys.exit(e)
def cleanCaption(caption):
return caption.replace('\n', ' ')
# As `zipgrep` doesn't support an argument to stop at the first match in each file, we proceed manually to keep a good theoretical complexity.
if isPathSearchAChannelId:
file = pathSearch + '.zip'
if os.path.isfile(path + file):
files = [file]
else:
write('progress:0 / 0')
# Avoid a NameError in the loop below when the channel archive is missing.
files = []
else:
files = [file for file in os.listdir(path) if file.endswith('.zip')]
for fileIndex, file in enumerate(files):
write(f'progress:{fileIndex} / {len(files)}')
zip = zipfile.ZipFile(path + file)
for fileInZip in zip.namelist():
endsWithVtt = fileInZip.endswith('.vtt')
if searchOnlyCaptions and not endsWithVtt:
continue
toWrite = f'{file}/{fileInZip}'
if not bool(pathSearchRegex.search(toWrite)):
continue
with zip.open(fileInZip) as f:
if endsWithVtt:
content = f.read().decode('utf-8')
stringIOf = StringIO(content)
wholeCaption = ' '.join([cleanCaption(caption.text) for caption in webvtt.read_buffer(stringIOf)])
messagePositions = [m.start() for m in messageRegex.finditer(wholeCaption)]
if messagePositions != []:
timestamps = []
for messagePosition in messagePositions:
stringIOf = StringIO(content)
for caption in webvtt.read_buffer(stringIOf):
text = cleanCaption(caption.text)
if messagePosition <= len(text):
timestamp = str(int(caption.start_in_seconds))
timestamps += [timestamp]
break
messagePosition -= len(text) + 1
write(f'{toWrite}|{"|".join(timestamps)}')
else:
for line in f.readlines():
if message in line.decode('utf-8', 'replace'):
write(toWrite)
break
write(f'progress:{fileIndex + 1} / {len(files)}')
with open(clientFilePath) as f:
while True:
try:
fcntl.flock(f, fcntl.LOCK_EX)
# Rewind before re-reading; otherwise the cursor stays at EOF and the file looks empty even when it isn't.
f.seek(0)
if f.read() == '':
os.remove(clientFilePath)
break
else:
fcntl.flock(f, fcntl.LOCK_UN)
time.sleep(1)
except Exception as e:
sys.exit(e)
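The inner loop above converts a match's character offset in the space-joined caption text back to the starting timestamp of the caption containing it, by subtracting each caption's length plus the joining space until the offset falls within a caption. A standalone sketch of that mapping:

```python
# A standalone sketch of the offset-to-timestamp mapping used above. Captions
# are joined with single spaces, so we walk the offset back caption by caption
# until it falls within one of them.
def position_to_start_seconds(captions, position):
    # `captions` is a list of (text, start_in_seconds) pairs.
    for text, start_in_seconds in captions:
        if position <= len(text):
            return start_in_seconds
        position -= len(text) + 1  # + 1 accounts for the joining space
    return None

captions = [('hello world', 0), ('second caption', 5)]
# 'hello world second caption': offset 13 points inside the second caption.
assert position_to_start_seconds(captions, 13) == 5
```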

View File

@ -1,5 +0,0 @@
# Ignore everything in this directory
*
# Except this file
!.gitignore

View File

@ -1,164 +0,0 @@
<?php
use Ratchet\MessageComponentInterface;
use Ratchet\ConnectionInterface;
use React\EventLoop\LoopInterface;
use React\EventLoop\Timer\Timer;
// Make sure composer dependencies have been installed
require __DIR__ . '/vendor/autoload.php';
class Client
{
public $id;
public $timer;
public $pid;
public function __construct($id)
{
$this->id = $id;
}
// `__destruct` can't take arguments.
public function free($loop)
{
$loop->cancelTimer($this->timer);
// Should in theory verify that the pid wasn't re-assigned.
posix_kill($this->pid, SIGTERM);
$clientFilePath = getClientFilePath($this->id);
if (file_exists($clientFilePath)) {
$fp = fopen($clientFilePath, 'r+');
if (flock($fp, LOCK_EX)) { // acquire an exclusive lock; blocks until available
unlink($clientFilePath); // delete file
flock($fp, LOCK_UN); // release the lock
} else {
echo "Couldn't get the lock!";
}
fclose($fp);
}
}
}
// Note that `flock`'s optional third argument is an output flag set when the lock would block, not a "wait" option; a blocking `LOCK_EX` is used directly instead.
define('USERS_FOLDER', 'users/');
// Delete user output files from the previous `websocket.php` execution.
// `scandir` sorts entries alphabetically, so the first three are `.`, `..` and `.gitignore`, which we skip.
foreach (array_slice(scandir(USERS_FOLDER), 3) as $file) {
unlink(USERS_FOLDER . $file);
}
function getClientFilePath($clientId)
{
return USERS_FOLDER . "$clientId.txt";
}
// The current implementation may add latency across users.
class MyProcess implements MessageComponentInterface
{
protected $clients;
private $loop;
private $newClientId;
private $newClientIdSem;
public function __construct(LoopInterface $loop)
{
$this->clients = new \SplObjectStorage();
$this->loop = $loop;
$this->newClientId = 0;
$this->newClientIdSem = sem_get(1, 1);
}
private function newClient()
{
// If `onOpen` and `onMessage` can't be called at the same time, then this semaphore is useless.
if (sem_acquire($this->newClientIdSem)) {
// Note that we don't re-use ids except on `websocket.php` restart, but as the maximal int in PHP is a very large number we are fine for a while (https://www.php.net/manual/en/reserved.constants.php#constant.php-int-max)
$clientId = $this->newClientId++;
sem_release($this->newClientIdSem);
return new Client($clientId);
} else {
exit('`newClient` error');
}
}
public function onOpen(ConnectionInterface $conn)
{
$client = $this->newClient();
$this->clients->attach($conn, $client);
}
public function onMessage(ConnectionInterface $from, $msg)
{
$client = $this->clients->offsetGet($from);
// If a previous request was received, we execute the new one with another client for simplicity; otherwise, with the current file deletion approach, we couldn't tell the worker `search.py` that we don't care about its execution anymore.
if ($client->pid !== null) {
// As `$this->clients->detach` doesn't call `__destruct` for an unknown reason, we manually clean up the previous request.
$client->free($this->loop);
$client = $this->newClient();
}
$clientId = $client->id;
$clientFilePath = getClientFilePath($clientId);
// Create the worker output file; otherwise the worker would believe that we don't need it anymore.
file_put_contents($clientFilePath, '');
// As we are going to use this argument in a shell command, we escape it.
$msg = escapeshellarg($msg);
// Start the independent worker.
// Redirecting `stdout` is mandatory, otherwise `exec` blocks.
$client->pid = exec("./search.py $clientId $msg > /dev/null & echo $!");
// `addTimer` doesn't enable us to call `$from->send` independently multiple times with blocking instructions in between.
$client->timer = $this->loop->addPeriodicTimer(1, function () use ($from, $clientId, $clientFilePath, $client) {
echo "Checking news from $clientId\n";
// If the worker output file doesn't exist anymore, then it means that the worker has finished its work and acknowledged that `websocket.php` completely read its output.
if (file_exists($clientFilePath)) {
// `flock` requires `r`eading permission and we need `w`riting one due to `ftruncate` usage.
$fp = fopen($clientFilePath, 'r+');
$read = null;
if (flock($fp, LOCK_EX)) { // acquire an exclusive lock; blocks until available
// We assume that the temporary output is less than 1 MB long.
$read = fread($fp, 1_000_000);
ftruncate($fp, 0); // truncate file
fflush($fp); // flush output before releasing the lock
flock($fp, LOCK_UN); // release the lock
} else {
// We `die` instead of `echo`ing to force the developer to investigate the reason.
die("Couldn't get the lock!");
}
fclose($fp);
// Assume that empty output doesn't need to be forwarded to the end-user.
if ($read !== null && $read !== '') {
$from->send($read);
}
} else {
// We don't need the periodic timer anymore, as the worker has finished its work and acknowledged that `websocket.php` completely read its output.
$this->loop->cancelTimer($client->timer);
}
});
}
public function onClose(ConnectionInterface $conn)
{
$client = $this->clients->offsetGet($conn);
$clientId = $client->id;
$client->free($this->loop);
echo "$clientId disconnected\n";
$this->clients->detach($conn);
}
public function onError(ConnectionInterface $conn, \Exception $e)
{
$conn->close();
die('`onError`');
}
}
$loop = \React\EventLoop\Factory::create();
// Run the server application through the WebSocket protocol on port 4430.
// Note that named arguments arrived with PHP 8, which isn't the current Debian version.
$app = new Ratchet\App('crawler.yt.lemnoslife.com', 4430, '127.0.0.1', $loop);
$app->route('/websocket', new MyProcess($loop), array('*'));
$app->run();
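To summarize the handshake implemented above: `search.py` appends its results to `users/<clientId>.txt` under an exclusive lock, `websocket.php` polls every second, reads and truncates the file under the same lock, and the worker deletes the file once its output has been fully consumed, which tells the server to cancel its timer. A condensed sketch of both sides (the server side is a hypothetical Python translation of the PHP timer above, for illustration only):

```python
# A condensed sketch of the file-based handshake above. The worker appends
# under an exclusive lock; the reader (websocket.php in the real code,
# translated to Python here for illustration) reads and truncates under the
# same lock; deletion of the file signals the end of the exchange.
import fcntl, os

def worker_write(path, s):
    with open(path, 'r+') as f:
        fcntl.flock(f, fcntl.LOCK_EX)
        if f.read() != '':      # the reader hasn't emptied the file yet,
            f.write('\n')       # so separate from the pending content
        f.write(s)
        f.flush()
        fcntl.flock(f, fcntl.LOCK_UN)

def reader_poll(path):
    if not os.path.exists(path):
        return None             # the worker has finished
    with open(path, 'r+') as f:
        fcntl.flock(f, fcntl.LOCK_EX)
        read = f.read()
        f.truncate(0)           # an empty file tells the worker we read it
        fcntl.flock(f, fcntl.LOCK_UN)
    return read
```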