YouTube_captions_search_engine/main.cpp
Benjamin Loison 71e4bd95a9
Fix #9: Make sure that in case of error returned by the YouTube Data API v3 the algorithm treats it correctly
Note that in case of error the algorithm used to skip the received content, as if just no `items` were in it.
2023-01-06 20:55:32 +01:00

389 lines
14 KiB
C++

#include <sys/stat.h>
#include <unistd.h>
#include <array>
#include <atomic>
#include <cctype>
#include <cerrno>
#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <filesystem>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <memory>
#include <mutex>
#include <set>
#include <sstream>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>
#include <curl/curl.h>
#include <nlohmann/json.hpp>
using namespace std;
using namespace chrono;
using json = nlohmann::json;
// Forward declarations of the functions defined further down in this file.

// Crawling logic.
void treatChannels(unsigned short threadId);
void treatChannelOrVideo(unsigned short threadId, bool isChannel, string id, string channelToTreat);
void treatComment(unsigned short threadId, json comment, string channelId);

// Network access.
json getJson(unsigned short threadId, string url, string directoryPath);
string getHttps(string url);
size_t writeCallback(void* contents, size_t size, size_t nmemb, void* userp);

// File system and process helpers.
vector<string> getFileContent(string filePath);
bool doesFileExist(string filePath);
bool writeFile(unsigned short threadId, string filePath, string option, string toWrite);
void createDirectory(string path);
void deleteDirectory(string path);
string exec(string cmd);

// Logging.
void print(ostringstream* toPrint);
// When defined, `getJson` sends its requests to `https://yt.lemnoslife.com/noKey/` instead of `https://www.googleapis.com/youtube/v3/`, and `API_KEY` is not used.
#define USE_YT_LEMNOSLIFE_COM_NO_KEY_SERVICE
// Key that `getJson` appends as the `key` URL parameter when `USE_YT_LEMNOSLIFE_COM_NO_KEY_SERVICE` is not defined.
#define API_KEY "AIzaSy..."
// Number of worker threads running `treatChannels` that `main` starts.
#define THREADS_NUMBER 10
// Formats `threadId << ": " << x` into a temporary `ostringstream` and hands it to `print`.
// The macro expands to a braced block, which is why call sites do not end with a semicolon.
#define PRINT(threadId, x) { ostringstream toPrint; toPrint << threadId << ": " << x; print(&toPrint); }
// Thread identifier used in the logs of the main thread (`main` numbers the worker threads from 1).
#define DEFAULT_THREAD_ID 0
// `printMutex` serializes the writes to the standard output made by `print`.
// `channelsAlreadyTreatedAndToTreatMutex` protects `channelsAlreadyTreated` and `channelsToTreat`.
mutex printMutex,
      channelsAlreadyTreatedAndToTreatMutex;
// Channels that have been (or are being) treated, and channels discovered but not treated yet.
set<string> channelsAlreadyTreated,
            channelsToTreat;
// Counters describing the channel currently treated by a worker thread.
// Each worker treats its own channel, so every thread needs its own copy: sharing them between
// threads mixed the comment totals and the request file numbering of unrelated channels (and was a data race).
thread_local unsigned int commentsCount = 0,
                          requestsPerChannel = 0;
// Number of comments treated by all the workers since the main thread last reset it.
// It is incremented by the workers and read/reset by the main thread, hence atomic.
atomic<unsigned int> commentsPerSecondCount{0};
// Directory receiving the data of the treated channels, and file listing the known channel ids.
string CHANNELS_DIRECTORY = "channels/",
       CHANNELS_FILE_PATH = "channels.txt";
