id = $id; }

    /**
     * Releases everything tied to this client's current request: the periodic
     * polling timer, the worker process and the worker output file.
     *
     * `__destruct` can't take arguments, hence this explicit method.
     *
     * @param LoopInterface $loop Event loop on which the client's timer was registered.
     */
    public function free($loop)
    {
        // A client that never sent a request has no timer yet.
        if (isset($this->timer)) {
            $loop->cancelTimer($this->timer);
        }

        // Only signal a real worker pid: a pid of 0 (e.g. from `null`) would target the whole
        // process group, this server included, and a negative pid targets even more processes.
        // Should in theory verify that the pid wasn't re-assigned.
        if (isset($this->pid) && (int) $this->pid > 0) {
            posix_kill((int) $this->pid, SIGTERM);
        }

        $clientFilePath = getClientFilePath($this->id);
        if (!file_exists($clientFilePath)) {
            return;
        }

        $fp = fopen($clientFilePath, "r+");
        if ($fp === false) {
            // The file may have been removed between `file_exists` and `fopen`.
            return;
        }

        // Without `LOCK_NB`, `flock` blocks until the exclusive lock is granted.
        if (flock($fp, LOCK_EX)) {
            unlink($clientFilePath); // delete file
            flock($fp, LOCK_UN); // release the lock
        } else {
            echo "Couldn't get the lock!";
        }
        fclose($fp);
    }
}

// NOTE: the third parameter of `flock` is an *output* flag (set when the lock would block), not a
// "wait" option, and this global isn't visible from function/method scopes anyway. It is kept only
// so that any code relying on its existence keeps working.
$WAIT_IF_LOCKED = 1;

define('USERS_FOLDER', 'users/');

// Delete users outputs of previous `websocket.php` execution.
// We skip `.`, `..` and `.gitignore` by name instead of assuming they are the first three entries.
$staleEntries = scandir(USERS_FOLDER);
foreach (array_diff($staleEntries === false ? [] : $staleEntries, ['.', '..', '.gitignore']) as $file) {
    unlink(USERS_FOLDER . $file);
}

/**
 * Returns the path of the file used to exchange the worker output for a given client.
 *
 * @param int|string $clientId Identifier of the client.
 *
 * @return string Path inside `USERS_FOLDER`, of the form `<clientId>.txt`.
 */
function getClientFilePath($clientId): string
{
    return USERS_FOLDER . "$clientId.txt";
}

/**
 * WebSocket component that starts one `search.py` worker per request and forwards the worker
 * output, polled once per second from a per-client file, to the requesting connection.
 *
 * Current implementation may add latency across users.
 */
class MyProcess implements MessageComponentInterface
{
    /** @var \SplObjectStorage Maps each connection to its current `Client`. */
    protected $clients;

    /** @var LoopInterface Event loop used to schedule the polling timers. */
    private $loop;

    /** @var int Next client identifier to hand out. */
    private $newClientId;

    /** @var mixed Semaphore guarding `$newClientId`. */
    private $newClientIdSem;

    /**
     * @param LoopInterface $loop Event loop shared with the WebSocket server.
     */
    public function __construct(LoopInterface $loop)
    {
        $this->clients = new \SplObjectStorage();
        $this->loop = $loop;
        $this->newClientId = 0;
        $this->newClientIdSem = sem_get(1, 1);
    }

    /**
     * Creates a `Client` with a fresh identifier.
     *
     * @return Client
     */
    private function newClient()
    {
        // If `onOpen` and `onMessage` can't be called at the same time, then this semaphore is useless.
        if (!sem_acquire($this->newClientIdSem)) {
            exit('`newClient` error');
        }

        // Note that we don't re-use ids except on `websockets.php` restart, but as the maximal int in PHP
        // is a very great number we are fine for a while
        // (https://www.php.net/manual/en/reserved.constants.php#constant.php-int-max)
        $clientId = $this->newClientId++;
        sem_release($this->newClientIdSem);

        return new Client($clientId);
    }

    /**
     * Registers a new client for the connection that has just been opened.
     */
    public function onOpen(ConnectionInterface $conn)
    {
        $client = $this->newClient();
        $this->clients->attach($conn, $client);
    }

