Updated \TamerLib\Objects > WorkerInstance to pass through the parent process arguments to the sub-process

This commit is contained in:
Netkas 2023-06-16 02:31:13 -04:00
parent f85c0e7e5d
commit ee70a733db
No known key found for this signature in database
GPG key ID: 5DAF58535614062B

View file

@ -7,6 +7,7 @@
use InvalidArgumentException; use InvalidArgumentException;
use LogLib\Log; use LogLib\Log;
use RuntimeException; use RuntimeException;
use TamerLib\Exceptions\WorkerFailedException;
use TamerLib\Objects\ServerConfiguration; use TamerLib\Objects\ServerConfiguration;
use TamerLib\Objects\WorkerConfiguration; use TamerLib\Objects\WorkerConfiguration;
use TamerLib\Objects\WorkerInstance; use TamerLib\Objects\WorkerInstance;
@ -47,15 +48,18 @@
$configuration->setDatabase($this->configuration->getDatabase()); $configuration->setDatabase($this->configuration->getDatabase());
$configuration->setChannel($channel); $configuration->setChannel($channel);
// TODO: Pass on database and password
return $configuration; return $configuration;
} }
/** /**
* Spawns a specified number of workers for a given path.
*
* @param string $path * @param string $path
* @param int $count * @param int $count
* @param int $channel * @param int $channel
* @param bool $check
* @return void * @return void
* @throws WorkerFailedException
*/ */
public function spawnWorker(string $path, int $count=8, int $channel=0, bool $check=true): void public function spawnWorker(string $path, int $count=8, int $channel=0, bool $check=true): void
{ {
@ -64,22 +68,57 @@
throw new InvalidArgumentException(sprintf('Path %s does not exist', $path)); throw new InvalidArgumentException(sprintf('Path %s does not exist', $path));
} }
Log::verbose(Utilities::getName(), sprintf('Spawning %s workers for %s', $count, $path));
$spawned_workers = [];
for($i = 0; $i < $count; $i++) for($i = 0; $i < $count; $i++)
{ {
$worker_config = $this->generateWorkerConfiguration($channel); $worker_config = $this->generateWorkerConfiguration($channel);
Log::debug('net.nosial.tamerlib', sprintf('Spawning worker %s (%s)', $worker_config->getWorkerId(), $path));
$worker = new WorkerInstance($worker_config, $path); $worker = new WorkerInstance($worker_config, $path);
$this->workers[$worker_config->getWorkerId()] = $worker; $this->workers[$worker_config->getWorkerId()] = $worker;
$spawned_workers[$worker_config->getWorkerId()] = time();
$worker->start(); $worker->start();
} }
if($check) if($check)
{ {
// TODO: Check if workers are running $this->checkWorkers($spawned_workers);
} }
}
$this->printUpdates(); /**
* Checks if the workers have started.
*
* @param array $workers
* @return void
* @throws WorkerFailedException
*/
private function checkWorkers(array $workers): void
{
while(true)
{
if(count($workers) === 0)
{
return;
}
foreach($workers as $worker_id => $time)
{
if(time() - $time > 3)
{
if($this->workers[$worker_id]->getProcess()?->isRunning() === false)
{
throw new WorkerFailedException(sprintf('Worker %s failed, has not started in %s seconds', $worker_id, 1));
}
Log::debug(Utilities::getName(), sprintf('Worker %s has started in %s seconds', $worker_id, 1));
unset($workers[$worker_id]);
}
}
$this->printUpdates();
}
} }
/** /**
@ -88,6 +127,7 @@
* @param int $count * @param int $count
* @param int $channel * @param int $channel
* @return void * @return void
* @throws WorkerFailedException
*/ */
public function spawnClosure(int $count=8, int $channel=0): void public function spawnClosure(int $count=8, int $channel=0): void
{ {
@ -116,9 +156,15 @@
* *
* @param int $timeout * @param int $timeout
* @return void * @return void
* @throws WorkerFailedException
*/ */
public function monitor(int $timeout=0): void public function monitor(int $timeout=0): void
{ {
if(count($this->workers) === 0)
{
return;
}
$start_time = time(); $start_time = time();
while(true) while(true)
@ -128,15 +174,11 @@
{ {
print($worker->getOutput()); print($worker->getOutput());
if(!$worker->isRunning())
//if(!$worker->isRunning()) {
//{ Log::warning(Utilities::getName(), sprintf('Worker %s is not running, killing', $worker->getConfiguration()->getWorkerId()));
// print($worker->getOutput()); $worker->restart();
// Log::warning('net.nosial.tamerlib', sprintf('Worker %s is not running, killing', $worker->getConfiguration()->getWorkerId())); }
// $worker->stop();
// unset($this->workers[$worker->getConfiguration()->getWorkerId()]);
// $this->spawnWorker($worker->getPath(), 1, $worker->getConfiguration()->getChannel(), false);
//}
} }
if($timeout < 0) if($timeout < 0)
@ -159,7 +201,12 @@
*/ */
public function stopAll(): void public function stopAll(): void
{ {
Log::debug('net.nosial.tamerlib', 'Stopping all workers'); if(count($this->workers) === 0)
{
return;
}
Log::verbose(Utilities::getName(), 'Stopping all workers');
foreach($this->workers as $worker_id => $worker) foreach($this->workers as $worker_id => $worker)
{ {
$worker->stop(); $worker->stop();