From 03c2566a2041c875074472407d232e99a168ca55 Mon Sep 17 00:00:00 2001 From: Benjamin Loison Date: Tue, 7 Feb 2023 17:25:17 +0100 Subject: [PATCH] Make WebSocket able to manage arbitrary feedback to end-user While previous implementation was able to send two independent messages, now we can send an arbitrary amount of independent messages. --- website/index.php | 27 +++++++-- website/search.py | 28 +++++++++ website/users/.gitignore | 5 ++ website/websocket.php | 126 +++++++++++++++++++++++++++++++++++---- 4 files changed, 171 insertions(+), 15 deletions(-) create mode 100755 website/search.py create mode 100644 website/users/.gitignore diff --git a/website/index.php b/website/index.php index acb3d19..15bbee9 100644 --- a/website/index.php +++ b/website/index.php @@ -9,11 +9,28 @@ See for more information.
- - +
+ + +
diff --git a/website/search.py b/website/search.py new file mode 100755 index 0000000..0b0be14 --- /dev/null +++ b/website/search.py @@ -0,0 +1,28 @@ +#!/usr/bin/python3 + +import sys, time, fcntl, os + +clientId = sys.argv[1] +message = sys.argv[2] + +clientFilePath = f'users/{clientId}.txt' + +def write(s): + f = open(clientFilePath, 'w+') + try: + fcntl.flock(f, fcntl.LOCK_EX) + if f.read() == '': + f.write(s) + else: + f.close() + time.sleep(1) + write(s) + except Exception as e: + print(e) + f.close() + +for i in range(10): + write(f'{i}: {message}') + time.sleep(2) + +os.remove(clientFilePath) diff --git a/website/users/.gitignore b/website/users/.gitignore new file mode 100644 index 0000000..76bedae --- /dev/null +++ b/website/users/.gitignore @@ -0,0 +1,5 @@ +# Ignore everything in this directory +* +# Except this file +!.gitignore + diff --git a/website/websocket.php b/website/websocket.php index 26bd521..b1dee13 100644 --- a/website/websocket.php +++ b/website/websocket.php @@ -8,41 +8,147 @@ use React\EventLoop\Timer\Timer; // Make sure composer dependencies have been installed require __DIR__ . '/vendor/autoload.php'; -/** - * Send any incoming messages to all connected clients - */ -class MyChat implements MessageComponentInterface +class Client +{ + public $id; + public $timer; + public $pid; + + public function __construct($id) + { + $this->id = $id; + } + + public function free($loop) + { + $loop->cancelTimer($this->timer); + // Should in theory verify that the pid wasn't re-assigned. + posix_kill($this->pid, SIGTERM); + $clientFilePath = getClientFilePath($this->id); + if (file_exists($clientFilePath)) { + $fp = fopen($clientFilePath, "r+"); + if (flock($fp, LOCK_EX, $WAIT_IF_LOCKED)) { // acquire an exclusive lock + unlink($clientFilePath); // delete file + flock($fp, LOCK_UN); // release the lock + } else { + echo "Couldn't get the lock!"; + } + fclose($fp); + } + } +} + +$WAIT_IF_LOCKED = 1; + +define('USERS_FOLDER', 'users/'); + +foreach (array_slice(scandir(USERS_FOLDER), 3) as $file) { + unlink(USERS_FOLDER . $file); +} + +function getClientFilePath($clientId) +{ + return USERS_FOLDER . "$clientId.txt"; +} + +// Current implementation may add latency across users. +class MyProcess implements MessageComponentInterface { protected $clients; private $loop; + private $newClientId; + private $newClientIdSem; public function __construct(LoopInterface $loop) { $this->clients = new \SplObjectStorage(); $this->loop = $loop; + $this->newClientId = 0; + $this->newClientIdSem = sem_get(1, 1); + } + + private function newClient() + { + if (sem_acquire($this->newClientIdSem)) { + $clientId = $this->newClientId++; + sem_release($this->newClientIdSem); + return new Client($clientId); + } else { + exit('`onOpen` error'); + } } public function onOpen(ConnectionInterface $conn) { - $this->clients->attach($conn); + $client = $this->newClient(); + $this->clients->attach($conn, $client); } public function onMessage(ConnectionInterface $from, $msg) { - $from->send($msg); - $this->loop->addTimer(Timer::MIN_INTERVAL * 10, function() use ($from) { - sleep(5); - $from->send('Proceeded!'); + if (preg_match("/^[a-zA-Z0-9-_ ]+$/", $msg) !== 1) { + return; + } + $client = $this->clients->offsetGet($from); + // If a previous request was received, we execute the new one with another client for simplicity. + if ($client->pid !== null) { + $client->free($this->loop); + $client = $this->newClient(); + } + $clientId = $client->id; + $clientFilePath = getClientFilePath($clientId); + file_put_contents($clientFilePath, ''); + $client->pid = exec("./search.py $clientId '$msg' > /dev/null & echo $!"); + $client->timer = $this->loop->addPeriodicTimer(1, function () use ($from, $clientId, $clientFilePath, $client) { + echo "Checking news from $clientId\n"; + if (file_exists($clientFilePath)) { + $fp = fopen($clientFilePath, "r+"); + $read = null; + if (flock($fp, LOCK_EX, $WAIT_IF_LOCKED)) { // acquire an exclusive lock + $read = fread($fp, 1_000_000); + ftruncate($fp, 0); // truncate file + fflush($fp); // flush output before releasing the lock + flock($fp, LOCK_UN); // release the lock + } else { + echo "Couldn't get the lock!"; + } + fclose($fp); + + if ($read !== null && $read !== '') { + $from->send($read); + } + } else { + $this->loop->cancelTimer($client->timer); + } }); } public function onClose(ConnectionInterface $conn) { + $client = $this->clients->offsetGet($conn); + $clientId = $client->id; + $client->free($this->loop); + echo "$clientId disconnected\n"; + /*$this->loop->cancelTimer($client->timer); + // Should in theory verify that the pid wasn't re-assigned. + posix_kill($client->pid, SIGTERM); + $clientFilePath = getClientFilePath($clientId); + if (file_exists($clientFilePath)) { + $fp = fopen($clientFilePath, "r+"); + if (flock($fp, LOCK_EX, $WAIT_IF_LOCKED)) { // acquire an exclusive lock + unlink($clientFilePath); // delete file + flock($fp, LOCK_UN); // release the lock + } else { + echo "Couldn't get the lock!"; + } + fclose($fp); + }*/ $this->clients->detach($conn); } public function onError(ConnectionInterface $conn, \Exception $e) { + echo '`onError`'; $conn->close(); } } @@ -51,5 +157,5 @@ $loop = \React\EventLoop\Factory::create(); // Run the server application through the WebSocket protocol on port 4430 $app = new Ratchet\App('crawler.yt.lemnoslife.com', 4430, '127.0.0.1', $loop); -$app->route('/websocket', new MyChat($loop), array('*')); +$app->route('/websocket', new MyProcess($loop), array('*')); $app->run();