2023-01-30 22:19:04 +01:00
< ? php
2023-01-31 00:57:06 +01:00
2023-01-30 22:19:04 +01:00
use Ratchet\MessageComponentInterface ;
use Ratchet\ConnectionInterface ;
2023-02-07 01:22:26 +01:00
use React\EventLoop\LoopInterface ;
use React\EventLoop\Timer\Timer ;
2023-01-30 22:19:04 +01:00
2023-01-31 00:57:06 +01:00
// Make sure composer dependencies have been installed
require __DIR__ . '/vendor/autoload.php' ;
2023-01-30 22:19:04 +01:00
2023-02-07 17:25:17 +01:00
class Client
{
public $id ;
public $timer ;
public $pid ;
public function __construct ( $id )
{
$this -> id = $id ;
}
2023-02-07 18:14:49 +01:00
// `__destruct` can't take arguments.
2023-02-07 17:25:17 +01:00
public function free ( $loop )
{
$loop -> cancelTimer ( $this -> timer );
// Should in theory verify that the pid wasn't re-assigned.
posix_kill ( $this -> pid , SIGTERM );
$clientFilePath = getClientFilePath ( $this -> id );
if ( file_exists ( $clientFilePath )) {
$fp = fopen ( $clientFilePath , " r+ " );
if ( flock ( $fp , LOCK_EX , $WAIT_IF_LOCKED )) { // acquire an exclusive lock
unlink ( $clientFilePath ); // delete file
flock ( $fp , LOCK_UN ); // release the lock
} else {
echo " Couldn't get the lock! " ;
}
fclose ( $fp );
}
}
}
2023-02-07 18:14:49 +01:00
// Need to be passed as a reference to `flock`.
2023-02-07 17:25:17 +01:00
$WAIT_IF_LOCKED = 1 ;
define ( 'USERS_FOLDER' , 'users/' );
2023-02-07 18:14:49 +01:00
// Delete users outputs of previous `websocket.php` execution.
// We skip `.`, `..` and `.gitignore`.
2023-02-07 17:25:17 +01:00
foreach ( array_slice ( scandir ( USERS_FOLDER ), 3 ) as $file ) {
unlink ( USERS_FOLDER . $file );
}
function getClientFilePath ( $clientId )
{
return USERS_FOLDER . " $clientId .txt " ;
}
// Current implementation may add latency across users.
class MyProcess implements MessageComponentInterface
2023-01-31 00:57:06 +01:00
{
2023-01-30 22:19:04 +01:00
protected $clients ;
2023-02-07 01:22:26 +01:00
private $loop ;
2023-02-07 17:25:17 +01:00
private $newClientId ;
private $newClientIdSem ;
2023-01-30 22:19:04 +01:00
2023-02-07 01:22:26 +01:00
public function __construct ( LoopInterface $loop )
2023-01-31 00:57:06 +01:00
{
$this -> clients = new \SplObjectStorage ();
2023-02-07 01:22:26 +01:00
$this -> loop = $loop ;
2023-02-07 17:25:17 +01:00
$this -> newClientId = 0 ;
$this -> newClientIdSem = sem_get ( 1 , 1 );
}
private function newClient ()
{
2023-02-07 18:14:49 +01:00
// If `onOpen` and `onMessage` can't be called at the same time, then this semaphore is useless.
2023-02-07 17:25:17 +01:00
if ( sem_acquire ( $this -> newClientIdSem )) {
2023-02-07 18:14:49 +01:00
// Note that we don't re-use ids except on `websockets.php` restart, but as the maximal int in PHP is a very great number we are fine for a while (https://www.php.net/manual/en/reserved.constants.php#constant.php-int-max)
2023-02-07 17:25:17 +01:00
$clientId = $this -> newClientId ++ ;
sem_release ( $this -> newClientIdSem );
return new Client ( $clientId );
} else {
2023-02-07 18:14:49 +01:00
exit ( '`newClient` error' );
2023-02-07 17:25:17 +01:00
}
2023-01-30 22:19:04 +01:00
}
2023-01-31 00:57:06 +01:00
public function onOpen ( ConnectionInterface $conn )
{
2023-02-07 17:25:17 +01:00
$client = $this -> newClient ();
$this -> clients -> attach ( $conn , $client );
2023-01-30 22:19:04 +01:00
}
2023-01-31 00:57:06 +01:00
public function onMessage ( ConnectionInterface $from , $msg )
{
2023-02-22 17:48:24 +01:00
// As we are going to use this argument in a shell command, we escape it.
$msg = escapeshellarg ( $msg );
2023-02-07 17:25:17 +01:00
$client = $this -> clients -> offsetGet ( $from );
2023-02-07 18:14:49 +01:00
// If a previous request was received, we execute the new one with another client for simplicity otherwise with current file deletion approach, we can't tell the worker `search.py` that we don't care about its execution anymore.
2023-02-07 17:25:17 +01:00
if ( $client -> pid !== null ) {
2023-02-07 18:14:49 +01:00
// As `$this->clients->detach` doesn't call `__destruct` for unknown reason, we clean manually the previous request.
2023-02-07 17:25:17 +01:00
$client -> free ( $this -> loop );
$client = $this -> newClient ();
}
$clientId = $client -> id ;
$clientFilePath = getClientFilePath ( $clientId );
2023-02-07 18:14:49 +01:00
// Create the worker output file otherwise it would believe that we don't need this worker anymore.
2023-02-07 17:25:17 +01:00
file_put_contents ( $clientFilePath , '' );
2023-02-07 18:14:49 +01:00
// Start the independent worker.
// Redirecting `stdout` is mandatory otherwise `exec` is blocking.
2023-02-22 17:48:24 +01:00
$client -> pid = exec ( " ./search.py $clientId $msg > /dev/null & echo $ ! " );
2023-02-07 18:14:49 +01:00
// `addTimer` doesn't enable us to use independently `$from->send` multiple times with blocking instructions between.
2023-02-07 17:25:17 +01:00
$client -> timer = $this -> loop -> addPeriodicTimer ( 1 , function () use ( $from , $clientId , $clientFilePath , $client ) {
echo " Checking news from $clientId\n " ;
2023-02-07 18:14:49 +01:00
// If the worker output file doesn't exist anymore, then it means that the worker have finished its work and acknowledged that `websocket.php` completely read its output.
2023-02-07 17:25:17 +01:00
if ( file_exists ( $clientFilePath )) {
2023-02-07 18:14:49 +01:00
// `flock` requires `r`eading permission and we need `w`riting one due to `ftruncate` usage.
2023-02-07 17:25:17 +01:00
$fp = fopen ( $clientFilePath , " r+ " );
$read = null ;
if ( flock ( $fp , LOCK_EX , $WAIT_IF_LOCKED )) { // acquire an exclusive lock
2023-02-07 18:14:49 +01:00
// We assume that the temporary output is less than 1 MB long.
2023-02-07 17:25:17 +01:00
$read = fread ( $fp , 1_000_000 );
ftruncate ( $fp , 0 ); // truncate file
fflush ( $fp ); // flush output before releasing the lock
flock ( $fp , LOCK_UN ); // release the lock
} else {
2023-02-07 18:14:49 +01:00
// We `die` instead of `echo`ing to force the developer to investigate the reason.
die ( " Couldn't get the lock! " );
2023-02-07 17:25:17 +01:00
}
fclose ( $fp );
2023-02-07 18:14:49 +01:00
// Assume that empty output doesn't need to me forwarded to the end-user.
2023-02-07 17:25:17 +01:00
if ( $read !== null && $read !== '' ) {
$from -> send ( $read );
}
} else {
2023-02-07 18:14:49 +01:00
// We don't need the periodic timer anymore, as the worker finished its work and acknowledged that `websocket.php` completely read its output.
2023-02-07 17:25:17 +01:00
$this -> loop -> cancelTimer ( $client -> timer );
}
2023-02-07 01:22:26 +01:00
});
2023-01-30 22:19:04 +01:00
}
2023-01-31 00:57:06 +01:00
public function onClose ( ConnectionInterface $conn )
{
2023-02-07 17:25:17 +01:00
$client = $this -> clients -> offsetGet ( $conn );
$clientId = $client -> id ;
$client -> free ( $this -> loop );
echo " $clientId disconnected \n " ;
2023-01-30 22:19:04 +01:00
$this -> clients -> detach ( $conn );
}
2023-01-31 00:57:06 +01:00
public function onError ( ConnectionInterface $conn , \Exception $e )
{
2023-01-30 22:19:04 +01:00
$conn -> close ();
2023-02-07 18:14:49 +01:00
die ( '`onError`' );
2023-01-30 22:19:04 +01:00
}
}
2023-02-07 01:22:26 +01:00
$loop = \React\EventLoop\Factory :: create ();
2023-02-07 18:14:49 +01:00
// Run the server application through the WebSocket protocol on port 4430.
// Note that named arguments come with PHP 8 which isn't current Debian one.
2023-02-07 01:22:26 +01:00
$app = new Ratchet\App ( 'crawler.yt.lemnoslife.com' , 4430 , '127.0.0.1' , $loop );
2023-02-07 17:25:17 +01:00
$app -> route ( '/websocket' , new MyProcess ( $loop ), array ( '*' ));
2023-01-31 00:57:06 +01:00
$app -> run ();