From 82e597f20502e1fe8f494c30476a4035c9e00a28 Mon Sep 17 00:00:00 2001 From: Benjamin Loison Date: Tue, 7 Feb 2023 18:14:49 +0100 Subject: [PATCH] Comment WebSocket mechanism to work with an arbitrary number of independent send --- website/index.php | 5 ++++- website/search.py | 25 +++++++++++++++++-------- website/websocket.php | 43 ++++++++++++++++++++++++------------------- 3 files changed, 45 insertions(+), 28 deletions(-) diff --git a/website/index.php b/website/index.php index 15bbee9..3a5dba1 100644 --- a/website/index.php +++ b/website/index.php @@ -10,7 +10,7 @@ See for more information.
- +
@@ -19,14 +19,17 @@ See id = $id; } + // `__destruct` can't take arguments. public function free($loop) { $loop->cancelTimer($this->timer); @@ -38,10 +39,13 @@ class Client } } +// Need to be passed as a reference to `flock`. $WAIT_IF_LOCKED = 1; define('USERS_FOLDER', 'users/'); +// Delete users outputs of previous `websocket.php` execution. +// We skip `.`, `..` and `.gitignore`. foreach (array_slice(scandir(USERS_FOLDER), 3) as $file) { unlink(USERS_FOLDER . $file); } @@ -69,12 +73,14 @@ class MyProcess implements MessageComponentInterface private function newClient() { + // If `onOpen` and `onMessage` can't be called at the same time, then this semaphore is useless. if (sem_acquire($this->newClientIdSem)) { + // Note that we don't re-use ids except on `websockets.php` restart, but as the maximal int in PHP is a very great number we are fine for a while (https://www.php.net/manual/en/reserved.constants.php#constant.php-int-max) $clientId = $this->newClientId++; sem_release($this->newClientIdSem); return new Client($clientId); } else { - exit('`onOpen` error'); + exit('`newClient` error'); } } @@ -86,38 +92,50 @@ class MyProcess implements MessageComponentInterface public function onMessage(ConnectionInterface $from, $msg) { + // As we are going to use this argument in a shell command, we verify a limited set of characters that are safe once quoted. if (preg_match("/^[a-zA-Z0-9-_ ]+$/", $msg) !== 1) { return; } $client = $this->clients->offsetGet($from); - // If a previous request was received, we execute the new one with another client for simplicity. + // If a previous request was received, we execute the new one with another client for simplicity otherwise with current file deletion approach, we can't tell the worker `search.py` that we don't care about its execution anymore. if ($client->pid !== null) { + // As `$this->clients->detach` doesn't call `__destruct` for unknown reason, we clean manually the previous request. $client->free($this->loop); $client = $this->newClient(); } $clientId = $client->id; $clientFilePath = getClientFilePath($clientId); + // Create the worker output file otherwise it would believe that we don't need this worker anymore. file_put_contents($clientFilePath, ''); + // Start the independent worker. + // Redirecting `stdout` is mandatory otherwise `exec` is blocking. $client->pid = exec("./search.py $clientId '$msg' > /dev/null & echo $!"); + // `addTimer` doesn't enable us to use independently `$from->send` multiple times with blocking instructions between. $client->timer = $this->loop->addPeriodicTimer(1, function () use ($from, $clientId, $clientFilePath, $client) { echo "Checking news from $clientId\n"; + // If the worker output file doesn't exist anymore, then it means that the worker have finished its work and acknowledged that `websocket.php` completely read its output. if (file_exists($clientFilePath)) { + // `flock` requires `r`eading permission and we need `w`riting one due to `ftruncate` usage. $fp = fopen($clientFilePath, "r+"); $read = null; if (flock($fp, LOCK_EX, $WAIT_IF_LOCKED)) { // acquire an exclusive lock + // We assume that the temporary output is less than 1 MB long. $read = fread($fp, 1_000_000); ftruncate($fp, 0); // truncate file fflush($fp); // flush output before releasing the lock flock($fp, LOCK_UN); // release the lock } else { - echo "Couldn't get the lock!"; + // We `die` instead of `echo`ing to force the developer to investigate the reason. + die("Couldn't get the lock!"); } fclose($fp); + // Assume that empty output doesn't need to me forwarded to the end-user. if ($read !== null && $read !== '') { $from->send($read); } } else { + // We don't need the periodic timer anymore, as the worker finished its work and acknowledged that `websocket.php` completely read its output. $this->loop->cancelTimer($client->timer); } }); @@ -129,33 +147,20 @@ class MyProcess implements MessageComponentInterface $clientId = $client->id; $client->free($this->loop); echo "$clientId disconnected\n"; - /*$this->loop->cancelTimer($client->timer); - // Should in theory verify that the pid wasn't re-assigned. - posix_kill($client->pid, SIGTERM); - $clientFilePath = getClientFilePath($clientId); - if (file_exists($clientFilePath)) { - $fp = fopen($clientFilePath, "r+"); - if (flock($fp, LOCK_EX, $WAIT_IF_LOCKED)) { // acquire an exclusive lock - unlink($clientFilePath); // delete file - flock($fp, LOCK_UN); // release the lock - } else { - echo "Couldn't get the lock!"; - } - fclose($fp); - }*/ $this->clients->detach($conn); } public function onError(ConnectionInterface $conn, \Exception $e) { - echo '`onError`'; $conn->close(); + die('`onError`'); } } $loop = \React\EventLoop\Factory::create(); -// Run the server application through the WebSocket protocol on port 4430 +// Run the server application through the WebSocket protocol on port 4430. +// Note that named arguments come with PHP 8 which isn't current Debian one. $app = new Ratchet\App('crawler.yt.lemnoslife.com', 4430, '127.0.0.1', $loop); $app->route('/websocket', new MyProcess($loop), array('*')); $app->run();