167 lines
		
	
	
		
			6.4 KiB
		
	
	
	
		
			PHP
		
	
	
	
	
	
			
		
		
	
	
			167 lines
		
	
	
		
			6.4 KiB
		
	
	
	
		
			PHP
		
	
	
	
	
	
| <?php
 | |
| 
 | |
| use Ratchet\MessageComponentInterface;
 | |
| use Ratchet\ConnectionInterface;
 | |
| use React\EventLoop\LoopInterface;
 | |
| use React\EventLoop\Timer\Timer;
 | |
| 
 | |
| // Make sure composer dependencies have been installed
 | |
| require __DIR__ . '/vendor/autoload.php';
 | |
| 
 | |
/**
 * Bookkeeping for one search request: the id used to name its worker output file, the event-loop timer polling
 * that file and the PID of the `search.py` worker producing it.
 */
class Client
{
    /** @var int Identifier used to build the worker output file path. */
    public $id;
    /** @var \React\EventLoop\TimerInterface|null Periodic timer polling the output file; null until a request starts. */
    public $timer;
    /** @var string|false|null Worker PID as returned by `exec` (output of `echo $!`); null until a request starts. */
    public $pid;

    public function __construct($id)
    {
        $this->id = $id;
    }

    /**
     * Stop this client's polling timer, terminate its worker and delete its output file.
     *
     * Tolerates a client that never started a request (no timer, no PID).
     * `__destruct` can't take arguments, hence the explicit `$loop` parameter.
     *
     * @param \React\EventLoop\LoopInterface $loop Loop the timer was registered on.
     */
    public function free($loop)
    {
        // `cancelTimer` requires a timer instance; there is none before the first request.
        if ($this->timer !== null) {
            $loop->cancelTimer($this->timer);
        }

        // A missing/invalid PID casts to 0, and `posix_kill(0, ...)` would signal the whole process group
        // (this server included), so only signal a real PID.
        $pid = (int) $this->pid;
        if ($pid > 0) {
            // Should in theory verify that the pid wasn't re-assigned.
            posix_kill($pid, SIGTERM);
        }

        $clientFilePath = getClientFilePath($this->id);
        if (!file_exists($clientFilePath)) {
            return;
        }
        $fp = fopen($clientFilePath, "r+");
        if ($fp === false) {
            // The file vanished between `file_exists` and `fopen`: nothing left to delete.
            return;
        }
        // Blocking exclusive lock before deleting (flock's optional third argument is an output flag, so it is
        // omitted). NOTE(review): presumably `search.py` locks this file too — confirm.
        if (flock($fp, LOCK_EX)) {
            unlink($clientFilePath); // delete file
            flock($fp, LOCK_UN);     // release the lock
        } else {
            echo "Couldn't get the lock!";
        }
        fclose($fp);
    }
}
 | |
| 
 | |
// NOTE: `flock`'s optional third parameter is an *output* (set to 1 when the lock would block), not an input, and a
// global variable is not visible inside functions or closures unless imported, so `flock` never reads this value.
// Kept only so that nothing relying on the global breaks.
$WAIT_IF_LOCKED = 1;

define('USERS_FOLDER', 'users/');

// Delete users outputs of previous `websocket.php` execution.
// Hidden entries (`.`, `..`, `.gitignore`, ...) are kept by matching on the name, rather than assuming they are the
// first three sorted entries: another dotfile sorting before `.gitignore`, or a missing `.gitignore`, would otherwise
// make us delete the wrong file.
$previousOutputs = scandir(USERS_FOLDER);
if ($previousOutputs === false) {
    exit("Couldn't list " . USERS_FOLDER . "!\n");
}
foreach ($previousOutputs as $file) {
    if ($file === '' || $file[0] === '.') {
        continue;
    }
    $path = USERS_FOLDER . $file;
    // Only regular files can be `unlink`ed.
    if (is_file($path)) {
        unlink($path);
    }
}
 | |
| 
 | |
/**
 * Build the path of the output file shared between this server and the `search.py` worker of a client.
 *
 * @param int $clientId Client identifier.
 * @return string `USERS_FOLDER` followed by `<clientId>.txt`.
 */
function getClientFilePath($clientId)
{
    $fileName = $clientId . '.txt';
    return USERS_FOLDER . $fileName;
}
 | |
| 
 | |
/**
 * Ratchet WebSocket component: each valid message received is a search query handed to an independent
 * `search.py` worker, whose output is relayed back to the sender once per second.
 *
 * Current implementation may add latency across users: every client is polled from the same event loop and each
 * poll performs blocking file I/O (`flock`, `fread`).
 */
class MyProcess implements MessageComponentInterface
{
    /** @var \SplObjectStorage Maps each ConnectionInterface to its current Client. */
    protected $clients;
    /** @var LoopInterface Loop on which the polling timers are scheduled. */
    private $loop;
    /** @var int Next client id to hand out. */
    private $newClientId;
    /** System V semaphore guarding `$newClientId`. */
    private $newClientIdSem;

    public function __construct(LoopInterface $loop)
    {
        $this->clients = new \SplObjectStorage();
        $this->loop = $loop;
        $this->newClientId = 0;
        // NOTE(review): System V key `1` is system-wide; another process using the same key would contend with us.
        $this->newClientIdSem = sem_get(1, 1);
    }

