Fix #7: Add multi-threading

This commit is contained in:
Benjamin Loison 2023-01-03 04:56:19 +01:00
parent ad3e90fe92
commit 2a33be9272

125
main.cpp
View File

@ -2,30 +2,36 @@
#include <fstream> #include <fstream>
#include <sstream> #include <sstream>
#include <set> #include <set>
#include <mutex>
#include <thread>
#include <sys/stat.h> #include <sys/stat.h>
#include <unistd.h>
#include <curl/curl.h> #include <curl/curl.h>
#include <nlohmann/json.hpp> #include <nlohmann/json.hpp>
using namespace std; using namespace std;
using json = nlohmann::json; using json = nlohmann::json;
vector<string> getFileContent(string filePath); vector<string> getFileContent(string filePath);
json getJson(string url, string directoryPath); json getJson(unsigned short threadId, string url, string directoryPath);
void createDirectory(string path), void createDirectory(string path),
print(ostringstream* toPrint), print(ostringstream* toPrint),
treatComment(json comment, string channelId), treatComment(unsigned short threadId, json comment, string channelId),
treatChannelOrVideo(bool isChannel, string id, string channelToTreat); treatChannelOrVideo(unsigned short threadId, bool isChannel, string id, string channelToTreat),
treatChannels(unsigned short threadId);
string getHttps(string url); string getHttps(string url);
size_t writeCallback(void* contents, size_t size, size_t nmemb, void* userp); size_t writeCallback(void* contents, size_t size, size_t nmemb, void* userp);
bool doesFileExist(string filePath), bool doesFileExist(string filePath),
writeFile(string filePath, string option, string toWrite); writeFile(unsigned short threadId, string filePath, string option, string toWrite);
#define USE_YT_LEMNOSLIFE_COM_NO_KEY_SERVICE #define USE_YT_LEMNOSLIFE_COM_NO_KEY_SERVICE
#define API_KEY "AIzaSy..." #define API_KEY "AIzaSy..."
#define THREADS_NUMBER 10
// Note that this printing approach is only safe in a mono-thread context. #define PRINT(threadId, x) { ostringstream toPrint; toPrint << threadId << ": " << x; print(&toPrint); }
#define PRINT(x) toPrint << x; print(&toPrint); #define DEFAULT_THREAD_ID 0
ostringstream toPrint;
mutex printMutex,
allocateChannelMutex;
set<string> channelsAlreadyTreated, set<string> channelsAlreadyTreated,
channelsToTreat; channelsToTreat;
unsigned int commentsCount = 0, unsigned int commentsCount = 0,
@ -36,7 +42,7 @@ string CHANNELS_DIRECTORY = "channels/",
int main() int main()
{ {
// The starting set should be written to `CHANNELS_FILE_PATH`. // The starting set should be written to `CHANNELS_FILE_PATH`.
// To resume this algorithm after a shutdown, just restart it after having deleted the last channel folder in `CHANNELS_DIRECTORY` being treated. // To resume this algorithm after a shutdown, just restart it after having deleted the last channel folders in `CHANNELS_DIRECTORY` being treated.
// On a restart, `CHANNELS_FILE_PATH` is read and every channel not found in `CHANNELS_DIRECTORY` is added to `channelsToTreat`, or to `channelsAlreadyTreated` otherwise, before continuing, as if `CHANNELS_FILE_PATH` was containing a **treated** starting set. // On a restart, `CHANNELS_FILE_PATH` is read and every channel not found in `CHANNELS_DIRECTORY` is added to `channelsToTreat`, or to `channelsAlreadyTreated` otherwise, before continuing, as if `CHANNELS_FILE_PATH` was containing a **treated** starting set.
vector<string> channelsVec = getFileContent(CHANNELS_FILE_PATH); vector<string> channelsVec = getFileContent(CHANNELS_FILE_PATH);
channelsToTreat = set(channelsVec.begin(), channelsVec.end()); channelsToTreat = set(channelsVec.begin(), channelsVec.end());
@ -50,33 +56,59 @@ int main()
channelsAlreadyTreated.insert(channelId); channelsAlreadyTreated.insert(channelId);
} }
PRINT(channelsToTreat.size() << " channel(s) to treat") PRINT(DEFAULT_THREAD_ID, channelsToTreat.size() << " channel(s) to treat")
PRINT(channelsAlreadyTreated.size() << " channel(s) already treated") PRINT(DEFAULT_THREAD_ID, channelsAlreadyTreated.size() << " channel(s) already treated")
while(!channelsToTreat.empty()) thread threads[THREADS_NUMBER];
for(unsigned short threadsIndex = 0; threadsIndex < THREADS_NUMBER; threadsIndex++)
{ {
string channelToTreat = *channelsToTreat.begin(); threads[threadsIndex] = thread(treatChannels, threadsIndex + 1);
}
PRINT("Treating channel " << channelToTreat << " (treated: " << channelsAlreadyTreated.size() << ", to treat: " << channelsToTreat.size() << ")") for(unsigned short threadsIndex = 0; threadsIndex < THREADS_NUMBER; threadsIndex++)
{
channelsAlreadyTreated.insert(channelToTreat); threads[threadsIndex].join();
string channelToTreatDirectory = CHANNELS_DIRECTORY + channelToTreat + "/";
createDirectory(channelToTreatDirectory);
treatChannelOrVideo(true, channelToTreat, channelToTreat);
PRINT(commentsCount)
commentsCount = 0;
requestsPerChannel = 0;
channelsToTreat.erase(channelToTreat);
} }
return 0; return 0;
} }
void treatChannelOrVideo(bool isChannel, string id, string channelToTreat) void treatChannels(unsigned short threadId)
{
// For the moment we assume that we never have treated completely YouTube, otherwise we have to pay attention how to proceed if the starting set involves startvation for some threads.
while(true)
{
allocateChannelMutex.lock();
if(channelsToTreat.empty())
{
allocateChannelMutex.unlock();
sleep(1);
continue;
}
string channelToTreat = *channelsToTreat.begin();
PRINT(threadId, "Treating channel " << channelToTreat << " (treated: " << channelsAlreadyTreated.size() << ", to treat: " << channelsToTreat.size() << ")")
channelsToTreat.erase(channelToTreat);
channelsAlreadyTreated.insert(channelToTreat);
allocateChannelMutex.unlock();
string channelToTreatDirectory = CHANNELS_DIRECTORY + channelToTreat + "/";
createDirectory(channelToTreatDirectory);
treatChannelOrVideo(threadId, true, channelToTreat, channelToTreat);
PRINT(threadId, commentsCount << " comments were found for this channel.")
commentsCount = 0;
requestsPerChannel = 0;
}
allocateChannelMutex.unlock();
}
void treatChannelOrVideo(unsigned short threadId, bool isChannel, string id, string channelToTreat)
{ {
string pageToken = ""; string pageToken = "";
while(true) while(true)
@ -84,7 +116,7 @@ void treatChannelOrVideo(bool isChannel, string id, string channelToTreat)
ostringstream toString; ostringstream toString;
toString << "commentThreads?part=snippet,replies&" << (isChannel ? "allThreadsRelatedToChannelId" : "videoId") << "=" << id << "&maxResults=100&pageToken=" << pageToken; toString << "commentThreads?part=snippet,replies&" << (isChannel ? "allThreadsRelatedToChannelId" : "videoId") << "=" << id << "&maxResults=100&pageToken=" << pageToken;
string url = toString.str(); string url = toString.str();
json data = getJson(url, channelToTreat); json data = getJson(threadId, url, channelToTreat);
bool doesRelyingOnCommentThreadsIsEnough = (!isChannel) || data["error"]["errors"][0]["reason"] != "commentsDisabled"; bool doesRelyingOnCommentThreadsIsEnough = (!isChannel) || data["error"]["errors"][0]["reason"] != "commentsDisabled";
if(doesRelyingOnCommentThreadsIsEnough) if(doesRelyingOnCommentThreadsIsEnough)
{ {
@ -93,7 +125,7 @@ void treatChannelOrVideo(bool isChannel, string id, string channelToTreat)
{ {
json comment = item["snippet"]["topLevelComment"]; json comment = item["snippet"]["topLevelComment"];
string commentId = comment["id"]; string commentId = comment["id"];
treatComment(comment, channelToTreat); treatComment(threadId, comment, channelToTreat);
if(item.contains("replies")) if(item.contains("replies"))
{ {
json replies = item["replies"]["comments"]; json replies = item["replies"]["comments"];
@ -102,11 +134,11 @@ void treatChannelOrVideo(bool isChannel, string id, string channelToTreat)
string pageToken = ""; string pageToken = "";
while(true) while(true)
{ {
json data = getJson("comments?part=snippet&parentId=" + commentId + "&maxResults=100&pageToken=" + pageToken, channelToTreat), json data = getJson(threadId, "comments?part=snippet&parentId=" + commentId + "&maxResults=100&pageToken=" + pageToken, channelToTreat),
items = data["items"]; items = data["items"];
for(const auto& item : items) for(const auto& item : items)
{ {
treatComment(item, channelToTreat); treatComment(threadId, item, channelToTreat);
} }
if(data.contains("nextPageToken")) if(data.contains("nextPageToken"))
{ {
@ -122,7 +154,7 @@ void treatChannelOrVideo(bool isChannel, string id, string channelToTreat)
{ {
for(const auto& reply : replies) for(const auto& reply : replies)
{ {
treatComment(reply, channelToTreat); treatComment(threadId, reply, channelToTreat);
} }
} }
} }
@ -138,11 +170,11 @@ void treatChannelOrVideo(bool isChannel, string id, string channelToTreat)
} }
else else
{ {
PRINT("Comments disabled channel, treating differently...") PRINT(threadId, "Comments disabled channel, treating differently...")
json data = getJson("channels?part=statistics&id=" + channelToTreat, channelToTreat); json data = getJson(threadId, "channels?part=statistics&id=" + channelToTreat, channelToTreat);
// YouTube Data API v3 Channels: list endpoint returns `videoCount` as a string and not an integer... // YouTube Data API v3 Channels: list endpoint returns `videoCount` as a string and not an integer...
unsigned int videoCount = atoi(string(data["items"][0]["statistics"]["videoCount"]).c_str()); unsigned int videoCount = atoi(string(data["items"][0]["statistics"]["videoCount"]).c_str());
PRINT("The channel has about " << videoCount << " videos.") PRINT(threadId, "The channel has about " << videoCount << " videos.")
// `UC-3A9g4U1PpLaeAuD4jSP_w` has a `videoCount` of 2, while its `uploads` playlist contains 3 videos. So we use a strict inequality here. // `UC-3A9g4U1PpLaeAuD4jSP_w` has a `videoCount` of 2, while its `uploads` playlist contains 3 videos. So we use a strict inequality here.
if(videoCount < 20000) if(videoCount < 20000)
{ {
@ -151,14 +183,14 @@ void treatChannelOrVideo(bool isChannel, string id, string channelToTreat)
while(true) while(true)
{ {
// `snippet` and `status` are unneeded `part`s here but may be interesting later, as we log them. // `snippet` and `status` are unneeded `part`s here but may be interesting later, as we log them.
json data = getJson("playlistItems?part=snippet,contentDetails,status&playlistId=" + playlistToTreat + "&maxResults=50&pageToken=" + pageToken, channelToTreat), json data = getJson(threadId, "playlistItems?part=snippet,contentDetails,status&playlistId=" + playlistToTreat + "&maxResults=50&pageToken=" + pageToken, channelToTreat),
items = data["items"]; items = data["items"];
for(const auto& item : items) for(const auto& item : items)
{ {
string videoId = item["contentDetails"]["videoId"]; string videoId = item["contentDetails"]["videoId"];
// To keep the same amount of logs for each channel, I comment the following `PRINT`. // To keep the same amount of logs for each channel, I comment the following `PRINT`.
//PRINT("Treating video " << videoId) //PRINT("Treating video " << videoId)
treatChannelOrVideo(false, videoId, channelToTreat); treatChannelOrVideo(threadId, false, videoId, channelToTreat);
} }
if(data.contains("nextPageToken")) if(data.contains("nextPageToken"))
{ {
@ -173,14 +205,14 @@ void treatChannelOrVideo(bool isChannel, string id, string channelToTreat)
} }
else else
{ {
PRINT("The videos count of the channel exceeds the supported 20,000 limit!") PRINT(threadId, "The videos count of the channel exceeds the supported 20,000 limit!")
exit(1); exit(1);
} }
} }
} }
} }
void treatComment(json comment, string channelId) void treatComment(unsigned short threadId, json comment, string channelId)
{ {
json snippet = comment["snippet"]; json snippet = comment["snippet"];
// The `else` case can happen (cf `95a9421ad0469a09335afeddb2983e31dc00bc36`). // The `else` case can happen (cf `95a9421ad0469a09335afeddb2983e31dc00bc36`).
@ -191,13 +223,13 @@ void treatComment(json comment, string channelId)
{ {
channelsToTreat.insert(channelId); channelsToTreat.insert(channelId);
writeFile(CHANNELS_FILE_PATH, "a", "\n" + channelId); writeFile(threadId, CHANNELS_FILE_PATH, "a", "\n" + channelId);
} }
} }
commentsCount++; commentsCount++;
} }
bool writeFile(string filePath, string option, string toWrite) bool writeFile(unsigned short threadId, string filePath, string option, string toWrite)
{ {
FILE* file = fopen(filePath.c_str(), option.c_str()); FILE* file = fopen(filePath.c_str(), option.c_str());
if(file != NULL) if(file != NULL)
@ -208,7 +240,7 @@ bool writeFile(string filePath, string option, string toWrite)
} }
else else
{ {
PRINT("writeFile error: " << strerror(errno)) PRINT(threadId, "writeFile error: " << strerror(errno))
} }
return false; return false;
} }
@ -243,7 +275,7 @@ vector<string> getFileContent(string filePath)
return lines; return lines;
} }
json getJson(string url, string directoryPath) json getJson(unsigned short threadId, string url, string directoryPath)
{ {
#ifdef USE_YT_LEMNOSLIFE_COM_NO_KEY_SERVICE #ifdef USE_YT_LEMNOSLIFE_COM_NO_KEY_SERVICE
string finalUrl = "https://yt.lemnoslife.com/noKey/" + url; string finalUrl = "https://yt.lemnoslife.com/noKey/" + url;
@ -256,17 +288,22 @@ json getJson(string url, string directoryPath)
ostringstream toString; ostringstream toString;
toString << CHANNELS_DIRECTORY << directoryPath << "/" << requestsPerChannel << ".json"; toString << CHANNELS_DIRECTORY << directoryPath << "/" << requestsPerChannel << ".json";
requestsPerChannel++; requestsPerChannel++;
writeFile(toString.str(), "w", url + "\n" + content); writeFile(threadId, toString.str(), "w", url + "\n" + content);
return data; return data;
} }
void print(ostringstream* toPrint) void print(ostringstream* toPrint)
{ {
printMutex.lock();
cout << getDate() << ": " << toPrint->str() << endl; cout << getDate() << ": " << toPrint->str() << endl;
toPrint->str(""); toPrint->str("");
printMutex.unlock();
} }
// Is this function really multi-threading friendly? If not, could consider executing `curl` using the command line.
string getHttps(string url) string getHttps(string url)
{ {
CURL* curl = curl_easy_init(); CURL* curl = curl_easy_init();