diff --git a/main.cpp b/main.cpp index bfeef7e..83e6721 100644 --- a/main.cpp +++ b/main.cpp @@ -2,30 +2,36 @@ #include #include #include +#include +#include #include +#include #include #include using namespace std; using json = nlohmann::json; vector getFileContent(string filePath); -json getJson(string url, string directoryPath); +json getJson(unsigned short threadId, string url, string directoryPath); void createDirectory(string path), print(ostringstream* toPrint), - treatComment(json comment, string channelId), - treatChannelOrVideo(bool isChannel, string id, string channelToTreat); + treatComment(unsigned short threadId, json comment, string channelId), + treatChannelOrVideo(unsigned short threadId, bool isChannel, string id, string channelToTreat), + treatChannels(unsigned short threadId); string getHttps(string url); size_t writeCallback(void* contents, size_t size, size_t nmemb, void* userp); bool doesFileExist(string filePath), - writeFile(string filePath, string option, string toWrite); + writeFile(unsigned short threadId, string filePath, string option, string toWrite); #define USE_YT_LEMNOSLIFE_COM_NO_KEY_SERVICE #define API_KEY "AIzaSy..." +#define THREADS_NUMBER 10 -// Note that this printing approach is only safe in a mono-thread context. -#define PRINT(x) toPrint << x; print(&toPrint); -ostringstream toPrint; +#define PRINT(threadId, x) { ostringstream toPrint; toPrint << threadId << ": " << x; print(&toPrint); } +#define DEFAULT_THREAD_ID 0 +mutex printMutex, + allocateChannelMutex; set channelsAlreadyTreated, channelsToTreat; unsigned int commentsCount = 0, @@ -36,7 +42,7 @@ string CHANNELS_DIRECTORY = "channels/", int main() { // The starting set should be written to `CHANNELS_FILE_PATH`. - // To resume this algorithm after a shutdown, just restart it after having deleted the last channel folder in `CHANNELS_DIRECTORY` being treated. + // To resume this algorithm after a shutdown, just restart it after having deleted the last channel folders in `CHANNELS_DIRECTORY` being treated. // On a restart, `CHANNELS_FILE_PATH` is read and every channel not found in `CHANNELS_DIRECTORY` is added to `channelsToTreat` or `channelsToTreat` otherwise before continuing, as if `CHANNELS_FILE_PATH` was containing a **treated** starting set. vector channelsVec = getFileContent(CHANNELS_FILE_PATH); channelsToTreat = set(channelsVec.begin(), channelsVec.end()); @@ -50,33 +56,59 @@ int main() channelsAlreadyTreated.insert(channelId); } - PRINT(channelsToTreat.size() << " channel(s) to treat") - PRINT(channelsAlreadyTreated.size() << " channel(s) already treated") + PRINT(DEFAULT_THREAD_ID, channelsToTreat.size() << " channel(s) to treat") + PRINT(DEFAULT_THREAD_ID, channelsAlreadyTreated.size() << " channel(s) already treated") - while(!channelsToTreat.empty()) + thread threads[THREADS_NUMBER]; + for(unsigned short threadsIndex = 0; threadsIndex < THREADS_NUMBER; threadsIndex++) { - string channelToTreat = *channelsToTreat.begin(); + threads[threadsIndex] = thread(treatChannels, threadsIndex + 1); + } - PRINT("Treating channel " << channelToTreat << " (treated: " << channelsAlreadyTreated.size() << ", to treat: " << channelsToTreat.size() << ")") - - channelsAlreadyTreated.insert(channelToTreat); - - string channelToTreatDirectory = CHANNELS_DIRECTORY + channelToTreat + "/"; - createDirectory(channelToTreatDirectory); - - treatChannelOrVideo(true, channelToTreat, channelToTreat); - - PRINT(commentsCount) - commentsCount = 0; - requestsPerChannel = 0; - - channelsToTreat.erase(channelToTreat); + for(unsigned short threadsIndex = 0; threadsIndex < THREADS_NUMBER; threadsIndex++) + { + threads[threadsIndex].join(); } return 0; } -void treatChannelOrVideo(bool isChannel, string id, string channelToTreat) +void treatChannels(unsigned short threadId) +{ + // For the moment we assume that we never have treated completely YouTube, otherwise we have to pay attention how to proceed if the starting set involves startvation for some threads. + while(true) + { + allocateChannelMutex.lock(); + if(channelsToTreat.empty()) + { + allocateChannelMutex.unlock(); + sleep(1); + continue; + } + + string channelToTreat = *channelsToTreat.begin(); + + PRINT(threadId, "Treating channel " << channelToTreat << " (treated: " << channelsAlreadyTreated.size() << ", to treat: " << channelsToTreat.size() << ")") + + channelsToTreat.erase(channelToTreat); + channelsAlreadyTreated.insert(channelToTreat); + + allocateChannelMutex.unlock(); + + string channelToTreatDirectory = CHANNELS_DIRECTORY + channelToTreat + "/"; + createDirectory(channelToTreatDirectory); + + treatChannelOrVideo(threadId, true, channelToTreat, channelToTreat); + + PRINT(threadId, commentsCount << " comments were found for this channel.") + commentsCount = 0; + requestsPerChannel = 0; + } + + allocateChannelMutex.unlock(); +} + +void treatChannelOrVideo(unsigned short threadId, bool isChannel, string id, string channelToTreat) { string pageToken = ""; while(true) @@ -84,7 +116,7 @@ void treatChannelOrVideo(bool isChannel, string id, string channelToTreat) ostringstream toString; toString << "commentThreads?part=snippet,replies&" << (isChannel ? "allThreadsRelatedToChannelId" : "videoId") << "=" << id << "&maxResults=100&pageToken=" << pageToken; string url = toString.str(); - json data = getJson(url, channelToTreat); + json data = getJson(threadId, url, channelToTreat); bool doesRelyingOnCommentThreadsIsEnough = (!isChannel) || data["error"]["errors"][0]["reason"] != "commentsDisabled"; if(doesRelyingOnCommentThreadsIsEnough) { @@ -93,7 +125,7 @@ void treatChannelOrVideo(bool isChannel, string id, string channelToTreat) { json comment = item["snippet"]["topLevelComment"]; string commentId = comment["id"]; - treatComment(comment, channelToTreat); + treatComment(threadId, comment, channelToTreat); if(item.contains("replies")) { json replies = item["replies"]["comments"]; @@ -102,11 +134,11 @@ void treatChannelOrVideo(bool isChannel, string id, string channelToTreat) string pageToken = ""; while(true) { - json data = getJson("comments?part=snippet&parentId=" + commentId + "&maxResults=100&pageToken=" + pageToken, channelToTreat), + json data = getJson(threadId, "comments?part=snippet&parentId=" + commentId + "&maxResults=100&pageToken=" + pageToken, channelToTreat), items = data["items"]; for(const auto& item : items) { - treatComment(item, channelToTreat); + treatComment(threadId, item, channelToTreat); } if(data.contains("nextPageToken")) { @@ -122,7 +154,7 @@ void treatChannelOrVideo(bool isChannel, string id, string channelToTreat) { for(const auto& reply : replies) { - treatComment(reply, channelToTreat); + treatComment(threadId, reply, channelToTreat); } } } @@ -138,11 +170,11 @@ void treatChannelOrVideo(bool isChannel, string id, string channelToTreat) } else { - PRINT("Comments disabled channel, treating differently...") - json data = getJson("channels?part=statistics&id=" + channelToTreat, channelToTreat); + PRINT(threadId, "Comments disabled channel, treating differently...") + json data = getJson(threadId, "channels?part=statistics&id=" + channelToTreat, channelToTreat); // YouTube Data API v3 Videos: list endpoint returns `videoCount` as a string and not an integer... unsigned int videoCount = atoi(string(data["items"][0]["statistics"]["videoCount"]).c_str()); - PRINT("The channel has about " << videoCount << " videos.") + PRINT(threadId, "The channel has about " << videoCount << " videos.") // `UC-3A9g4U1PpLaeAuD4jSP_w` has a `videoCount` of 2, while its `uploads` playlist contains 3 videos. So we use a strict inequality here. if(videoCount < 20000) { @@ -151,14 +183,14 @@ void treatChannelOrVideo(bool isChannel, string id, string channelToTreat) while(true) { // `snippet` and `status` are unneeded `part`s here but may be interesting later, as we log them. - json data = getJson("playlistItems?part=snippet,contentDetails,status&playlistId=" + playlistToTreat + "&maxResults=50&pageToken=" + pageToken, channelToTreat), + json data = getJson(threadId, "playlistItems?part=snippet,contentDetails,status&playlistId=" + playlistToTreat + "&maxResults=50&pageToken=" + pageToken, channelToTreat), items = data["items"]; for(const auto& item : items) { string videoId = item["contentDetails"]["videoId"]; // To keep the same amount of logs for each channel, I comment the following `PRINT`. //PRINT("Treating video " << videoId) - treatChannelOrVideo(false, videoId, channelToTreat); + treatChannelOrVideo(threadId, false, videoId, channelToTreat); } if(data.contains("nextPageToken")) { @@ -173,14 +205,14 @@ void treatChannelOrVideo(bool isChannel, string id, string channelToTreat) } else { - PRINT("The videos count of the channel exceeds the supported 20,000 limit!") + PRINT(threadId, "The videos count of the channel exceeds the supported 20,000 limit!") exit(1); } } } } -void treatComment(json comment, string channelId) +void treatComment(unsigned short threadId, json comment, string channelId) { json snippet = comment["snippet"]; // The `else` case can happen (cf `95a9421ad0469a09335afeddb2983e31dc00bc36`). @@ -191,13 +223,13 @@ void treatComment(json comment, string channelId) { channelsToTreat.insert(channelId); - writeFile(CHANNELS_FILE_PATH, "a", "\n" + channelId); + writeFile(threadId, CHANNELS_FILE_PATH, "a", "\n" + channelId); } } commentsCount++; } -bool writeFile(string filePath, string option, string toWrite) +bool writeFile(unsigned short threadId, string filePath, string option, string toWrite) { FILE* file = fopen(filePath.c_str(), option.c_str()); if(file != NULL) @@ -208,7 +240,7 @@ bool writeFile(string filePath, string option, string toWrite) } else { - PRINT("writeFile error: " << strerror(errno)) + PRINT(threadId, "writeFile error: " << strerror(errno)) } return false; } @@ -243,7 +275,7 @@ vector getFileContent(string filePath) return lines; } -json getJson(string url, string directoryPath) +json getJson(unsigned short threadId, string url, string directoryPath) { #ifdef USE_YT_LEMNOSLIFE_COM_NO_KEY_SERVICE string finalUrl = "https://yt.lemnoslife.com/noKey/" + url; @@ -256,17 +288,22 @@ json getJson(string url, string directoryPath) ostringstream toString; toString << CHANNELS_DIRECTORY << directoryPath << "/" << requestsPerChannel << ".json"; requestsPerChannel++; - writeFile(toString.str(), "w", url + "\n" + content); + writeFile(threadId, toString.str(), "w", url + "\n" + content); return data; } void print(ostringstream* toPrint) { + printMutex.lock(); + cout << getDate() << ": " << toPrint->str() << endl; toPrint->str(""); + + printMutex.unlock(); } +// Is this function really multi-threading friendly? If not, could consider executing `curl` using the command line. string getHttps(string url) { CURL* curl = curl_easy_init();