YouTube_captions_search_engine/main.cpp

932 lines
45 KiB
C++

#include <iostream>
#include <fstream>
#include <sstream>
#include <set>
#include <mutex>
#include <thread>
#include <sys/stat.h>
#include <unistd.h>
#include <curl/curl.h>
#include <nlohmann/json.hpp>
using namespace std;
using namespace chrono;
using json = nlohmann::json;
// Concerning `retryOnCommentsDisabled`, `commentThreads` can return for some channels that they have disabled their comments while we can find comments on some videos, so we enumerate the channel videos and request the comments on each video.
// Concerning `returnErrorIfPlaylistNotFound`, it is used when not trying to retrieve a channel `uploads` playlist content as it seems to always work.
enum getJsonBehavior { normal, retryOnCommentsDisabled, returnErrorIfPlaylistNotFound };
vector<string> getFileContent(string filePath);
json getJson(unsigned short threadId, string url, bool usingYouTubeDataApiV3, string channelId, getJsonBehavior behavior = normal);
void createDirectory(string path),
print(ostringstream* toPrint),
treatComment(unsigned short threadId, json comment, string channelId),
treatChannelOrVideo(unsigned short threadId, bool isIdAChannelId, string id, string channelToTreat),
treatChannels(unsigned short threadId),
deleteDirectory(string path),
markChannelAsRequiringTreatmentIfNeeded(unsigned short threadId, string channelId),
execute(unsigned short threadId, string command, bool debug = true);
string getHttps(string url),
join(vector<string> parts, string delimiter);
size_t writeCallback(void* contents, size_t size, size_t nmemb, void* userp);
bool doesFileExist(string filePath),
writeFile(unsigned short threadId, string filePath, string option, string toWrite);
// Use macros not to have to repeat `threadId` in each function calling `print` function.
#define THREAD_PRINT(threadId, x) { ostringstream toPrint; toPrint << threadId << ": " << x; print(&toPrint); }
#define PRINT(x) THREAD_PRINT(threadId, x)
#define DEFAULT_THREAD_ID 0
#define MAIN_PRINT(x) THREAD_PRINT(DEFAULT_THREAD_ID, x)
#define EXIT_WITH_ERROR(x) { PRINT(x); exit(EXIT_FAILURE); }
#define MAIN_EXIT_WITH_ERROR(x) { MAIN_PRINT(x); exit(EXIT_FAILURE); }
// Note that in the following a `channel` designates a `string` that is the channel id starting with `UC`.
// The only resources shared are:
// - standard streams
// - the ordered set of channels to treat and the unordered set of channels already treated
// - the ordered set of YouTube Data API v3 keys
mutex printMutex,
channelsAlreadyTreatedAndToTreatMutex,
quotaMutex;
// We use `set`s and `map`s for performance reasons.
set<string> channelsAlreadyTreated;
// Two `map`s to simulate a bidirectional map.
map<unsigned int, string> channelsToTreat;
map<string, unsigned int> channelsToTreatRev;
vector<string> youtubeDataApiV3keys;
// For statistics we count the number of:
// - channels found per second (`channelsFoundPerSecondCount`)
// - channels (`channelsTreatedCountThreads`) and requests (`requestsCountThreads`) done by each channel once they are treated
unsigned int channelsFoundPerSecondCount = 0;
map<unsigned short, unsigned int> channelsTreatedCountThreads,
requestsCountThreads;
// Variables that can be override by command line arguments.
unsigned short THREADS_NUMBER = 1;
// Can be https://yt.lemnoslife.com to use the official YouTube operational API instance for instance.
string YOUTUBE_OPERATIONAL_API_INSTANCE_URL = "http://localhost/YouTube-operational-API";
bool USE_YT_LEMNOSLIFE_COM_NO_KEY_SERVICE = false;
// Constants written as `string` variables instead of macros to have `string` properties, even if could use a meta-macro inlining as `string`s.
string CHANNELS_DIRECTORY = "channels/",
STARTING_CHANNELS_SET_FILE_PATH = "channels.txt",
YOUTUBE_DATA_API_V3_KEYS_FILE_PATH = "keys.txt",
UNLISTED_VIDEOS_FILE_PATH = "unlistedVideos.txt",
CAPTIONS_DIRECTORY = "captions/",
DEBUG_DIRECTORY = "debug/",
YOUTUBE_APIS_REQUESTS_DIRECTORY = "requests/";
// The keys usage is identical to the YouTube operational API no-key service that is about using completely the daily quota of the first key before using the next one and so on by looping when reached the end of the ordered keys set.
string currentYouTubeDataAPIv3Key = "", // Will firstly be filled with `YOUTUBE_DATA_API_V3_KEYS_FILE_PATH` first line.
CURRENT_WORKING_DIRECTORY;
int main(int argc, char *argv[])
{
// Proceed passed command line arguments.
for(unsigned short argvIndex = 1; argvIndex < argc; argvIndex++)
{
string argvStr = string(argv[argvIndex]);
if(argvStr == "--no-keys")
{
USE_YT_LEMNOSLIFE_COM_NO_KEY_SERVICE = true;
}
else if(argvStr.rfind("--threads=", 0) == 0)
{
THREADS_NUMBER = atoi(argvStr.substr(10).c_str());
}
else if(argvStr == "-h" || argvStr == "--help")
{
MAIN_PRINT("Usage: " << argv[0] << " [--help/-h] [--no-keys] [--threads=N] [--youtube-operational-api-instance-url URL]")
exit(EXIT_SUCCESS);
}
// Contrarily to `--threads=` the separator between the command line argument label and value is a space and not an equal sign.
else if(argvStr == "--youtube-operational-api-instance-url")
{
if(argvIndex < argc - 1)
{
YOUTUBE_OPERATIONAL_API_INSTANCE_URL = string(argv[argvIndex + 1]);
argvIndex++;
}
else
{
MAIN_EXIT_WITH_ERROR("YouTube operational API instance URL missing!")
}
}
else
{
MAIN_EXIT_WITH_ERROR("Unrecognized parameter " << argvStr)
}
}
// The starting set should be written to `STARTING_CHANNELS_SET_FILE_PATH`.
// To resume this algorithm after a shutdown, just restart it after having deleted the last channel folders in `CHANNELS_DIRECTORY` being treated.
// On a restart, `STARTING_CHANNELS_SET_FILE_PATH` is read and every channel not found in `CHANNELS_DIRECTORY` is added to `channelsToTreat*` or `channelsToTreat*` otherwise before continuing, as if `STARTING_CHANNELS_SET_FILE_PATH` was containing a **treated** starting set.
vector<string> startingChannelsSet = getFileContent(STARTING_CHANNELS_SET_FILE_PATH);
for(unsigned int startingChannelsSetIndex = 0; startingChannelsSetIndex < startingChannelsSet.size(); startingChannelsSetIndex++)
{
string startingChannel = startingChannelsSet[startingChannelsSetIndex];
channelsToTreat[startingChannelsSetIndex] = startingChannel;
channelsToTreatRev[startingChannel] = startingChannelsSetIndex;
}
// Load the YouTube Data API v3 keys stored in `YOUTUBE_DATA_API_V3_KEYS_FILE_PATH`.
youtubeDataApiV3keys = getFileContent(YOUTUBE_DATA_API_V3_KEYS_FILE_PATH);
currentYouTubeDataAPIv3Key = youtubeDataApiV3keys[0];
createDirectory(CHANNELS_DIRECTORY);
// Remove already treated channels from channels to treat.
for(const auto& entry : filesystem::directory_iterator(CHANNELS_DIRECTORY))
{
string fileName = entry.path().filename();
// Skip files such as `UNLISTED_VIDEOS_FILE_PATH`.
if (fileName.substr(0, 2) == "UC") {
string channelId = fileName.substr(0, fileName.length() - 4);
channelsToTreat.erase(channelsToTreatRev[channelId]);
channelsToTreatRev.erase(channelId);
channelsAlreadyTreated.insert(channelId);
}
}
// Load at runtime the current working directory.
char cwd[PATH_MAX];
if (getcwd(cwd, sizeof(cwd)) != NULL) {
CURRENT_WORKING_DIRECTORY = string(cwd) + "/";
} else {
MAIN_EXIT_WITH_ERROR("`getcwd()` error");
}
// Print the number of:
// - channels to treat
// - channels already treated
MAIN_PRINT(channelsToTreat.size() << " channel(s) to treat")
MAIN_PRINT(channelsAlreadyTreated.size() << " channel(s) already treated")
// Start the `THREADS_NUMBER` threads.
// Note that there is an additional thread that is the one the `main` function that will continue the code below this `for` loop.
vector<thread> threads;
for(unsigned short threadsIndex = 0; threadsIndex < THREADS_NUMBER; threadsIndex++)
{
threads.push_back(thread(treatChannels, threadsIndex + 1));
}
// Every second print the number of channels found during the last second.
// Note that if a same channel is found multiple times, the count will be incremented the same number of times.
while(true)
{
MAIN_PRINT("Channels treated per second: " << channelsFoundPerSecondCount)
channelsFoundPerSecondCount = 0;
sleep(1);
}
// The following is dead code, as we assume below not to have ever treated completely YouTube.
for(unsigned short threadsIndex = 0; threadsIndex < THREADS_NUMBER; threadsIndex++)
{
threads[threadsIndex].join();
}
return 0;
}
// Function each thread loop in until the whole YouTube graph is completely treated.
void treatChannels(unsigned short threadId)
{
// For the moment we assume that we never have treated completely YouTube, otherwise we have to pay attention how to proceed if the starting set involves startvation for some threads.
while(true)
{
// As we're about to mark a channel as being treated, we need to make sure that no other thread is also modifying the set of channels we are working on.
channelsAlreadyTreatedAndToTreatMutex.lock();
if(channelsToTreat.empty())
{
channelsAlreadyTreatedAndToTreatMutex.unlock();
// Consumer thread waiting producer one to provide a channel to work on.
sleep(1);
continue;
}
// Treat channels in the order we found them in `STARTING_CHANNELS_SET_FILE_PATH` or discovered them.
string channelToTreat = channelsToTreat.begin()->second;
// Print the channel id the thread is going to work on and remind the number of channels already treated and the number of channels to treat.
PRINT("Treating channel " << channelToTreat << " (treated: " << channelsAlreadyTreated.size() << ", to treat: " << channelsToTreat.size() << ")")
channelsTreatedCountThreads[threadId] = 0;
requestsCountThreads[threadId] = 0;
channelsToTreat.erase(channelsToTreatRev[channelToTreat]);
channelsToTreatRev.erase(channelToTreat);
channelsAlreadyTreated.insert(channelToTreat);
channelsAlreadyTreatedAndToTreatMutex.unlock();
// Create directories in which we are going to store the requests to YouTube we did.
string channelToTreatDirectory = CHANNELS_DIRECTORY + channelToTreat + "/";
createDirectory(channelToTreatDirectory);
createDirectory(DEBUG_DIRECTORY);
createDirectory(channelToTreatDirectory + CAPTIONS_DIRECTORY);
createDirectory(channelToTreatDirectory + YOUTUBE_APIS_REQUESTS_DIRECTORY);
// Actually treat the given channel.
treatChannelOrVideo(threadId, true, channelToTreat, channelToTreat);
// Note that compressing the French most subscribers channel took 4 minutes and 42 seconds.
PRINT("Starting compression...")
// As I haven't found any well-known library that compress easily a directory, I have chosen to rely on `zip` cli.
// We precise no `debug`ging, as otherwise the zipping operation doesn't work as expected.
// As the zipping process isn't recursive, we can't just rely on `ls`, but we are obliged to use `find`.
execute(threadId, "cd " + channelToTreatDirectory + " && find | zip ../" + channelToTreat + ".zip -@");
PRINT("Compression finished, started deleting initial directory...")
// Get rid of the uncompressed data.
deleteDirectory(channelToTreatDirectory);
PRINT("Deleting directory finished.")
PRINT(channelsTreatedCountThreads[threadId] << " channels were found for this channel.")
}
// This `unlock` seems to be dead code currently as the algorithm doesn't support treating the whole YouTube graph.
channelsAlreadyTreatedAndToTreatMutex.unlock();
}
// Have to pay attention not to recursively call this function with another channel otherwise we break the ability of the program to halt at any top level channel.
// Note that the `id` can be a channel id or a video id. We provide anyway `channelToTreat` even if it's identical to `id`.
void treatChannelOrVideo(unsigned short threadId, bool isIdAChannelId, string id, string channelToTreat)
{
string pageToken = "";
// Treat all comments:
// - of a given channel thanks to YouTube Data API v3 CommentThreads: list endpoint and `allThreadsRelatedToChannelId` filter if the provided `id` is a channel id
// - of a given video thanks to YouTube Data API v3 CommentThreads: list endpoint and `videoId` filter otherwise (if the provided `id` is a video id)
while(true)
{
ostringstream toString;
toString << "commentThreads?part=snippet,replies&" << (isIdAChannelId ? "allThreadsRelatedToChannelId" : "videoId") << "=" << id << "&maxResults=100&pageToken=" << pageToken;
string url = toString.str();
json data = getJson(threadId, url, true, channelToTreat, pageToken == "" ? normal : retryOnCommentsDisabled);
// This condition doesn't hold for not existing channels.
bool doesRelyingOnCommentThreadsIsEnough = (!isIdAChannelId) || data["error"]["errors"][0]["reason"] != "commentsDisabled";
if(doesRelyingOnCommentThreadsIsEnough)
{
json items = data["items"];
for(const auto& item : items)
{
json comment = item["snippet"]["topLevelComment"];
string commentId = comment["id"];
treatComment(threadId, comment, channelToTreat);
if(item.contains("replies"))
{
// If there is more than 5 replies, they need to be requested by using pagination with YouTube Data API v3 Comments: list endpoint.
// In such case we delay the treatment of the retrieved 5 first replies in order to double treat them.
if(item["snippet"]["totalReplyCount"] > 5)
{
string pageToken = "";
while(true)
{
json data = getJson(threadId, "comments?part=snippet&parentId=" + commentId + "&maxResults=100&pageToken=" + pageToken, true, channelToTreat),
items = data["items"];
for(const auto& item : items)
{
treatComment(threadId, item, channelToTreat);
}
if(data.contains("nextPageToken"))
{
pageToken = data["nextPageToken"];
}
else
{
break;
}
}
}
else
{
json replies = item["replies"]["comments"];
for(const auto& reply : replies)
{
treatComment(threadId, reply, channelToTreat);
}
}
}
}
if(data.contains("nextPageToken"))
{
pageToken = data["nextPageToken"];
}
else
{
break;
}
}
else
{
PRINT("Comments disabled channel, treating differently...")
// As far as I know we can't retrieve all videos of a channel if it has more than 20,000 videos, in such case the program stops to investigate this further.
json data = getJson(threadId, "channels?part=statistics&id=" + channelToTreat, true, channelToTreat),
items = data["items"];
if(items.empty())
{
PRINT("The provided channel doesn't exist, skipping it.");
break;
}
// YouTube Data API v3 Videos: list endpoint returns `videoCount` as a string and not an integer...
unsigned int videoCount = atoi(string(items[0]["statistics"]["videoCount"]).c_str());
PRINT("The channel has about " << videoCount << " videos.")
// `UC-3A9g4U1PpLaeAuD4jSP_w` has a `videoCount` of 2, while its `uploads` playlist contains 3 videos. So we use a strict inequality here.
// The `0 < videoCount` is an optimization to avoid making a request to YouTube Data API v3 PlaylistItems: list endpoint while we already know that no results will be returned. As many YouTube channels don't have videos, this optimization is implemented.
if(0 < videoCount && videoCount < 20000)
{
string playlistToTreat = "UU" + channelToTreat.substr(2),
pageToken = "";
while(true)
{
// `snippet` and `status` are unneeded `part`s here but may be interesting later, as we log them.
json data = getJson(threadId, "playlistItems?part=contentDetails,snippet,status&playlistId=" + playlistToTreat + "&maxResults=50&pageToken=" + pageToken, true, channelToTreat, returnErrorIfPlaylistNotFound);
if(data.contains("error"))
{
// This is a sanity check that hasn't ever been violated.
EXIT_WITH_ERROR("Not listing comments on videos, as `playlistItems` hasn't found the `uploads` playlist!")
}
json items = data["items"];
for(const auto& item : items)
{
string videoId = item["contentDetails"]["videoId"];
// To keep the same amount of logs for each regular channel, I comment the following `PRINT`.
//PRINT("Treating video " << videoId)
treatChannelOrVideo(threadId, false, videoId, channelToTreat);
}
if(data.contains("nextPageToken"))
{
pageToken = data["nextPageToken"];
}
else
{
break;
}
}
break;
}
else if(videoCount == 0)
{
PRINT("Skip listing comments on videos, as they shouldn't be any according to `channels?part=statistics`.")
break;
}
else //if(videoCount >= 20000)
{
EXIT_WITH_ERROR("The videos count of the channel exceeds the supported 20,000 limit!")
}
}
}
// If the provided `id` is a channel id, then we treat its tabs.
if(isIdAChannelId)
{
// Treat the `CHANNELS` tab.
string pageToken = "";
while(true)
{
json data = getJson(threadId, "channels?part=channels&id=" + id + (pageToken == "" ? "" : "&pageToken=" + pageToken), false, id),
// There is no need to verify that the channel exists as it does thanks to previous comments listing.
channelSections = data["items"][0]["channelSections"];
// We don't mind about channel sections, we are only looking for channel ids.
for(const auto& channelSection : channelSections)
{
for(const auto& sectionChannel : channelSection["sectionChannels"])
{
string channelId = sectionChannel["channelId"];
markChannelAsRequiringTreatmentIfNeeded(threadId, channelId);
}
}
// There is a pagination mechanism only when there is a single channel section.
if(channelSections.size() == 1)
{
json channelSection = channelSections[0];
if(!channelSection["nextPageToken"].is_null())
{
pageToken = channelSection["nextPageToken"];
}
else
{
break;
}
}
else
{
break;
}
}
// Treat the `COMMUNITY` tab.
pageToken = "";
while(true)
{
// First we retrieve community post ids then we retrieve their comments and their replies.
json data = getJson(threadId, "channels?part=community&id=" + id + (pageToken == "" ? "" : "&pageToken=" + pageToken), false, id);
data = data["items"][0];
json posts = data["community"];
for(const auto& post : posts)
{
string postId = post["id"];
// As livestreams chats, comments can be filtered as `Top comments` and `Newest first`, from my experience `Top comments` hide some comments, so we use time filtering everywhere it is possible.
json data = getJson(threadId, "community?part=snippet&id=" + postId + "&order=time", false, id);
string pageToken = data["items"][0]["snippet"]["comments"]["nextPageToken"];
while(pageToken != "")
{
json data = getJson(threadId, "commentThreads?part=snippet,replies&pageToken=" + pageToken, false, id),
items = data["items"];
for(const auto& item : items)
{
json snippet = item["snippet"]["topLevelComment"]["snippet"],
authorChannelId = snippet["authorChannelId"];
if(!authorChannelId["value"].is_null())
{
string channelId = authorChannelId["value"];
markChannelAsRequiringTreatmentIfNeeded(threadId, channelId);
}
// Contrarily to YouTube Data API v3 for a given comments having replies, we don't switch from CommentThreads: list endpoint to Comments: list endpoint, here we keep working with YouTube operational API CommentThreads: list endpoint but change the page token.
string pageToken = snippet["nextPageToken"];
while(pageToken != "")
{
json data = getJson(threadId, "commentThreads?part=snippet,replies&pageToken=" + pageToken, false, id),
items = data["items"];
for(const auto& item : items)
{
string channelId = item["snippet"]["authorChannelId"]["value"];
markChannelAsRequiringTreatmentIfNeeded(threadId, channelId);
}
if(data.contains("nextPageToken"))
{
pageToken = data["nextPageToken"];
}
else
{
break;
}
}
}
if(data.contains("nextPageToken"))
{
pageToken = data["nextPageToken"];
}
else
{
break;
}
}
}
// See https://github.com/Benjamin-Loison/YouTube-operational-API/issues/49
if(data.contains("nextPageToken") && data["nextPageToken"] != "")
{
pageToken = data["nextPageToken"];
}
else
{
break;
}
}
// Treat the `PLAYLISTS` tab.
pageToken = "";
while(true)
{
json data = getJson(threadId, "channels?part=playlists&id=" + id + (pageToken == "" ? "" : "&pageToken=" + pageToken), false, id),
playlistSections = data["items"][0]["playlistSections"];
// We don't mind about playlist sections, we are only looking for channel ids.
for(const auto& playlistSection : playlistSections)
{
for(const auto& playlist : playlistSection["playlists"])
{
string playlistId = playlist["id"];
// We exclude shows as they at least for free don't contain any comment indirectly.
if(playlistId.substr(0, 2) == "SC")
{
continue;
}
//PRINT(threadId, playlistId)
string pageToken = "";
while(true)
{
json data = getJson(threadId, "playlistItems?part=contentDetails,snippet,status&playlistId=" + playlistId + "&maxResults=50&pageToken=" + pageToken, true, id),
items = data["items"];
for(const auto& item : items)
{
json snippet = item["snippet"];
// This section is bit out of the scope of the YouTube captions search engine goal, as we are just curious about unlisted videos that we found but in fact it's also a bit in the scope of the initial goal, as this enable us to treat unlisted content.
string privacyStatus = item["status"]["privacyStatus"];
// `5-CXVU8si3A` in `PLTYUE9O6WCrjQsnOm56rMMNmFy_A-SjUx` has its privacy status on `privacyStatusUnspecified` and is inaccessible.
// `GMiVi8xkEXA` in `PLTYUE9O6WCrgNpeSiryP8LYVX-7tOJ1f1` has its privacy status on `private`.
// Of course `commentThreads?videoId=` doesn't work for these videos (same result on YouTube UI).
// By hypothesis that the discovery algorithm never ends we can't postpone the treatment of these unlisted videos, because we can find such unlisted videos at any point in time (before or after the given channel treatment).
// Maybe modifying this hypothesis would make sense, otherwise we have to treat them right-away (note that except code architecture, there is no recursion problem as documented on this function).
if(privacyStatus != "public" && privacyStatus != "private" && snippet["title"] != "Deleted video")
{
string videoId = snippet["resourceId"]["videoId"],
channelId = snippet["videoOwnerChannelId"];
PRINT("Found non public video (" << videoId << ") in: " << playlistId)
string channelUnlistedVideosFilePath = CHANNELS_DIRECTORY + UNLISTED_VIDEOS_FILE_PATH;
bool doesChannelUnlistedVideosFileExist = doesFileExist(channelUnlistedVideosFilePath);
writeFile(threadId, channelUnlistedVideosFilePath, !doesChannelUnlistedVideosFileExist ? "w" : "a", (!doesChannelUnlistedVideosFileExist ? "" : "\n") + channelId);
}
if(snippet.contains("videoOwnerChannelId"))
{
// There isn't any `videoOwnerChannelId` to retrieve for `5-CXVU8si3A` for instance.
string channelId = snippet["videoOwnerChannelId"];
// As we are already treating the given channel, verifying if it needs to be treated again is only a loss of time, so we skip the verification in this case.
if(channelId != id)
{
markChannelAsRequiringTreatmentIfNeeded(threadId, channelId);
}
}
}
if(data.contains("nextPageToken"))
{
pageToken = data["nextPageToken"];
}
else
{
break;
}
}
}
}
if(!data["nextPageToken"].is_null())
{
pageToken = data["nextPageToken"];
}
else
{
break;
}
}
// Treat the `LIVE` tab.
pageToken = "";
string playlistId = "UU" + id.substr(2);
vector<string> videoIds;
while(true)
{
// We verify by batch of 50 videos, if they are livestreams or not thanks to YouTube Data API v3 PlaylistItems: list and Videos: list endpoints, as PlaylistItems: list endpoint doesn't provide on its own whether or not the given video is a livestream.
json data = getJson(threadId, "playlistItems?part=contentDetails,snippet,status&playlistId=" + playlistId + "&maxResults=50&pageToken=" + pageToken, true, id, returnErrorIfPlaylistNotFound),
items = data["items"];
for(const auto& item : items)
{
string videoId = item["snippet"]["resourceId"]["videoId"];
videoIds.push_back(videoId);
}
bool hasNextPageToken = data.contains("nextPageToken");
if(videoIds.size() == 50 || !hasNextPageToken)
{
json data = getJson(threadId, "videos?part=contentDetails,id,liveStreamingDetails,localizations,player,snippet,statistics,status,topicDetails&id=" + join(videoIds, ","), true, id),
items = data["items"];
for(const auto& item : items)
{
if(item.contains("liveStreamingDetails"))
{
string videoId = item["id"];
//PRINT(videoId)
json liveStreamingDetails = item["liveStreamingDetails"];
// There is two possibilities for a live stream, whether it's ended or not.
// If it's ended we can't anymore use YouTube Live Streaming API LiveChat/messages: list endpoint.
if(liveStreamingDetails.contains("activeLiveChatId"))
{
string activeLiveChatId = liveStreamingDetails["activeLiveChatId"];
json data = getJson(threadId, "liveChat/messages?part=snippet,authorDetails&liveChatId=" + activeLiveChatId, true, id),
items = data["items"];
for(const auto& item : items)
{
string channelId = item["snippet"]["authorChannelId"];
markChannelAsRequiringTreatmentIfNeeded(threadId, channelId);
}
}
else
{
// As there isn't the usual pagination mechanism for these ended livestreams, we proceed in an uncertain way as follows based on a time pagination.
set<string> messageIds;
unsigned long long lastMessageTimestampRelativeMsec = 0;
while(true)
{
string time = to_string(lastMessageTimestampRelativeMsec);
json data = getJson(threadId, "liveChats?part=snippet&id=" + videoId + "&time=" + time, false, id),
snippet = data["items"][0]["snippet"];
if(snippet.empty())
{
break;
}
json firstMessage = snippet[0];
string firstMessageId = firstMessage["id"];
// We verify that we don't skip any message by verifying that the first message was already treated if we already treated some messages.
if(!messageIds.empty() && messageIds.find(firstMessageId) == messageIds.end())
{
// This sometimes happen cf https://gitea.lemnoslife.com/Benjamin_Loison/YouTube_captions_search_engine/issues/39.
PRINT("The verification that we don't skip any message failed! Continuing anyway...")
}
for(const auto& message : snippet)
{
string messageId = message["id"];
if(messageIds.find(messageId) == messageIds.end())
{
messageIds.insert(messageId);
string channelId = message["authorChannelId"];
markChannelAsRequiringTreatmentIfNeeded(threadId, channelId);
}
}
json lastMessage = snippet.back();
// If there isn't any new message, then we stop the retrieving.
if(lastMessageTimestampRelativeMsec == lastMessage["videoOffsetTimeMsec"])
{
break;
}
lastMessageTimestampRelativeMsec = lastMessage["videoOffsetTimeMsec"];
}
}
}
}
videoIds.clear();
}
if(hasNextPageToken)
{
pageToken = data["nextPageToken"];
}
else
{
break;
}
}
// Captions retrieval by relying on `yt-dlp` after having listed all videos ids of the given channel.
string playlistToTreat = "UU" + channelToTreat.substr(2);
pageToken = "";
while(true)
{
json data = getJson(threadId, "playlistItems?part=snippet,contentDetails,status&playlistId=" + playlistToTreat + "&maxResults=50&pageToken=" + pageToken, true, channelToTreat, returnErrorIfPlaylistNotFound);
if(data.contains("error"))
{
// `UCFoBM1VginhMH7lR56GtVbQ` doesn't have videos and is in this case for instance.
PRINT("Not listing captions on videos, as `playlistItems` hasn't found the `uploads` playlist!")
break;
}
json items = data["items"];
for(const auto& item : items)
{
string videoId = item["contentDetails"]["videoId"];
// Could proceed as follows by verifying `!isIdAChannelId` but as we don't know how to manage unlisted videos, we don't proceed this way.
//treatChannelOrVideo(threadId, false, videoId, channelToTreat);
string channelCaptionsToTreatDirectory = CHANNELS_DIRECTORY + channelToTreat + "/" + CAPTIONS_DIRECTORY + videoId + "/";
createDirectory(channelCaptionsToTreatDirectory);
// Firstly download all not automatically generated captions.
// The underscore in `-o` argument is used to not end up with hidden files.
// We are obliged to precise the video id after `--`, otherwise if the video id starts with `-` it's considered as an argument.
string commandCommonPrefix = "yt-dlp --skip-download ",
commandCommonPostfix = " -o '" + channelCaptionsToTreatDirectory + "_' -- " + videoId;
string command = commandCommonPrefix + "--write-sub --sub-lang all,-live_chat" + commandCommonPostfix;
execute(threadId, command);
// Secondly download the automatically generated captions.
command = commandCommonPrefix + "--write-auto-subs --sub-langs '.*orig' --sub-format ttml --convert-subs vtt" + commandCommonPostfix;
execute(threadId, command);
}
if(data.contains("nextPageToken"))
{
pageToken = data["nextPageToken"];
}
else
{
break;
}
}
}
}
// This function verifies that the given channel hasn't already been treated.
void markChannelAsRequiringTreatmentIfNeeded(unsigned short threadId, string channelId)
{
channelsFoundPerSecondCount++;
channelsTreatedCountThreads[threadId]++;
// As other threads may be writing the sets we are reading, we need to make sure it's not the case to ensure consistency.
channelsAlreadyTreatedAndToTreatMutex.lock();
if(channelsAlreadyTreated.find(channelId) == channelsAlreadyTreated.end() && channelsToTreatRev.find(channelId) == channelsToTreatRev.end())
{
// It is unclear to me why `channelsToTreat.end()->first + 1` doesn't work here.
unsigned int channelsToTreatIndex = !channelsToTreat.empty() ? channelsToTreat.rbegin()->first + 1 : channelsAlreadyTreated.size();
channelsToTreat[channelsToTreatIndex] = channelId;
channelsToTreatRev[channelId] = channelsToTreatIndex;
channelsAlreadyTreatedAndToTreatMutex.unlock();
writeFile(threadId, STARTING_CHANNELS_SET_FILE_PATH, "a", "\n" + channelId);
}
else
{
channelsAlreadyTreatedAndToTreatMutex.unlock();
}
}
// Mark the comment author channel as requiring treatment if needed.
void treatComment(unsigned short threadId, json comment, string channelId)
{
json snippet = comment["snippet"];
// The `else` case can happen (cf `95a9421ad0469a09335afeddb2983e31dc00bc36`).
if(snippet.contains("authorChannelId"))
{
string channelId = snippet["authorChannelId"]["value"];
markChannelAsRequiringTreatmentIfNeeded(threadId, channelId);
}
}
// Join `parts` with the `delimiter`.
string join(vector<string> parts, string delimiter)
{
string result = "";
unsigned int partsSize = parts.size();
for(unsigned int partsIndex = 0; partsIndex < partsSize; partsIndex++)
{
result += parts[partsIndex];
if(partsIndex < partsSize - 1)
{
result += delimiter;
}
}
return result;
}
// Execute a provide command as if being ran in a shell.
// This is mandatory as as far as I know there isn't a C++ API for `yt-dlp`.
void execute(unsigned short threadId, string command, bool debug)
{
// The debugging gives us confidence that `yt-dlp` is working as expected, cf https://gitea.lemnoslife.com/Benjamin_Loison/YouTube_captions_search_engine/issues/35#issuecomment-578.
if(debug)
{
ostringstream toString;
toString << threadId;
string initialCommand = command,
threadIdStr = toString.str(),
debugCommonFilePath = CURRENT_WORKING_DIRECTORY + DEBUG_DIRECTORY + threadIdStr,
debugOutFilePath = debugCommonFilePath + ".out",
debugErrFilePath = debugCommonFilePath + ".err";
command += " >> " + debugOutFilePath;
command += " 2>> " + debugErrFilePath;
writeFile(threadId, debugOutFilePath, "a", initialCommand + "\n");
writeFile(threadId, debugErrFilePath, "a", initialCommand + "\n");
}
system(command.c_str());
}
bool writeFile(unsigned short threadId, string filePath, string option, string toWrite)
{
FILE* file = fopen(filePath.c_str(), option.c_str());
if(file != NULL)
{
fputs(toWrite.c_str(), file);
fclose(file);
return true;
}
else
{
PRINT("writeFile error: " << strerror(errno))
}
return false;
}
bool doesFileExist(string filePath)
{
struct stat buffer;
return stat(filePath.c_str(), &buffer) == 0;
}
// Create a directory in the case that it isn't already existing.
void createDirectory(string path)
{
mkdir(path.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
}
// Delete a directory even if it's not empty.
void deleteDirectory(string path)
{
filesystem::remove_all(path);
}
// Get date in `%d-%m-%Y %H-%M-%S.%MS` format.
// Return for instance `22-02-2023 00-43-24.602`.
string getDate()
{
auto t = time(nullptr);
auto tm = *localtime(&t);
ostringstream toString;
toString << put_time(&tm, "%d-%m-%Y %H-%M-%S.");
milliseconds ms = duration_cast<milliseconds>(
system_clock::now().time_since_epoch()
);
toString << (ms.count() % 1000);
return toString.str();
}
// Return file lines as a vector of the file at the given `filePath`.
vector<string> getFileContent(string filePath)
{
vector<string> lines;
ifstream infile(filePath.c_str());
string line;
while(getline(infile, line))
lines.push_back(line);
return lines;
}
// Execute and return the result of a given request to a YouTube API.
json getJson(unsigned short threadId, string url, bool usingYoutubeDataApiv3, string channelId, getJsonBehavior behavior)
{
// If using the YouTube operational API official instance no-key service, we don't need to provide any YouTube Data API v3 key.
string finalUrl = usingYoutubeDataApiv3 ?
(USE_YT_LEMNOSLIFE_COM_NO_KEY_SERVICE ?
"https://yt.lemnoslife.com/noKey/" + url :
"https://www.googleapis.com/youtube/v3/" + url + "&key=" + currentYouTubeDataAPIv3Key) :
YOUTUBE_OPERATIONAL_API_INSTANCE_URL + "/" + url,
content = getHttps(finalUrl);
json data;
try
{
data = json::parse(content);
}
catch (json::parse_error& ex)
{
// From the experience this sometimes happens due to empty `content` but retrying just after solves the problem.
PRINT("Parse error for " << finalUrl << ", as got: " << content << " ! Retrying...")
return getJson(threadId, url, usingYoutubeDataApiv3, channelId);
}
if(data.contains("error"))
{
// The YouTube operational API shouldn't be returning any error, if it's the case we stop the execution to investigate the problem.
if(!usingYoutubeDataApiv3)
{
EXIT_WITH_ERROR("Found error in JSON retrieved from YouTube operational API at URL: " << finalUrl << " for content: " << content << " !")
}
string reason = data["error"]["errors"][0]["reason"];
// Contrarily to YouTube operational API no-key service we don't rotate keys in `YOUTUBE_DATA_API_V3_KEYS_FILE_PATH`, as we keep them in memory here, but we do rotate them in the memory.
if(reason == "quotaExceeded")
{
quotaMutex.lock();
// Move the current exhausted YouTube Data API v3 key from the first slot to the last one.
youtubeDataApiV3keys.erase(youtubeDataApiV3keys.begin());
youtubeDataApiV3keys.push_back(currentYouTubeDataAPIv3Key);
PRINT("No more quota on " << currentYouTubeDataAPIv3Key << " switching to " << youtubeDataApiV3keys[0] << ".")
currentYouTubeDataAPIv3Key = youtubeDataApiV3keys[0];
quotaMutex.unlock();
// We proceed again to the request not to return a temporary error due to our keys management.
return getJson(threadId, url, true, channelId);
}
// Errors from YouTube Data API v3 are normal in some cases when we request something that doesn't exist such as comments of a channel on a channel that doesn't have any, but we have to make the request to know that it doesn't have any that's why we proceed this way.
PRINT("Found error in JSON at URL: " << finalUrl << " for content: " << content << " !")
if(reason != "commentsDisabled" || behavior == retryOnCommentsDisabled)
{
return reason == "playlistNotFound" && behavior == returnErrorIfPlaylistNotFound ? data : getJson(threadId, url, true, channelId);
}
}
// Write the request URL and the retrieved content to logs.
ostringstream toString;
toString << CHANNELS_DIRECTORY << channelId << "/" << YOUTUBE_APIS_REQUESTS_DIRECTORY;
writeFile(threadId, toString.str() + "urls.txt", "a", url + " " + (usingYoutubeDataApiv3 ? "true" : "false") + "\n");
toString << requestsCountThreads[threadId]++ << ".json";
writeFile(threadId, toString.str(), "w", content);
return data;
}
void print(ostringstream* toPrint)
{
printMutex.lock();
cout << getDate() << ": " << toPrint->str() << endl;
toPrint->str("");
printMutex.unlock();
}
// Is this function really multi-threading friendly? If not, could consider executing `curl` using the command line.
// Retrieves content from an URL. Note that this function verifies the validity of the certificate in case of HTTPS.
string getHttps(string url)
{
CURL* curl = curl_easy_init();
string got;
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, 1);
curl_easy_setopt(curl, CURLOPT_SSL_VERIFYHOST, 1);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writeCallback);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &got);
curl_easy_perform(curl);
curl_easy_cleanup(curl);
return got;
}
// Auxiliary function required by `getHttps` function.
size_t writeCallback(void* contents, size_t size, size_t nmemb, void* userp)
{
((string*)userp)->append((char*)contents, size * nmemb);
return size * nmemb;
}