id = $id;
    }

    /**
     * Release every resource tied to this client: the polling timer, the
     * background search process and the per-client exchange file.
     *
     * Safe to call on a client that never started a search (no timer, no pid).
     *
     * @param mixed $loop Event loop on which the polling timer was registered
     *                    (the caller passes its React event loop).
     */
    public function free($loop)
    {
        // A client that never received a message has no timer to cancel.
        if (isset($this->timer)) {
            $loop->cancelTimer($this->timer);
        }

        // Should in theory verify that the pid wasn't re-assigned.
        // Only signal a strictly positive pid: a missing/failed pid would coerce
        // to 0, which addresses the caller's whole process group instead.
        if (isset($this->pid) && (int) $this->pid > 0) {
            posix_kill((int) $this->pid, SIGTERM);
        }

        $clientFilePath = getClientFilePath($this->id);
        if (!file_exists($clientFilePath)) {
            return;
        }

        // The file may vanish between file_exists() and fopen().
        $fp = fopen($clientFilePath, 'r+');
        if ($fp === false) {
            return;
        }

        // Without LOCK_NB, flock() waits until the exclusive lock is granted.
        if (flock($fp, LOCK_EX)) {
            unlink($clientFilePath); // delete file
            flock($fp, LOCK_UN); // release the lock
        } else {
            echo "Couldn't get the lock!";
        }
        fclose($fp);
    }
}

// Retained so the global keeps existing for any code that reads it. It is not
// consulted by the flock() calls in this file: they never pass LOCK_NB and
// therefore always wait for the lock.
$WAIT_IF_LOCKED = 1;

define('USERS_FOLDER', 'users/');

// Purge leftover exchange files from a previous run. The first three scandir()
// entries are skipped - presumably '.', '..' and one file that must be kept;
// TODO confirm which file that is.
foreach (array_slice(scandir(USERS_FOLDER), 3) as $file) {
    unlink(USERS_FOLDER . $file);
}

/**
 * Build the path of the exchange file used for the given client.
 *
 * @param int|string $clientId Identifier of the client.
 *
 * @return string Path relative to the working directory, inside USERS_FOLDER.
 */
function getClientFilePath($clientId)
{
    return USERS_FOLDER . "$clientId.txt";
}

/**
 * WebSocket component that starts one `search.py` process per request and
 * relays whatever that process writes to the client's exchange file.
 *
 * Current implementation may add latency across users.
 */
class MyProcess implements MessageComponentInterface
{
    /** @var \SplObjectStorage Maps each open connection to its current Client. */
    protected $clients;

    /** @var LoopInterface Event loop used to schedule the polling timers. */
    private $loop;

    /** @var int Next client identifier to hand out. */
    private $newClientId;

    /** @var mixed Semaphore guarding the identifier counter. */
    private $newClientIdSem;

    public function __construct(LoopInterface $loop)
    {
        $this->clients = new \SplObjectStorage();
        $this->loop = $loop;
        $this->newClientId = 0;
        $this->newClientIdSem = sem_get(1, 1);
    }

    /**
     * Allocate a Client carrying a fresh identifier.
     *
     * Terminates the script if the semaphore cannot be acquired.
     */
    private function newClient()
    {
        if (!sem_acquire($this->newClientIdSem)) {
            exit('`onOpen` error');
        }

        $clientId = $this->newClientId++;
        sem_release($this->newClientIdSem);

        return new Client($clientId);
    }

    /**
     * Read and empty the exchange file under an exclusive lock.
     *
     * NOTE: at most 1,000,000 bytes are read per call; anything beyond that is
     * dropped by the truncation that follows.
     *
     * @return string|null The pending content, or null when the file could not
     *                     be opened, locked or read.
     */
    private function drainClientFile(string $clientFilePath): ?string
    {
        // The file may vanish between the caller's file_exists() and fopen().
        $fp = fopen($clientFilePath, 'r+');
        if ($fp === false) {
            return null;
        }

        $read = null;
        if (flock($fp, LOCK_EX)) { // acquire an exclusive lock
            $read = fread($fp, 1_000_000);
            ftruncate($fp, 0); // truncate file
            fflush($fp); // flush output before releasing the lock
            flock($fp, LOCK_UN); // release the lock
        } else {
            echo "Couldn't get the lock!";
        }
        fclose($fp);

        return is_string($read) ? $read : null;
    }

    public function onOpen(ConnectionInterface $conn)
    {
        $client = $this->newClient();
        $this->clients->attach($conn, $client);
    }

    /**
     * Start a search for `$msg` and poll its exchange file once per second,
     * forwarding any new content to the sender.
     *
     * Messages containing anything but letters, digits, '-', '_' and spaces
     * are ignored.
     */
    public function onMessage(ConnectionInterface $from, $msg)
    {
        if (preg_match("/^[a-zA-Z0-9-_ ]+$/", $msg) !== 1) {
            return;
        }
        if (!$this->clients->offsetExists($from)) {
            return;
        }

        $client = $this->clients->offsetGet($from);
        // If a previous request was received, we execute the new one with another client for simplicity.
        if ($client->pid !== null) {
            $client->free($this->loop);
            $client = $this->newClient();
            // Re-associate the connection with the replacement client, otherwise
            // its process, timer and file would never be released on close.
            $this->clients->attach($from, $client);
        }

        $clientId = $client->id;
        $clientFilePath = getClientFilePath($clientId);
        file_put_contents($clientFilePath, '');

        // Start the search in the background; `echo $!` prints its pid.
        $command = "./search.py $clientId " . escapeshellarg($msg) . ' > /dev/null & echo $!';
        $client->pid = exec($command);

        $client->timer = $this->loop->addPeriodicTimer(
            1,
            function () use ($from, $clientId, $clientFilePath, $client) {
                echo "Checking news from $clientId\n";

                // Once the exchange file is gone there is nothing left to poll.
                if (!file_exists($clientFilePath)) {
                    $this->loop->cancelTimer($client->timer);
                    return;
                }

                $read = $this->drainClientFile($clientFilePath);
                if ($read !== null && $read !== '') {
                    $from->send($read);
                }
            }
        );
    }

    public function onClose(ConnectionInterface $conn)
    {
        // Nothing to release for a connection that was never registered.
        if (!$this->clients->offsetExists($conn)) {
            return;
        }

        $client = $this->clients->offsetGet($conn);
        $clientId = $client->id;
        $client->free($this->loop);
        echo "$clientId disconnected\n";
        $this->clients->detach($conn);
    }

    public function onError(ConnectionInterface $conn, \Exception $e)
    {
        echo '`onError`';
        $conn->close();
    }
}

$loop = \React\EventLoop\Factory::create();

// Run the server application through the WebSocket protocol on port 4430
$app = new Ratchet\App('crawler.yt.lemnoslife.com', 4430, '127.0.0.1', $loop);
$app->route('/websocket', new MyProcess($loop), array('*'));
$app->run();