diff --git a/website/index.php b/website/index.php
index acb3d19..15bbee9 100644
--- a/website/index.php
+++ b/website/index.php
@@ -9,11 +9,28 @@
See for more information.
-
-
+
diff --git a/website/search.py b/website/search.py
new file mode 100755
index 0000000..0b0be14
--- /dev/null
+++ b/website/search.py
@@ -0,0 +1,28 @@
+#!/usr/bin/python3
+
+import sys, time, fcntl, os
+
+clientId = sys.argv[1]
+message = sys.argv[2]
+
+clientFilePath = f'users/{clientId}.txt'
+
+def write(s):
+ f = open(clientFilePath, 'w+')
+ try:
+ fcntl.flock(f, fcntl.LOCK_EX)
+ if f.read() == '':
+ f.write(s)
+ else:
+ f.close()
+ time.sleep(1)
+ write(s)
+ except Exception as e:
+ print(e)
+ f.close()
+
+for i in range(10):
+ write(f'{i}: {message}')
+ time.sleep(2)
+
+os.remove(clientFilePath)
diff --git a/website/users/.gitignore b/website/users/.gitignore
new file mode 100644
index 0000000..76bedae
--- /dev/null
+++ b/website/users/.gitignore
@@ -0,0 +1,5 @@
+# Ignore everything in this directory
+*
+# Except this file
+!.gitignore
+
diff --git a/website/websocket.php b/website/websocket.php
index 26bd521..b1dee13 100644
--- a/website/websocket.php
+++ b/website/websocket.php
@@ -8,41 +8,147 @@ use React\EventLoop\Timer\Timer;
// Make sure composer dependencies have been installed
require __DIR__ . '/vendor/autoload.php';
-/**
- * Send any incoming messages to all connected clients
- */
-class MyChat implements MessageComponentInterface
+class Client
+{
+ public $id;
+ public $timer;
+ public $pid;
+
+ public function __construct($id)
+ {
+ $this->id = $id;
+ }
+
+ public function free($loop)
+ {
+ $loop->cancelTimer($this->timer);
+ // Should in theory verify that the pid wasn't re-assigned.
+ posix_kill($this->pid, SIGTERM);
+ $clientFilePath = getClientFilePath($this->id);
+ if (file_exists($clientFilePath)) {
+ $fp = fopen($clientFilePath, "r+");
+ if (flock($fp, LOCK_EX, $WAIT_IF_LOCKED)) { // acquire an exclusive lock
+ unlink($clientFilePath); // delete file
+ flock($fp, LOCK_UN); // release the lock
+ } else {
+ echo "Couldn't get the lock!";
+ }
+ fclose($fp);
+ }
+ }
+}
+
+$WAIT_IF_LOCKED = 1;
+
+define('USERS_FOLDER', 'users/');
+
+foreach (array_slice(scandir(USERS_FOLDER), 3) as $file) {
+ unlink(USERS_FOLDER . $file);
+}
+
+function getClientFilePath($clientId)
+{
+ return USERS_FOLDER . "$clientId.txt";
+}
+
+// Current implementation may add latency across users.
+class MyProcess implements MessageComponentInterface
{
protected $clients;
private $loop;
+ private $newClientId;
+ private $newClientIdSem;
public function __construct(LoopInterface $loop)
{
$this->clients = new \SplObjectStorage();
$this->loop = $loop;
+ $this->newClientId = 0;
+ $this->newClientIdSem = sem_get(1, 1);
+ }
+
+ private function newClient()
+ {
+ if (sem_acquire($this->newClientIdSem)) {
+ $clientId = $this->newClientId++;
+ sem_release($this->newClientIdSem);
+ return new Client($clientId);
+ } else {
+ exit('`onOpen` error');
+ }
}
public function onOpen(ConnectionInterface $conn)
{
- $this->clients->attach($conn);
+ $client = $this->newClient();
+ $this->clients->attach($conn, $client);
}
public function onMessage(ConnectionInterface $from, $msg)
{
- $from->send($msg);
- $this->loop->addTimer(Timer::MIN_INTERVAL * 10, function() use ($from) {
- sleep(5);
- $from->send('Proceeded!');
+ if (preg_match("/^[a-zA-Z0-9-_ ]+$/", $msg) !== 1) {
+ return;
+ }
+ $client = $this->clients->offsetGet($from);
+ // If a previous request was received, we execute the new one with another client for simplicity.
+ if ($client->pid !== null) {
+ $client->free($this->loop);
+ $client = $this->newClient();
+ }
+ $clientId = $client->id;
+ $clientFilePath = getClientFilePath($clientId);
+ file_put_contents($clientFilePath, '');
+ $client->pid = exec("./search.py $clientId '$msg' > /dev/null & echo $!");
+ $client->timer = $this->loop->addPeriodicTimer(1, function () use ($from, $clientId, $clientFilePath, $client) {
+ echo "Checking news from $clientId\n";
+ if (file_exists($clientFilePath)) {
+ $fp = fopen($clientFilePath, "r+");
+ $read = null;
+ if (flock($fp, LOCK_EX, $WAIT_IF_LOCKED)) { // acquire an exclusive lock
+ $read = fread($fp, 1_000_000);
+ ftruncate($fp, 0); // truncate file
+ fflush($fp); // flush output before releasing the lock
+ flock($fp, LOCK_UN); // release the lock
+ } else {
+ echo "Couldn't get the lock!";
+ }
+ fclose($fp);
+
+ if ($read !== null && $read !== '') {
+ $from->send($read);
+ }
+ } else {
+ $this->loop->cancelTimer($client->timer);
+ }
});
}
public function onClose(ConnectionInterface $conn)
{
+ $client = $this->clients->offsetGet($conn);
+ $clientId = $client->id;
+ $client->free($this->loop);
+ echo "$clientId disconnected\n";
+ /*$this->loop->cancelTimer($client->timer);
+ // Should in theory verify that the pid wasn't re-assigned.
+ posix_kill($client->pid, SIGTERM);
+ $clientFilePath = getClientFilePath($clientId);
+ if (file_exists($clientFilePath)) {
+ $fp = fopen($clientFilePath, "r+");
+ if (flock($fp, LOCK_EX, $WAIT_IF_LOCKED)) { // acquire an exclusive lock
+ unlink($clientFilePath); // delete file
+ flock($fp, LOCK_UN); // release the lock
+ } else {
+ echo "Couldn't get the lock!";
+ }
+ fclose($fp);
+ }*/
$this->clients->detach($conn);
}
public function onError(ConnectionInterface $conn, \Exception $e)
{
+ echo '`onError`';
$conn->close();
}
}
@@ -51,5 +157,5 @@ $loop = \React\EventLoop\Factory::create();
// Run the server application through the WebSocket protocol on port 4430
$app = new Ratchet\App('crawler.yt.lemnoslife.com', 4430, '127.0.0.1', $loop);
-$app->route('/websocket', new MyChat($loop), array('*'));
+$app->route('/websocket', new MyProcess($loop), array('*'));
$app->run();