    /**
     * Allocate a `Client` with a fresh id. Exits the process if the semaphore can't be acquired.
     */
    private function newClient()
    {
        // If `onOpen` and `onMessage` can't be called at the same time, then this semaphore is useless.
        if (sem_acquire($this->newClientIdSem)) {
            // Note that we don't re-use ids except on `websockets.php` restart, but as the maximal int in PHP is a very great number we are fine for a while (https://www.php.net/manual/en/reserved.constants.php#constant.php-int-max)
            $clientId = $this->newClientId++;
            sem_release($this->newClientIdSem);
            return new Client($clientId);
        }
        exit('`newClient` error');
    }

    public function onOpen(ConnectionInterface $conn)
    {
        $client = $this->newClient();
        $this->clients->attach($conn, $client);
    }

    /**
     * Start a `search.py` worker for `$msg` and poll its output file every second.
     */
    public function onMessage(ConnectionInterface $from, $msg)
    {
        // As we are going to use this argument in a shell command, we only accept a limited set of characters.
        if (preg_match("/^[a-zA-Z0-9-_ ]+$/", $msg) !== 1) {
            return;
        }
        $client = $this->clients->offsetGet($from);
        // If a previous request was received, we execute the new one with another client for simplicity otherwise with current file deletion approach, we can't tell the worker `search.py` that we don't care about its execution anymore.
        if ($client->pid !== null) {
            // As `$this->clients->detach` doesn't call `__destruct` for unknown reason, we clean manually the previous request.
            $client->free($this->loop);
            $client = $this->newClient();
            // Associate the fresh client with the connection: otherwise later messages and `onClose` keep acting on
            // the already-freed client, and this client's timer and worker are never released.
            $this->clients->offsetSet($from, $client);
        }
        $clientId = $client->id;
        $clientFilePath = getClientFilePath($clientId);
        // Create the worker output file otherwise it would believe that we don't need this worker anymore.
        file_put_contents($clientFilePath, '');
        // Start the independent worker.
        // Redirecting `stdout` is mandatory otherwise `exec` is blocking; `echo $!` prints the background PID.
        // `escapeshellarg` yields the same `'...'` quoting as before for the accepted charset, without relying on it.
        $command = sprintf('./search.py %d %s > /dev/null & echo $!', $clientId, escapeshellarg($msg));
        $client->pid = exec($command);
        // `addTimer` doesn't enable us to use independently `$from->send` multiple times with blocking instructions between.
        $client->timer = $this->loop->addPeriodicTimer(1, function () use ($from, $clientId, $clientFilePath, $client) {
            $this->pollWorkerOutput($from, $clientId, $clientFilePath, $client);
        });
    }

    /**
     * One polling tick: forward (and consume) whatever the worker wrote since the last tick, or stop polling once
     * the worker removed its output file.
     */
    private function pollWorkerOutput(ConnectionInterface $from, $clientId, $clientFilePath, Client $client)
    {
        echo "Checking news from $clientId\n";
        // If the worker output file doesn't exist anymore, then it means that the worker have finished its work and acknowledged that `websocket.php` completely read its output.
        if (!file_exists($clientFilePath)) {
            $this->loop->cancelTimer($client->timer);
            return;
        }
        // `flock` requires `r`eading permission and we need `w`riting one due to `ftruncate` usage.
        $fp = fopen($clientFilePath, "r+");
        if ($fp === false) {
            // The worker removed the file between `file_exists` and `fopen`; the next tick will stop the timer.
            return;
        }
        // Blocking exclusive lock (flock's optional third argument is an output flag, so it is omitted).
        if (!flock($fp, LOCK_EX)) {
            // We `die` instead of `echo`ing to force the developer to investigate the reason.
            die("Couldn't get the lock!");
        }
        // We assume that the temporary output is less than 1 MB long.
        $read = fread($fp, 1_000_000);
        ftruncate($fp, 0);     // truncate file
        fflush($fp);           // flush output before releasing the lock
        flock($fp, LOCK_UN);   // release the lock
        fclose($fp);

        // Empty output (or a failed read) doesn't need to be forwarded to the end-user.
        if (is_string($read) && $read !== '') {
            $from->send($read);
        }
    }

    public function onClose(ConnectionInterface $conn)
    {
        $client = $this->clients->offsetGet($conn);
        $clientId = $client->id;
        // A client that never sent a valid message has no timer, worker or output file to release.
        if ($client->pid !== null) {
            $client->free($this->loop);
        }
        echo "$clientId disconnected\n";
        $this->clients->detach($conn);
    }

    public function onError(ConnectionInterface $conn, \Exception $e)
    {
        // Report the cause before stopping, otherwise it is lost.
        echo 'Error: ' . $e->getMessage() . "\n";
        $conn->close();
        // NOTE(review): this stops the whole server, i.e. every connected user, on any single connection error.
        die('`onError`');
    }
}
 | |
| 
 | |
// ReactPHP event loop shared by the Ratchet server and by the polling timers of `MyProcess`.
$loop = \React\EventLoop\Factory::create();

// Expose `MyProcess` at `/websocket` for host `crawler.yt.lemnoslife.com`, listening on 127.0.0.1:4430,
// with every origin allowed (`*`).
// Arguments stay positional: named arguments need PHP 8, which the current Debian release doesn't ship.
$app = new Ratchet\App('crawler.yt.lemnoslife.com', 4430, '127.0.0.1', $loop);
$app->route('/websocket', new MyProcess($loop), ['*']);
$app->run();
 |