// Entry point: loads the known channels, works out which ones are already treated, starts the
// worker threads and then logs the comments throughput every second, forever.
int main()
{
    // The starting set should be written to `CHANNELS_FILE_PATH`.
    // To resume this algorithm after a shutdown, just restart it after having deleted the last channel folders in `CHANNELS_DIRECTORY` being treated.
    // On a restart, `CHANNELS_FILE_PATH` is read and every channel not found in `CHANNELS_DIRECTORY` is added to `channelsToTreat`, or to `channelsAlreadyTreated` otherwise, before continuing, as if `CHANNELS_FILE_PATH` was containing a **treated** starting set.
    const vector<string> channelsVec = getFileContent(CHANNELS_FILE_PATH);
    channelsToTreat = set<string>(channelsVec.begin(), channelsVec.end());
    // An empty line in `CHANNELS_FILE_PATH` is not a channel id.
    channelsToTreat.erase("");

    createDirectory(CHANNELS_DIRECTORY);

    for(const auto& entry : filesystem::directory_iterator(CHANNELS_DIRECTORY))
    {
        // A completely treated channel is stored as `<channel id>.zip` (its folder is deleted once zipped),
        // so the `.zip` extension has to be removed to get the channel id back.
        // Any other entry (for instance a channel folder) is named after the channel id itself.
        const filesystem::path entryPath = entry.path();
        const string channelId = (entryPath.extension() == ".zip" ? entryPath.stem() : entryPath.filename()).string();
        channelsToTreat.erase(channelId);
        channelsAlreadyTreated.insert(channelId);
    }

    PRINT(DEFAULT_THREAD_ID, channelsToTreat.size() << " channel(s) to treat")
    PRINT(DEFAULT_THREAD_ID, channelsAlreadyTreated.size() << " channel(s) already treated")

    // Worker threads are numbered from 1, as 0 (`DEFAULT_THREAD_ID`) identifies the main thread in the logs.
    thread threads[THREADS_NUMBER];
    for(unsigned short threadsIndex = 0; threadsIndex < THREADS_NUMBER; threadsIndex++)
    {
        threads[threadsIndex] = thread(treatChannels, threadsIndex + 1);
    }

    while(true)
    {
        PRINT(DEFAULT_THREAD_ID, "Comments per second: " << commentsPerSecondCount)
        commentsPerSecondCount = 0;
        sleep(1);
    }

    // The following is dead code, as we assume below not to have ever treated completely YouTube.
    for(unsigned short threadsIndex = 0; threadsIndex < THREADS_NUMBER; threadsIndex++)
    {
        threads[threadsIndex].join();
    }

    return 0;
}
void treatChannels(unsigned short threadId)
{
// For the moment we assume that we never have treated completely YouTube, otherwise we have to pay attention how to proceed if the starting set involves startvation for some threads.
while(true)
{
channelsAlreadyTreatedAndToTreatMutex.lock();
if(channelsToTreat.empty())
{
channelsAlreadyTreatedAndToTreatMutex.unlock();
sleep(1);
continue;
}
string channelToTreat = *channelsToTreat.begin();
PRINT(threadId, "Treating channel " << channelToTreat << " (treated: " << channelsAlreadyTreated.size() << ", to treat: " << channelsToTreat.size() << ")")
channelsToTreat.erase(channelToTreat);
channelsAlreadyTreated.insert(channelToTreat);
channelsAlreadyTreatedAndToTreatMutex.unlock();
string channelToTreatDirectory = CHANNELS_DIRECTORY + channelToTreat + "/";
createDirectory(channelToTreatDirectory);
treatChannelOrVideo(threadId, true, channelToTreat, channelToTreat);
// As I haven't found any well-known library that compress easily a directory, I have chosen to rely on `zip` cli.
exec("cd " + channelToTreatDirectory + " && zip -r ../" + channelToTreat + ".zip *");
deleteDirectory(channelToTreatDirectory);
PRINT(threadId, commentsCount << " comments were found for this channel.")
commentsCount = 0;
requestsPerChannel = 0;
}
channelsAlreadyTreatedAndToTreatMutex.unlock();
}
void treatChannelOrVideo(unsigned short threadId, bool isChannel, string id, string channelToTreat)
{
string pageToken = "";
while(true)
{
ostringstream toString;
toString << "commentThreads?part=snippet,replies&" << (isChannel ? "allThreadsRelatedToChannelId" : "videoId") << "=" << id << "&maxResults=100&pageToken=" << pageToken;
string url = toString.str();
json data = getJson(threadId, url, channelToTreat);
bool doesRelyingOnCommentThreadsIsEnough = (!isChannel) || data["error"]["errors"][0]["reason"] != "commentsDisabled";
if(doesRelyingOnCommentThreadsIsEnough)
{
json items = data["items"];
for(const auto& item : items)
{
json comment = item["snippet"]["topLevelComment"];
string commentId = comment["id"];
treatComment(threadId, comment, channelToTreat);
if(item.contains("replies"))
{
if(item["snippet"]["totalReplyCount"] > 5)
{
string pageToken = "";
while(true)
{
json data = getJson(threadId, "comments?part=snippet&parentId=" + commentId + "&maxResults=100&pageToken=" + pageToken, channelToTreat),
items = data["items"];
for(const auto& item : items)
{
treatComment(threadId, item, channelToTreat);
}
if(data.contains("nextPageToken"))
{
pageToken = data["nextPageToken"];
}
else
{
break;
}
}
}
else
{
json replies = item["replies"]["comments"];
for(const auto& reply : replies)
{
treatComment(threadId, reply, channelToTreat);
}
}
}
}
if(data.contains("nextPageToken"))
{
pageToken = data["nextPageToken"];
}
else
{
break;
}
}
else
{
PRINT(threadId, "Comments disabled channel, treating differently...")
json data = getJson(threadId, "channels?part=statistics&id=" + channelToTreat, channelToTreat);
// YouTube Data API v3 Videos: list endpoint returns `videoCount` as a string and not an integer...
unsigned int videoCount = atoi(string(data["items"][0]["statistics"]["videoCount"]).c_str());
PRINT(threadId, "The channel has about " << videoCount << " videos.")
// `UC-3A9g4U1PpLaeAuD4jSP_w` has a `videoCount` of 2, while its `uploads` playlist contains 3 videos. So we use a strict inequality here.
if(videoCount < 20000)
{
string playlistToTreat = "UU" + channelToTreat.substr(2),
pageToken = "";
while(true)
{
// `snippet` and `status` are unneeded `part`s here but may be interesting later, as we log them.
json data = getJson(threadId, "playlistItems?part=snippet,contentDetails,status&playlistId=" + playlistToTreat + "&maxResults=50&pageToken=" + pageToken, channelToTreat),
items = data["items"];
for(const auto& item : items)
{
string videoId = item["contentDetails"]["videoId"];
// To keep the same amount of logs for each channel, I comment the following `PRINT`.
//PRINT("Treating video " << videoId)
treatChannelOrVideo(threadId, false, videoId, channelToTreat);
}
if(data.contains("nextPageToken"))
{
pageToken = data["nextPageToken"];
}
else
{
break;
}
}
break;
}
else
{
PRINT(threadId, "The videos count of the channel exceeds the supported 20,000 limit!")
exit(1);
}
}
}
}
void treatComment(unsigned short threadId, json comment, string channelId)
{
json snippet = comment["snippet"];
// The `else` case can happen (cf `95a9421ad0469a09335afeddb2983e31dc00bc36`).
if(snippet.contains("authorChannelId"))
{
string channelId = snippet["authorChannelId"]["value"];
channelsAlreadyTreatedAndToTreatMutex.lock();
if(channelsAlreadyTreated.find(channelId) == channelsAlreadyTreated.end() && channelsToTreat.find(channelId) == channelsToTreat.end())
{
channelsToTreat.insert(channelId);
channelsAlreadyTreatedAndToTreatMutex.unlock();
writeFile(threadId, CHANNELS_FILE_PATH, "a", "\n" + channelId);
}
else
{
channelsAlreadyTreatedAndToTreatMutex.unlock();
}
}
commentsCount++;
commentsPerSecondCount++;
}
// Runs `cmd` through `popen` and returns everything it wrote to its standard output.
// Throws `runtime_error` if the command couldn't be started.
string exec(string cmd)
{
    // The pipe is closed with `pclose` whichever way the function is left.
    FILE* const rawPipe = popen(cmd.c_str(), "r");
    unique_ptr<FILE, decltype(&pclose)> commandPipe(rawPipe, pclose);
    if(commandPipe == nullptr)
    {
        throw runtime_error("popen() failed!");
    }

    string output;
    array<char, 128> chunk;
    // The output is read by chunks of at most 127 characters until the end of the stream.
    for(;;)
    {
        const char* const read = fgets(chunk.data(), chunk.size(), commandPipe.get());
        if(read == nullptr)
        {
            break;
        }
        output.append(chunk.data());
    }
    return output;
}
bool writeFile(unsigned short threadId, string filePath, string option, string toWrite)
{
FILE* file = fopen(filePath.c_str(), option.c_str());
if(file != NULL)
{
fputs(toWrite.c_str(), file);
fclose(file);
return true;
}
else
{
PRINT(threadId, "writeFile error: " << strerror(errno))
}
return false;
}
// Returns whether `stat` succeeds on `filePath`.
bool doesFileExist(string filePath)
{
    struct stat fileInformation;
    const int status = stat(filePath.c_str(), &fileInformation);
    if(status != 0)
    {
        return false;
    }
    return true;
}
// Creates the directory `path` with the `rwxrwxr-x` permissions (before the umask is applied).
// The result of `mkdir` is ignored, so nothing happens if the directory can't be created (for instance because it already exists).
void createDirectory(string path)
{
    const mode_t permissions = S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH;
    mkdir(path.c_str(), permissions);
}
// Deletes `path` and, if it is a directory, everything it contains, using `filesystem::remove_all`.
void deleteDirectory(string path)
{
    const filesystem::path directory(path);
    filesystem::remove_all(directory);
}
// Returns the current local date and time as `dd-mm-YYYY HH-MM-SS.mmm`.
// The milliseconds are zero-padded to 3 digits (5 ms is `.005`, not `.5`) and are derived from the
// same clock reading as the seconds, so that both always describe the same instant.
string getDate()
{
    const system_clock::time_point now = system_clock::now();
    const time_t seconds = system_clock::to_time_t(now);

    // `localtime_r` writes to a caller-provided buffer, contrarily to `localtime` which shares a static one between threads.
    struct tm localTime = {};
    localtime_r(&seconds, &localTime);

    const auto milliseconds = duration_cast<chrono::milliseconds>(now.time_since_epoch()).count() % 1000;

    ostringstream toString;
    toString << put_time(&localTime, "%d-%m-%Y %H-%M-%S.") << setfill('0') << setw(3) << milliseconds;
    return toString.str();
}
// Returns the lines of the file `filePath`, without their end of line character.
// An empty vector is returned if the file can't be read.
vector<string> getFileContent(string filePath)
{
    ifstream input(filePath);
    vector<string> content;
    for(string currentLine; getline(input, currentLine);)
    {
        content.push_back(currentLine);
    }
    return content;
}
json getJson(unsigned short threadId, string url, string directoryPath)
{
#ifdef USE_YT_LEMNOSLIFE_COM_NO_KEY_SERVICE
string finalUrl = "https://yt.lemnoslife.com/noKey/" + url;
#else
string finalUrl = "https://www.googleapis.com/youtube/v3/" + url + "&key=" + API_KEY;
#endif
string content = getHttps(finalUrl);
json data;
try
{
data = json::parse(content);
}
catch (json::parse_error& ex)
{
PRINT(threadId, "Parse error for " << finalUrl << ", as got: " << content << " !")
exit(1);
}
if(data.contains("error"))
{
PRINT(threadId, "Found error in JSON at URL: " << finalUrl << " for content: " << content << " !")
return getJson(threadId, url, directoryPath);
}
ostringstream toString;
toString << CHANNELS_DIRECTORY << directoryPath << "/" << requestsPerChannel << ".json";
requestsPerChannel++;
writeFile(threadId, toString.str(), "w", url + "\n" + content);
return data;
}
// Writes the content of `toPrint` to the standard output, prefixed with the current date, then empties `toPrint`.
// `printMutex` is held for the whole function so that the lines of different threads don't get mixed.
void print(ostringstream* toPrint)
{
    // The guard releases the mutex on every path, including if building or writing the line throws.
    lock_guard<mutex> printLock(printMutex);
    cout << getDate() << ": " << toPrint->str() << endl;
    toPrint->str("");
}
// Is this function really multi-threading friendly? If not, could consider executing `curl` using the command line.
string getHttps(string url)
{
CURL* curl = curl_easy_init();
string got;
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, 1);
curl_easy_setopt(curl, CURLOPT_SSL_VERIFYHOST, 1);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writeCallback);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &got);
curl_easy_perform(curl);
curl_easy_cleanup(curl);
return got;
}
// Write callback given to libcurl: appends the `size * nmemb` bytes pointed by `contents` to the
// `string` pointed by `userp` and returns the number of bytes appended.
size_t writeCallback(void* contents, size_t size, size_t nmemb, void* userp)
{
    const size_t receivedBytes = size * nmemb;
    string* const destination = static_cast<string*>(userp);
    const char* const chunk = static_cast<const char*>(contents);
    destination->append(chunk, receivedBytes);
    return receivedBytes;
}