From ee70a733dbca0ba8de3ceab2187c169d0d0193b0 Mon Sep 17 00:00:00 2001 From: Netkas Date: Fri, 16 Jun 2023 02:31:13 -0400 Subject: [PATCH] Updated \TamerLib\Objects > WorkerInstance to pass through the parent process arguments to the sub-process --- src/TamerLib/Classes/WorkerSupervisor.php | 77 ++++++++++++++++++----- 1 file changed, 62 insertions(+), 15 deletions(-) diff --git a/src/TamerLib/Classes/WorkerSupervisor.php b/src/TamerLib/Classes/WorkerSupervisor.php index dd14b47..f98afe9 100644 --- a/src/TamerLib/Classes/WorkerSupervisor.php +++ b/src/TamerLib/Classes/WorkerSupervisor.php @@ -7,6 +7,7 @@ use InvalidArgumentException; use LogLib\Log; use RuntimeException; + use TamerLib\Exceptions\WorkerFailedException; use TamerLib\Objects\ServerConfiguration; use TamerLib\Objects\WorkerConfiguration; use TamerLib\Objects\WorkerInstance; @@ -47,15 +48,18 @@ $configuration->setDatabase($this->configuration->getDatabase()); $configuration->setChannel($channel); - // TODO: Pass on database and password return $configuration; } /** + * Spawns a specified number of workers for a given path. + * * @param string $path * @param int $count * @param int $channel + * @param bool $check * @return void + * @throws WorkerFailedException */ public function spawnWorker(string $path, int $count=8, int $channel=0, bool $check=true): void { @@ -64,22 +68,57 @@ throw new InvalidArgumentException(sprintf('Path %s does not exist', $path)); } + Log::verbose(Utilities::getName(), sprintf('Spawning %s workers for %s', $count, $path)); + + $spawned_workers = []; + for($i = 0; $i < $count; $i++) { $worker_config = $this->generateWorkerConfiguration($channel); - Log::debug('net.nosial.tamerlib', sprintf('Spawning worker %s (%s)', $worker_config->getWorkerId(), $path)); - $worker = new WorkerInstance($worker_config, $path); $this->workers[$worker_config->getWorkerId()] = $worker; + $spawned_workers[$worker_config->getWorkerId()] = time(); $worker->start(); } if($check) { - // TODO: Check if workers are running + $this->checkWorkers($spawned_workers); } + } - $this->printUpdates(); + /** + * Checks if the workers have started. + * + * @param array $workers + * @return void + * @throws WorkerFailedException + */ + private function checkWorkers(array $workers): void + { + while(true) + { + if(count($workers) === 0) + { + return; + } + + foreach($workers as $worker_id => $time) + { + if(time() - $time > 3) + { + if($this->workers[$worker_id]->getProcess()?->isRunning() === false) + { + throw new WorkerFailedException(sprintf('Worker %s failed, has not started in %s seconds', $worker_id, 1)); + } + + Log::debug(Utilities::getName(), sprintf('Worker %s has started in %s seconds', $worker_id, 1)); + unset($workers[$worker_id]); + } + } + + $this->printUpdates(); + } } /** @@ -88,6 +127,7 @@ * @param int $count * @param int $channel * @return void + * @throws WorkerFailedException */ public function spawnClosure(int $count=8, int $channel=0): void { @@ -116,9 +156,15 @@ * * @param int $timeout * @return void + * @throws WorkerFailedException */ public function monitor(int $timeout=0): void { + if(count($this->workers) === 0) + { + return; + } + $start_time = time(); while(true) @@ -128,15 +174,11 @@ { print($worker->getOutput()); - - //if(!$worker->isRunning()) - //{ - // print($worker->getOutput()); - // Log::warning('net.nosial.tamerlib', sprintf('Worker %s is not running, killing', $worker->getConfiguration()->getWorkerId())); - // $worker->stop(); - // unset($this->workers[$worker->getConfiguration()->getWorkerId()]); - // $this->spawnWorker($worker->getPath(), 1, $worker->getConfiguration()->getChannel(), false); - //} + if(!$worker->isRunning()) + { + Log::warning(Utilities::getName(), sprintf('Worker %s is not running, killing', $worker->getConfiguration()->getWorkerId())); + $worker->restart(); + } } if($timeout < 0) @@ -159,7 +201,12 @@ */ public function stopAll(): void { - Log::debug('net.nosial.tamerlib', 'Stopping all workers'); + if(count($this->workers) === 0) + { + return; + } + + Log::verbose(Utilities::getName(), 'Stopping all workers'); foreach($this->workers as $worker_id => $worker) { $worker->stop();