    /**
     * Starts a worker for the received request and polls its output file every second.
     *
     * @param ConnectionInterface $from Connection that sent the request.
     * @param string              $msg  Request, forwarded as an argument to `search.py`.
     */
    public function onMessage(ConnectionInterface $from, $msg)
    {
        // As we are going to use this argument in a shell command, we verify a limited set of characters
        // that are safe once quoted.
        if (preg_match("/^[a-zA-Z0-9-_ ]+$/", $msg) !== 1) {
            return;
        }

        $client = $this->clients->offsetGet($from);
        // If a previous request was received, we execute the new one with another client for simplicity
        // otherwise with current file deletion approach, we can't tell the worker `search.py` that we
        // don't care about its execution anymore.
        if ($client->pid !== null) {
            // As `$this->clients->detach` doesn't call `__destruct` for unknown reason, we clean manually
            // the previous request.
            $client->free($this->loop);
            $client = $this->newClient();
            // Remember the replacement, otherwise the next message or `onClose` would free the stale
            // client again and never clean up this one.
            $this->clients->offsetSet($from, $client);
        }

        $clientId = $client->id;
        $clientFilePath = getClientFilePath($clientId);
        // Create the worker output file otherwise it would believe that we don't need this worker anymore.
        file_put_contents($clientFilePath, '');

        // Start the independent worker.
        // Redirecting `stdout` is mandatory otherwise `exec` is blocking.
        // `escapeshellarg` quotes the argument on top of the character whitelist above.
        $client->pid = exec("./search.py $clientId " . escapeshellarg($msg) . ' > /dev/null & echo $!');

        // `addTimer` doesn't enable us to use independently `$from->send` multiple times with blocking
        // instructions between.
        $client->timer = $this->loop->addPeriodicTimer(
            1,
            function () use ($from, $clientId, $clientFilePath, $client) {
                echo "Checking news from $clientId\n";

                // The worker deletes the file from another process, so drop any cached stat result first.
                clearstatcache(true, $clientFilePath);

                // If the worker output file doesn't exist anymore, then it means that the worker has
                // finished its work and acknowledged that `websocket.php` completely read its output.
                if (!file_exists($clientFilePath)) {
                    // We don't need the periodic timer anymore.
                    $this->loop->cancelTimer($client->timer);
                    return;
                }

                $read = $this->drainWorkerOutput($clientFilePath);
                // Assume that empty output doesn't need to be forwarded to the end-user.
                if ($read !== null && $read !== '') {
                    $from->send($read);
                }
            }
        );
    }

    /**
     * Reads and empties the worker output file while holding an exclusive lock on it.
     *
     * @param string $clientFilePath Path of the worker output file.
     *
     * @return string|null Content read, or null when the file couldn't be opened or read.
     */
    private function drainWorkerOutput(string $clientFilePath): ?string
    {
        // `flock` requires `r`eading permission and we need `w`riting one due to `ftruncate` usage.
        $fp = fopen($clientFilePath, "r+");
        if ($fp === false) {
            // The worker may have removed the file since the existence check.
            return null;
        }

        // Without `LOCK_NB`, `flock` blocks until the exclusive lock is granted.
        if (!flock($fp, LOCK_EX)) {
            fclose($fp);
            // We `die` instead of `echo`ing to force the developer to investigate the reason.
            die("Couldn't get the lock!");
        }

        // We assume that the temporary output is less than 1 MB long.
        $read = fread($fp, 1_000_000);
        ftruncate($fp, 0); // truncate file
        fflush($fp); // flush output before releasing the lock
        flock($fp, LOCK_UN); // release the lock
        fclose($fp);

        return is_string($read) ? $read : null;
    }

    /**
     * Frees the resources of the client bound to the closed connection and forgets it.
     */
    public function onClose(ConnectionInterface $conn)
    {
        // `offsetGet` throws for a connection that was never registered.
        if (!$this->clients->contains($conn)) {
            return;
        }

        $client = $this->clients->offsetGet($conn);
        $clientId = $client->id;
        $client->free($this->loop);
        echo "$clientId disconnected\n";
        $this->clients->detach($conn);
    }

    /**
     * Closes the faulty connection and stops the server so that the error gets investigated.
     */
    public function onError(ConnectionInterface $conn, \Exception $e)
    {
        $conn->close();
        die('`onError`: ' . $e->getMessage());
    }
}

$loop = \React\EventLoop\Factory::create();

// Run the server application through the WebSocket protocol on port 4430.
// Note that named arguments come with PHP 8 which isn't current Debian one.
$app = new Ratchet\App('crawler.yt.lemnoslife.com', 4430, '127.0.0.1', $loop);
$app->route('/websocket', new MyProcess($loop), ['*']);
$app->run();