Comment WebSocket mechanism to work with an arbitrary number of independent send
This commit is contained in:
parent
03c2566a20
commit
82e597f205
@ -10,7 +10,7 @@
|
|||||||
See <?php echoUrl('https://gitea.lemnoslife.com/Benjamin_Loison/YouTube_captions_search_engine'); ?> for more information.<br/>
|
See <?php echoUrl('https://gitea.lemnoslife.com/Benjamin_Loison/YouTube_captions_search_engine'); ?> for more information.<br/>
|
||||||
|
|
||||||
<form id="form">
|
<form id="form">
|
||||||
<input type="text" autofocus id="search" pattern="[A-Za-z0-9-_ ]+" placeholder="Your alphanumeric search"></input>
|
<input type="text" autofocus id="search" pattern="[A-Za-z0-9-_ ]+" placeholder="Your [A-Za-z0-9-_ ]+ search"></input>
|
||||||
<input type="submit" value="Search">
|
<input type="submit" value="Search">
|
||||||
</form>
|
</form>
|
||||||
|
|
||||||
@ -19,14 +19,17 @@ See <?php echoUrl('https://gitea.lemnoslife.com/Benjamin_Loison/YouTube_captions
|
|||||||
var conn;
|
var conn;
|
||||||
|
|
||||||
function search(event) {
|
function search(event) {
|
||||||
|
// We don't want to refresh the webpage which is the default behavior.
|
||||||
event.preventDefault();
|
event.preventDefault();
|
||||||
const query = document.getElementById('search').value;
|
const query = document.getElementById('search').value;
|
||||||
if (firstRun) {
|
if (firstRun) {
|
||||||
firstRun = false;
|
firstRun = false;
|
||||||
conn = new WebSocket('wss://crawler.yt.lemnoslife.com/websocket');
|
conn = new WebSocket('wss://crawler.yt.lemnoslife.com/websocket');
|
||||||
conn.onmessage = function(e) { console.log(e.data); };
|
conn.onmessage = function(e) { console.log(e.data); };
|
||||||
|
// We can't directly proceed with `conn.send`, as the connection may not be already established.
|
||||||
conn.onopen = function(e) { conn.send(query); };
|
conn.onopen = function(e) { conn.send(query); };
|
||||||
} else {
|
} else {
|
||||||
|
// We assume at this point that the connection is established.
|
||||||
conn.send(query);
|
conn.send(query);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -11,18 +11,27 @@ def write(s):
|
|||||||
f = open(clientFilePath, 'w+')
|
f = open(clientFilePath, 'w+')
|
||||||
try:
|
try:
|
||||||
fcntl.flock(f, fcntl.LOCK_EX)
|
fcntl.flock(f, fcntl.LOCK_EX)
|
||||||
if f.read() == '':
|
# If the output file is empty, then it means that `websocket.php` read it. Anyway we don't wait it and we append what we want to output.
|
||||||
f.write(s)
|
read = f.read()
|
||||||
else:
|
f.write(f"{read}\n{s}")
|
||||||
f.close()
|
|
||||||
time.sleep(1)
|
|
||||||
write(s)
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(e)
|
sys.exit(e)
|
||||||
f.close()
|
f.close()
|
||||||
|
|
||||||
for i in range(10):
|
for i in range(10):
|
||||||
write(f'{i}: {message}')
|
write(f'{i}: {message}')
|
||||||
time.sleep(2)
|
time.sleep(2)
|
||||||
|
|
||||||
os.remove(clientFilePath)
|
f = open(clientFilePath, 'r')
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
fcntl.flock(f, fcntl.LOCK_EX)
|
||||||
|
if f.read() == '':
|
||||||
|
os.remove(clientFilePath)
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
time.sleep(1)
|
||||||
|
except Exception as e:
|
||||||
|
sys.exit(e)
|
||||||
|
|
||||||
|
f.close()
|
||||||
|
@ -19,6 +19,7 @@ class Client
|
|||||||
$this->id = $id;
|
$this->id = $id;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// `__destruct` can't take arguments.
|
||||||
public function free($loop)
|
public function free($loop)
|
||||||
{
|
{
|
||||||
$loop->cancelTimer($this->timer);
|
$loop->cancelTimer($this->timer);
|
||||||
@ -38,10 +39,13 @@ class Client
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Need to be passed as a reference to `flock`.
|
||||||
$WAIT_IF_LOCKED = 1;
|
$WAIT_IF_LOCKED = 1;
|
||||||
|
|
||||||
define('USERS_FOLDER', 'users/');
|
define('USERS_FOLDER', 'users/');
|
||||||
|
|
||||||
|
// Delete users outputs of previous `websocket.php` execution.
|
||||||
|
// We skip `.`, `..` and `.gitignore`.
|
||||||
foreach (array_slice(scandir(USERS_FOLDER), 3) as $file) {
|
foreach (array_slice(scandir(USERS_FOLDER), 3) as $file) {
|
||||||
unlink(USERS_FOLDER . $file);
|
unlink(USERS_FOLDER . $file);
|
||||||
}
|
}
|
||||||
@ -69,12 +73,14 @@ class MyProcess implements MessageComponentInterface
|
|||||||
|
|
||||||
private function newClient()
|
private function newClient()
|
||||||
{
|
{
|
||||||
|
// If `onOpen` and `onMessage` can't be called at the same time, then this semaphore is useless.
|
||||||
if (sem_acquire($this->newClientIdSem)) {
|
if (sem_acquire($this->newClientIdSem)) {
|
||||||
|
// Note that we don't re-use ids except on `websockets.php` restart, but as the maximal int in PHP is a very great number we are fine for a while (https://www.php.net/manual/en/reserved.constants.php#constant.php-int-max)
|
||||||
$clientId = $this->newClientId++;
|
$clientId = $this->newClientId++;
|
||||||
sem_release($this->newClientIdSem);
|
sem_release($this->newClientIdSem);
|
||||||
return new Client($clientId);
|
return new Client($clientId);
|
||||||
} else {
|
} else {
|
||||||
exit('`onOpen` error');
|
exit('`newClient` error');
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -86,38 +92,50 @@ class MyProcess implements MessageComponentInterface
|
|||||||
|
|
||||||
public function onMessage(ConnectionInterface $from, $msg)
|
public function onMessage(ConnectionInterface $from, $msg)
|
||||||
{
|
{
|
||||||
|
// As we are going to use this argument in a shell command, we verify a limited set of characters that are safe once quoted.
|
||||||
if (preg_match("/^[a-zA-Z0-9-_ ]+$/", $msg) !== 1) {
|
if (preg_match("/^[a-zA-Z0-9-_ ]+$/", $msg) !== 1) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
$client = $this->clients->offsetGet($from);
|
$client = $this->clients->offsetGet($from);
|
||||||
// If a previous request was received, we execute the new one with another client for simplicity.
|
// If a previous request was received, we execute the new one with another client for simplicity otherwise with current file deletion approach, we can't tell the worker `search.py` that we don't care about its execution anymore.
|
||||||
if ($client->pid !== null) {
|
if ($client->pid !== null) {
|
||||||
|
// As `$this->clients->detach` doesn't call `__destruct` for unknown reason, we clean manually the previous request.
|
||||||
$client->free($this->loop);
|
$client->free($this->loop);
|
||||||
$client = $this->newClient();
|
$client = $this->newClient();
|
||||||
}
|
}
|
||||||
$clientId = $client->id;
|
$clientId = $client->id;
|
||||||
$clientFilePath = getClientFilePath($clientId);
|
$clientFilePath = getClientFilePath($clientId);
|
||||||
|
// Create the worker output file otherwise it would believe that we don't need this worker anymore.
|
||||||
file_put_contents($clientFilePath, '');
|
file_put_contents($clientFilePath, '');
|
||||||
|
// Start the independent worker.
|
||||||
|
// Redirecting `stdout` is mandatory otherwise `exec` is blocking.
|
||||||
$client->pid = exec("./search.py $clientId '$msg' > /dev/null & echo $!");
|
$client->pid = exec("./search.py $clientId '$msg' > /dev/null & echo $!");
|
||||||
|
// `addTimer` doesn't enable us to use independently `$from->send` multiple times with blocking instructions between.
|
||||||
$client->timer = $this->loop->addPeriodicTimer(1, function () use ($from, $clientId, $clientFilePath, $client) {
|
$client->timer = $this->loop->addPeriodicTimer(1, function () use ($from, $clientId, $clientFilePath, $client) {
|
||||||
echo "Checking news from $clientId\n";
|
echo "Checking news from $clientId\n";
|
||||||
|
// If the worker output file doesn't exist anymore, then it means that the worker have finished its work and acknowledged that `websocket.php` completely read its output.
|
||||||
if (file_exists($clientFilePath)) {
|
if (file_exists($clientFilePath)) {
|
||||||
|
// `flock` requires `r`eading permission and we need `w`riting one due to `ftruncate` usage.
|
||||||
$fp = fopen($clientFilePath, "r+");
|
$fp = fopen($clientFilePath, "r+");
|
||||||
$read = null;
|
$read = null;
|
||||||
if (flock($fp, LOCK_EX, $WAIT_IF_LOCKED)) { // acquire an exclusive lock
|
if (flock($fp, LOCK_EX, $WAIT_IF_LOCKED)) { // acquire an exclusive lock
|
||||||
|
// We assume that the temporary output is less than 1 MB long.
|
||||||
$read = fread($fp, 1_000_000);
|
$read = fread($fp, 1_000_000);
|
||||||
ftruncate($fp, 0); // truncate file
|
ftruncate($fp, 0); // truncate file
|
||||||
fflush($fp); // flush output before releasing the lock
|
fflush($fp); // flush output before releasing the lock
|
||||||
flock($fp, LOCK_UN); // release the lock
|
flock($fp, LOCK_UN); // release the lock
|
||||||
} else {
|
} else {
|
||||||
echo "Couldn't get the lock!";
|
// We `die` instead of `echo`ing to force the developer to investigate the reason.
|
||||||
|
die("Couldn't get the lock!");
|
||||||
}
|
}
|
||||||
fclose($fp);
|
fclose($fp);
|
||||||
|
|
||||||
|
// Assume that empty output doesn't need to me forwarded to the end-user.
|
||||||
if ($read !== null && $read !== '') {
|
if ($read !== null && $read !== '') {
|
||||||
$from->send($read);
|
$from->send($read);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
// We don't need the periodic timer anymore, as the worker finished its work and acknowledged that `websocket.php` completely read its output.
|
||||||
$this->loop->cancelTimer($client->timer);
|
$this->loop->cancelTimer($client->timer);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@ -129,33 +147,20 @@ class MyProcess implements MessageComponentInterface
|
|||||||
$clientId = $client->id;
|
$clientId = $client->id;
|
||||||
$client->free($this->loop);
|
$client->free($this->loop);
|
||||||
echo "$clientId disconnected\n";
|
echo "$clientId disconnected\n";
|
||||||
/*$this->loop->cancelTimer($client->timer);
|
|
||||||
// Should in theory verify that the pid wasn't re-assigned.
|
|
||||||
posix_kill($client->pid, SIGTERM);
|
|
||||||
$clientFilePath = getClientFilePath($clientId);
|
|
||||||
if (file_exists($clientFilePath)) {
|
|
||||||
$fp = fopen($clientFilePath, "r+");
|
|
||||||
if (flock($fp, LOCK_EX, $WAIT_IF_LOCKED)) { // acquire an exclusive lock
|
|
||||||
unlink($clientFilePath); // delete file
|
|
||||||
flock($fp, LOCK_UN); // release the lock
|
|
||||||
} else {
|
|
||||||
echo "Couldn't get the lock!";
|
|
||||||
}
|
|
||||||
fclose($fp);
|
|
||||||
}*/
|
|
||||||
$this->clients->detach($conn);
|
$this->clients->detach($conn);
|
||||||
}
|
}
|
||||||
|
|
||||||
public function onError(ConnectionInterface $conn, \Exception $e)
|
public function onError(ConnectionInterface $conn, \Exception $e)
|
||||||
{
|
{
|
||||||
echo '`onError`';
|
|
||||||
$conn->close();
|
$conn->close();
|
||||||
|
die('`onError`');
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
$loop = \React\EventLoop\Factory::create();
|
$loop = \React\EventLoop\Factory::create();
|
||||||
|
|
||||||
// Run the server application through the WebSocket protocol on port 4430
|
// Run the server application through the WebSocket protocol on port 4430.
|
||||||
|
// Note that named arguments come with PHP 8 which isn't current Debian one.
|
||||||
$app = new Ratchet\App('crawler.yt.lemnoslife.com', 4430, '127.0.0.1', $loop);
|
$app = new Ratchet\App('crawler.yt.lemnoslife.com', 4430, '127.0.0.1', $loop);
|
||||||
$app->route('/websocket', new MyProcess($loop), array('*'));
|
$app->route('/websocket', new MyProcess($loop), array('*'));
|
||||||
$app->run();
|
$app->run();
|
||||||
|
Loading…
Reference in New Issue
Block a user