From 1b8d2fb40a14dbce7e57dbc835b1201efde79a8c Mon Sep 17 00:00:00 2001 From: Netkas Date: Sun, 5 Feb 2023 17:24:22 -0500 Subject: [PATCH] Implemented supervisors, refactored some stuff, implemented closures, updated examples and added dependency for Symfony\Process --- project.json | 6 + src/Tamer/Classes/Functions.php | 61 ++++++ src/Tamer/Classes/Supervisor.php | 187 ++++++++++++++++++ .../UnsupervisedWorkerException.php | 19 ++ src/Tamer/Objects/WorkerInstance.php | 186 +++++++++++++++++ src/Tamer/Objects/closure | 16 ++ src/Tamer/Protocols/Gearman/Client.php | 10 +- src/Tamer/Protocols/Gearman/Worker.php | 4 +- src/Tamer/Tamer.php | 137 ++++++++++--- tests/gearman_client.php | 26 --- tests/gearman_closure.php | 15 -- tests/gearman_worker.php | 27 --- tests/no_tamer.php | 30 +++ tests/rabbitmq_client.php | 20 -- tests/rabbitmq_worker.php | 27 --- tests/tamer.php | 70 +++++++ tests/tamer_client.php | 52 ++--- tests/tamer_worker.php | 4 +- 18 files changed, 707 insertions(+), 190 deletions(-) create mode 100644 src/Tamer/Classes/Supervisor.php create mode 100644 src/Tamer/Exceptions/UnsupervisedWorkerException.php create mode 100644 src/Tamer/Objects/WorkerInstance.php create mode 100644 src/Tamer/Objects/closure delete mode 100644 tests/gearman_client.php delete mode 100644 tests/gearman_closure.php delete mode 100644 tests/gearman_worker.php create mode 100644 tests/no_tamer.php delete mode 100644 tests/rabbitmq_client.php delete mode 100644 tests/rabbitmq_worker.php create mode 100644 tests/tamer.php diff --git a/project.json b/project.json index b10434a..a731155 100644 --- a/project.json +++ b/project.json @@ -52,6 +52,12 @@ "version": "latest", "source_type": "remote", "source": "php-amqplib/php-amqplib=latest@composer" + }, + { + "name": "com.symfony.process", + "version": "latest", + "source_type": "remote", + "source": "symfony/process=latest@composer" } ], "configurations": [ diff --git a/src/Tamer/Classes/Functions.php b/src/Tamer/Classes/Functions.php index 399baaa..4b7bd33 100644 --- a/src/Tamer/Classes/Functions.php +++ b/src/Tamer/Classes/Functions.php @@ -2,14 +2,30 @@ namespace Tamer\Classes; + use Exception; use InvalidArgumentException; use OptsLib\Parse; + use Symfony\Component\Process\PhpExecutableFinder; use Tamer\Abstracts\ProtocolType; use Tamer\Interfaces\ClientProtocolInterface; use Tamer\Interfaces\WorkerProtocolInterface; class Functions { + /** + * A cache of the worker variables + * + * @var array|null + */ + private static $worker_variables; + + /** + * A cache of the php binary path + * + * @var string|null + */ + private static $php_bin; + /** * Attempts to get the worker id from the command line arguments or the environment variable TAMER_WORKER_ID * If neither are set, returns null. @@ -66,4 +82,49 @@ default => throw new InvalidArgumentException('Invalid protocol type'), }; } + + /** + * Returns the worker variables from the environment variables + * + * @return array + */ + public static function getWorkerVariables(): array + { + if(self::$worker_variables == null) + { + self::$worker_variables = [ + 'TAMER_ENABLED' => getenv('TAMER_ENABLED') === 'true', + 'TAMER_PROTOCOL' => getenv('TAMER_PROTOCOL'), + 'TAMER_SERVERS' => getenv('TAMER_SERVERS'), + 'TAMER_USERNAME' => getenv('TAMER_USERNAME'), + 'TAMER_PASSWORD' => getenv('TAMER_PASSWORD'), + 'TAMER_INSTANCE_ID' => getenv('TAMER_INSTANCE_ID'), + ]; + + if(self::$worker_variables['TAMER_SERVERS'] !== false) + self::$worker_variables['TAMER_SERVERS'] = explode(',', self::$worker_variables['TAMER_SERVERS']); + } + + return self::$worker_variables; + } + + /** + * Returns the path to the php binary + * + * @return string + * @throws Exception + */ + public static function findPhpBin(): string + { + if(self::$php_bin !== null) + return self::$php_bin; + + $php_finder = new PhpExecutableFinder(); + $php_bin = $php_finder->find(); + if($php_bin === false) + throw new Exception('Unable to find the php binary'); + + self::$php_bin = $php_bin; + return $php_bin; + } } \ No newline at end of file diff --git a/src/Tamer/Classes/Supervisor.php b/src/Tamer/Classes/Supervisor.php new file mode 100644 index 0000000..a93b867 --- /dev/null +++ b/src/Tamer/Classes/Supervisor.php @@ -0,0 +1,187 @@ +workers = []; + $this->protocol = $protocol; + $this->servers = $servers; + $this->username = $username; + $this->password = $password; + } + + /** + * Adds a worker to the supervisor instance + * + * @param string $target + * @param int $instances + * @return void + * @throws Exception + */ + public function addWorker(string $target, int $instances): void + { + for ($i = 0; $i < $instances; $i++) + { + $this->workers[] = new WorkerInstance($target, $this->protocol, $this->servers, $this->username, $this->password); + } + } + + /** + * Starts all the workers + * + * @return void + * @throws Exception + */ + public function start(): void + { + /** @var WorkerInstance $worker */ + foreach ($this->workers as $worker) + { + $worker->start(); + } + + // Ensure that all the workers are running + foreach($this->workers as $worker) + { + if (!$worker->isRunning()) + { + throw new Exception("Worker {$worker->getId()} is not running"); + } + + while(true) + { + switch($worker->getProcess()->getStatus()) + { + case Process::STATUS_STARTED: + Log::debug('net.nosial.tamerlib', "worker {$worker->getId()} is running"); + break 2; + + case Process::STATUS_TERMINATED: + throw new Exception("Worker {$worker->getId()} has terminated"); + + default: + echo "Worker {$worker->getId()} is {$worker->getProcess()->getStatus()}" . PHP_EOL; + } + } + } + } + + /** + * Stops all the workers + * + * @return void + * @throws Exception + */ + public function stop(): void + { + /** @var WorkerInstance $worker */ + foreach ($this->workers as $worker) + { + $worker->stop(); + } + } + + /** + * Restarts all the workers + * + * @return void + * @throws Exception + */ + public function restart(): void + { + /** @var WorkerInstance $worker */ + foreach ($this->workers as $worker) + { + $worker->stop(); + $worker->start(); + } + } + + /** + * Monitors all the workers and restarts them if they are not running + * + * @param bool $auto_restart + * @return void + * @throws Exception + */ + public function monitor(bool $auto_restart = true): void + { + while (true) + { + /** @var WorkerInstance $worker */ + foreach ($this->workers as $worker) + { + if (!$worker->isRunning()) + { + if ($auto_restart) + { + $worker->start(); + } + else + { + throw new Exception("Worker {$worker->getId()} is not running"); + } + } + } + + sleep(1); + } + } + + /** + * @throws Exception + */ + public function __destruct() + { + $this->stop(); + } + + } \ No newline at end of file diff --git a/src/Tamer/Exceptions/UnsupervisedWorkerException.php b/src/Tamer/Exceptions/UnsupervisedWorkerException.php new file mode 100644 index 0000000..dbd472c --- /dev/null +++ b/src/Tamer/Exceptions/UnsupervisedWorkerException.php @@ -0,0 +1,19 @@ +id = uniqid(); + $this->target = $target; + $this->protocol = $protocol; + $this->servers = $servers; + $this->username = $username; + $this->password = $password; + $this->process = null; + + if($target !== 'closure' && file_exists($target) === false) + { + throw new Exception('The target file does not exist'); + } + } + + /** + * Returns the worker instance id + * + * @return string + */ + public function getId(): string + { + return $this->id; + } + + /** + * Executes the worker instance in a separate process + * + * @return void + * @throws Exception + */ + public function start(): void + { + $target = $this->target; + if($target == 'closure') + { + $target = __DIR__ . DIRECTORY_SEPARATOR . 'closure'; + } + + $argv = $_SERVER['argv']; + array_shift($argv); + + $this->process = new Process(array_merge([Functions::findPhpBin(), $target], $argv)); + $this->process->setEnv([ + 'TAMER_ENABLED' => 'true', + 'TAMER_PROTOCOL' => $this->protocol, + 'TAMER_SERVERS' => implode(',', $this->servers), + 'TAMER_USERNAME' => $this->username, + 'TAMER_PASSWORD' => $this->password, + 'TAMER_INSTANCE_ID' => $this->id + ]); + + + Log::debug('net.nosial.tamerlib', sprintf('starting worker %s', $this->id)); + + // Callback for process output + $this->process->start(function ($type, $buffer) + { + // Add newline if it's missing + if(substr($buffer, -1) !== PHP_EOL) + { + $buffer .= PHP_EOL; + } + + print($buffer); + }); + } + + /** + * Stops the worker instance + * + * @return void + */ + public function stop(): void + { + if($this->process !== null) + { + Log::debug('net.nosial.tamerlib', sprintf('Stopping worker %s', $this->id)); + $this->process->stop(); + } + } + + /** + * Returns whether the worker instance is running + * + * @return bool + */ + public function isRunning(): bool + { + if($this->process !== null) + { + return $this->process->isRunning(); + } + + return false; + } + + /** + * @return Process|null + */ + public function getProcess(): ?Process + { + return $this->process; + } + + /** + * Destructor + */ + public function __destruct() + { + $this->stop(); + } + } \ No newline at end of file diff --git a/src/Tamer/Objects/closure b/src/Tamer/Objects/closure new file mode 100644 index 0000000..052f517 --- /dev/null +++ b/src/Tamer/Objects/closure @@ -0,0 +1,16 @@ +getMessage(), $e); + exit(1); + } diff --git a/src/Tamer/Protocols/Gearman/Client.php b/src/Tamer/Protocols/Gearman/Client.php index a89b817..1347c36 100644 --- a/src/Tamer/Protocols/Gearman/Client.php +++ b/src/Tamer/Protocols/Gearman/Client.php @@ -112,7 +112,7 @@ foreach($servers as $server) { $server = explode(':', $server); - $this->addServer($server[0], $server[1]); + $this->addServer($server[0], (int)$server[1]); } } @@ -359,9 +359,7 @@ $this->preformAutoreconf(); if(!$this->client->runTasks()) - { return false; - } return true; } @@ -456,15 +454,11 @@ { try { - $this->run(); + $this->disconnect(); } catch(Exception $e) { unset($e); } - finally - { - $this->disconnect(); - } } } \ No newline at end of file diff --git a/src/Tamer/Protocols/Gearman/Worker.php b/src/Tamer/Protocols/Gearman/Worker.php index 8404b02..524ae48 100644 --- a/src/Tamer/Protocols/Gearman/Worker.php +++ b/src/Tamer/Protocols/Gearman/Worker.php @@ -102,7 +102,7 @@ foreach($servers as $server) { $server = explode(':', $server); - $this->addServer($server[0], $server[1]); + $this->addServer($server[0], (int)$server[1]); } } @@ -120,8 +120,6 @@ $this->worker = new GearmanWorker(); $this->worker->addOptions(GEARMAN_WORKER_GRAB_UNIQ); - Log::debug('net.nosial.tamerlib', 'connecting to gearman server(s)'); - foreach($this->defined_servers as $host => $ports) { foreach($ports as $port) diff --git a/src/Tamer/Tamer.php b/src/Tamer/Tamer.php index 72a6010..36d3e4b 100644 --- a/src/Tamer/Tamer.php +++ b/src/Tamer/Tamer.php @@ -5,11 +5,14 @@ namespace Tamer; use Closure; + use Exception; use InvalidArgumentException; use Tamer\Abstracts\Mode; use Tamer\Classes\Functions; + use Tamer\Classes\Supervisor; use Tamer\Classes\Validate; use Tamer\Exceptions\ConnectionException; + use Tamer\Exceptions\UnsupervisedWorkerException; use Tamer\Interfaces\ClientProtocolInterface; use Tamer\Interfaces\WorkerProtocolInterface; use Tamer\Objects\Task; @@ -53,17 +56,23 @@ private static $connected; /** - * Connects to a server using the specified protocol and mode (client or worker) + * The supervisor that is supervising the workers + * + * @var Supervisor + */ + private static $supervisor; + + /** + * Initializes Tamer as a client and connects to the server * * @param string $protocol - * @param string $mode * @param array $servers * @param string|null $username * @param string|null $password * @return void * @throws ConnectionException */ - public static function connect(string $protocol, string $mode, array $servers, ?string $username=null, ?string $password=null): void + public static function init(string $protocol, array $servers, ?string $username=null, ?string $password=null): void { if(self::$connected) { @@ -75,31 +84,39 @@ throw new InvalidArgumentException(sprintf('Invalid protocol type: %s', $protocol)); } - if (!Validate::mode($mode)) - { - throw new InvalidArgumentException(sprintf('Invalid mode: %s', $mode)); - } - self::$protocol = $protocol; - self::$mode = $mode; + self::$mode = Mode::Client; + self::$client = Functions::createClient($protocol, $username, $password); + self::$client->addServers($servers); + self::$client->connect(); + self::$supervisor = new Supervisor($protocol, $servers, $username, $password); + self::$connected = true; + } - if (self::$mode === Mode::Client) + /** + * Initializes Tamer as a worker client and connects to the server + * + * @return void + * @throws ConnectionException + * @throws UnsupervisedWorkerException + */ + public static function initWorker(): void + { + if(self::$connected) { - self::$client = Functions::createClient($protocol, $username, $password); - self::$client->addServers($servers); - self::$client->connect(); - } - elseif(self::$mode === Mode::Worker) - { - self::$worker = Functions::createWorker($protocol, $username, $password); - self::$worker->addServers($servers); - self::$worker->connect(); - } - else - { - throw new InvalidArgumentException(sprintf('Invalid mode: %s', $mode)); + throw new ConnectionException('Tamer is already connected to the server'); } + if(!Functions::getWorkerVariables()['TAMER_ENABLED']) + { + throw new UnsupervisedWorkerException('Tamer is not enabled for this worker'); + } + + self::$protocol = Functions::getWorkerVariables()['TAMER_PROTOCOL']; + self::$mode = Mode::Worker; + self::$worker = Functions::createWorker(self::$protocol); + self::$worker->addServers(Functions::getWorkerVariables()['TAMER_SERVERS']); + self::$worker->connect(); self::$connected = true; } @@ -294,6 +311,80 @@ } } + /** + * Adds a worker to the supervisor + * + * @param string $target + * @param int $instances + * @return void + * @throws Exception + */ + public static function addWorker(string $target, int $instances): void + { + if (self::$mode === Mode::Client) + { + self::$supervisor->addWorker($target, $instances); + } + else + { + throw new InvalidArgumentException('Tamer is not running in client mode'); + } + } + + /** + * Starts all workers + * + * @return void + * @throws Exception + */ + public static function startWorkers(): void + { + if (self::$mode === Mode::Client) + { + self::$supervisor->start(); + } + else + { + throw new InvalidArgumentException('Tamer is not running in client mode'); + } + } + + /** + * Stops all workers + * + * @return void + * @throws Exception + */ + public static function stopWorkers(): void + { + if (self::$mode === Mode::Client) + { + self::$supervisor->stop(); + } + else + { + throw new InvalidArgumentException('Tamer is not running in client mode'); + } + } + + /** + * Restarts all workers + * + * @return void + * @throws Exception + */ + public static function restartWorkers(): void + { + if (self::$mode === Mode::Client) + { + self::$supervisor->restart(); + } + else + { + throw new InvalidArgumentException('Tamer is not running in client mode'); + } + } + /** * @return string */ diff --git a/tests/gearman_client.php b/tests/gearman_client.php deleted file mode 100644 index 8fdee96..0000000 --- a/tests/gearman_client.php +++ /dev/null @@ -1,26 +0,0 @@ -addServer(); - - $client->do(new Task('sleep', '5')); - - - $client->queue(new Task('sleep', '5', function(JobResults $job) { - echo "Task {$job->getId()} completed with data: {$job->getData()} \n"; - })); - - - $client->queue(new Task('sleep', '5', function(JobResults $job) { - echo "Task {$job->getId()} completed with data: {$job->getData()} \n"; - })); - - - $client->run(); \ No newline at end of file diff --git a/tests/gearman_closure.php b/tests/gearman_closure.php deleted file mode 100644 index d0a72eb..0000000 --- a/tests/gearman_closure.php +++ /dev/null @@ -1,15 +0,0 @@ -addServer(); - - $client->doClosure(function () { - require 'ncc'; - import('net.nosial.loglib', 'latest'); - - \LogLib\Log::info('gearman_closure.php', 'closure'); - }); \ No newline at end of file diff --git a/tests/gearman_worker.php b/tests/gearman_worker.php deleted file mode 100644 index e998220..0000000 --- a/tests/gearman_worker.php +++ /dev/null @@ -1,27 +0,0 @@ -addServer(); - - $worker->addFunction('sleep', function($job) { - /** @var Job $job */ - var_dump(get_class($job)); - echo "Task {$job->getId()} started with data: {$job->getData()} \n"; - sleep($job->getData()); - echo "Task {$job->getId()} completed with data: {$job->getData()} \n"; - - return $job->getData(); - }); - - - - while(true) - { - echo "Waiting for job... \n"; - $worker->work(); - } \ No newline at end of file diff --git a/tests/no_tamer.php b/tests/no_tamer.php new file mode 100644 index 0000000..5f120fc --- /dev/null +++ b/tests/no_tamer.php @@ -0,0 +1,30 @@ +addServer('127.0.0.1', 5672); - - // Loop through 10 tasks - - for($i = 0; $i < 500; $i++) - { - $client->do(Task::create('sleep', '5') - ->setPriority(TaskPriority::High) - ); - } diff --git a/tests/rabbitmq_worker.php b/tests/rabbitmq_worker.php deleted file mode 100644 index 5d482c3..0000000 --- a/tests/rabbitmq_worker.php +++ /dev/null @@ -1,27 +0,0 @@ -addServer('127.0.0.1', 5672); - - $worker->addFunction('sleep', function($job) { - /** @var Job $job */ - var_dump(get_class($job)); - echo "Task {$job->getId()} started with data: {$job->getData()} \n"; - sleep($job->getData()); - echo "Task {$job->getId()} completed with data: {$job->getData()} \n"; - - return $job->getData(); - }); - - - - while(true) - { - echo "Waiting for job... \n"; - $worker->work(); - } \ No newline at end of file diff --git a/tests/tamer.php b/tests/tamer.php new file mode 100644 index 0000000..19cdb6e --- /dev/null +++ b/tests/tamer.php @@ -0,0 +1,70 @@ +getData()} seconds \n"; + })); } - // Sleep function (task) - Tamer::queue(\Tamer\Objects\Task::create('sleep', 5, function(\Tamer\Objects\JobResults $data) - { - echo "Slept for {$data->getData()} seconds \n"; - })); - + echo "Waiting for jobs to finish \n"; $a = microtime(true); Tamer::run(); $b = microtime(true); - echo "Took " . ($b - $a) . " seconds \n"; \ No newline at end of file diff --git a/tests/tamer_worker.php b/tests/tamer_worker.php index f7bb22b..8b8f770 100644 --- a/tests/tamer_worker.php +++ b/tests/tamer_worker.php @@ -8,9 +8,7 @@ import('net.nosial.tamerlib', 'latest'); - Tamer::connect(ProtocolType::Gearman, Mode::Worker, - ['127.0.0.1:4730'] - ); + Tamer::initWorker(); Tamer::addFunction('sleep', function(\Tamer\Objects\Job $job) { sleep($job->getData());