diff --git a/.idea/php.xml b/.idea/php.xml index 2c4b796..b479234 100644 --- a/.idea/php.xml +++ b/.idea/php.xml @@ -16,6 +16,7 @@ + diff --git a/README.md b/README.md index 509af91..5d46992 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,24 @@ Coming soon... +## Terminology + +### Components + + - **Supervisor** - The main component of the library, this is the component that is responsible for manging + workers + - **Worker** - The component that is responsible for executing tasks + - **Task** - The component that is responsible for executing a function or closure + +### Function Names + - **do** - Execute a function in the background without blocking the current thread, + this does not return a value. (This is a fire and forget function) + - **doClosure** - Execute a closure in the background without blocking the current thread, + this does not return a value. (This is a fire and forget function) + - **queue** - Queues a function to be executed in the background until the next time the run function is called. + - **queueClosure** - Queues a closure to be executed in th background until the next time the run function is called. + - **run** - Executes all queued functions and closures in parallel and waits for the tasks to complete. + # License diff --git a/project.json b/project.json index d90d02c..b10434a 100644 --- a/project.json +++ b/project.json @@ -19,7 +19,7 @@ "assembly": { "name": "TamerLib", "package": "net.nosial.tamerlib", - "description": "Telegram Bot API Library (A super-powered wrapper for longman/telegram-bot)", + "description": "TamerLib allows the execution of parallel tasks", "company": "Nosial", "copyright": "Copyright (c) 2022-2023 Nosial, All Rights Reserved", "version": "1.0.0", diff --git a/src/Tamer/Abstracts/Mode.php b/src/Tamer/Abstracts/Mode.php new file mode 100644 index 0000000..b8ee9f5 --- /dev/null +++ b/src/Tamer/Abstracts/Mode.php @@ -0,0 +1,10 @@ + new \Tamer\Protocols\Gearman\Client($username, $password), + ProtocolType::RabbitMQ => new \Tamer\Protocols\RabbitMq\Client($username, $password), + default => throw new InvalidArgumentException('Invalid protocol type'), + }; + } + + /** + * @param string $protocol + * @param string|null $username + * @param string|null $password + * @return WorkerProtocolInterface + */ + public static function createWorker(string $protocol, ?string $username=null, ?string $password=null): WorkerProtocolInterface + { + /** @noinspection PhpFullyQualifiedNameUsageInspection */ + return match (strtolower($protocol)) + { + ProtocolType::Gearman => new \Tamer\Protocols\Gearman\Worker($username, $password), + ProtocolType::RabbitMQ => new \Tamer\Protocols\RabbitMq\Worker($username, $password), + default => throw new InvalidArgumentException('Invalid protocol type'), + }; + } } \ No newline at end of file diff --git a/src/Tamer/Classes/Validate.php b/src/Tamer/Classes/Validate.php index 00ea7b8..6895cdf 100644 --- a/src/Tamer/Classes/Validate.php +++ b/src/Tamer/Classes/Validate.php @@ -2,6 +2,7 @@ namespace Tamer\Classes; + use Tamer\Abstracts\Mode; use Tamer\Abstracts\ProtocolType; use Tamer\Abstracts\TaskPriority; @@ -17,7 +18,20 @@ { return match (strtolower($input)) { - ProtocolType::Gearman, ProtocolType::RabbitMQ, ProtocolType::Redis => true, + ProtocolType::Gearman, ProtocolType::RabbitMQ => true, + default => false, + }; + } + + /** + * @param string $input + * @return bool + */ + public static function mode(string $input): bool + { + return match (strtolower($input)) + { + Mode::Client, Mode::Worker => true, default => false, }; } diff --git a/src/Tamer/Client.php b/src/Tamer/Client.php deleted file mode 100644 index c6c8268..0000000 --- a/src/Tamer/Client.php +++ /dev/null @@ -1,43 +0,0 @@ -setProtocol($protocol); - } - - /** - * @return string - */ - public function getProtocol(): string - { - return $this->protocol; - } - - /** - * @param string $protocol - */ - public function setProtocol(string $protocol): void - { - if(!Validate::protocolType($protocol)) - { - throw new InvalidArgumentException("Invalid protocol type: $protocol"); - } - - $this->protocol = $protocol; - } - } \ No newline at end of file diff --git a/src/Tamer/Exceptions/ServerException.php b/src/Tamer/Exceptions/ConnectionException.php similarity index 53% rename from src/Tamer/Exceptions/ServerException.php rename to src/Tamer/Exceptions/ConnectionException.php index 93359a4..5850207 100644 --- a/src/Tamer/Exceptions/ServerException.php +++ b/src/Tamer/Exceptions/ConnectionException.php @@ -3,15 +3,16 @@ namespace Tamer\Exceptions; use Exception; + use Throwable; - class ServerException extends Exception + class ConnectionException extends Exception { /** * @param string $message * @param int $code - * @param Exception|null $previous + * @param Throwable|null $previous */ - public function __construct(string $message, int $code=0, Exception $previous=null) + public function __construct(string $message="", int $code=0, Throwable $previous=null) { parent::__construct($message, $code, $previous); } diff --git a/src/Tamer/Exceptions/WorkerException.php b/src/Tamer/Exceptions/WorkerException.php deleted file mode 100644 index 28ff58e..0000000 --- a/src/Tamer/Exceptions/WorkerException.php +++ /dev/null @@ -1,18 +0,0 @@ -function_name = $function_name; $this->data = $data; @@ -56,6 +58,19 @@ $this->closure = false; } + /** + * Static Constructor + * + * @param string $function_name + * @param string|Closure|null $data + * @param callable|null $callback + * @return static + */ + public static function create(string $function_name, string|Closure|null $data, callable $callback=null): self + { + return new self($function_name, $data, $callback); + } + /** * Returns the function name for the task * @@ -134,20 +149,22 @@ } /** - * @param callable|null $callback + * @param Closure|null $callback + * @return Task */ - public function setCallback(?callable $callback): void + public function setCallback(?Closure $callback): self { $this->callback = $callback; + return $this; } /** * Executes the callback function * - * @param JobResults $result + * @param string|JobResults|null $result * @return void */ - public function runCallback(JobResults $result): void + public function runCallback(string|JobResults|null $result): void { if($this->callback !== null) { @@ -165,9 +182,11 @@ /** * @param bool $closure + * @return Task */ - public function setClosure(bool $closure): void + public function setClosure(bool $closure): self { $this->closure = $closure; + return $this; } } \ No newline at end of file diff --git a/src/Tamer/Protocols/Gearman/Client.php b/src/Tamer/Protocols/Gearman/Client.php index c6fb969..ce6151c 100644 --- a/src/Tamer/Protocols/Gearman/Client.php +++ b/src/Tamer/Protocols/Gearman/Client.php @@ -6,11 +6,12 @@ use Closure; use Exception; + use GearmanClient; use GearmanTask; use LogLib\Log; - use Tamer\Abstracts\JobStatus; + use ncc\Utilities\Console; use Tamer\Abstracts\TaskPriority; - use Tamer\Exceptions\ServerException; + use Tamer\Exceptions\ConnectionException; use Tamer\Interfaces\ClientProtocolInterface; use Tamer\Objects\Job; use Tamer\Objects\JobResults; @@ -19,14 +20,18 @@ class Client implements ClientProtocolInterface { /** - * @var \GearmanClient|null $client + * The Gearman Client object + * + * @var GearmanClient|null $client */ private $client; /** + * An array of servers that have been defined + * * @var array */ - private $server_cache; + private $defined_servers; /** * Used for tracking the current execution of tasks and run callbacks on completion @@ -36,15 +41,27 @@ private $tasks; /** + * Indicates if the client should automatically reconnect to the server if the connection is lost + * (default: true) + * * @var bool */ private $automatic_reconnect; /** + * The Unix timestamp of the next time the client should attempt to reconnect to the server + * * @var int */ private $next_reconnect; + /** + * The options to use when connecting to the server + * + * @var array + */ + private $options; + /** * @inheritDoc * @param string|null $username @@ -56,47 +73,8 @@ $this->tasks = []; $this->automatic_reconnect = false; $this->next_reconnect = time() + 1800; - $this->server_cache = []; - - try - { - $this->reconnect(); - } - catch(ServerException $e) - { - unset($e); - } - } - - /** - * Adds client options - * - * @link http://php.net/manual/en/gearmanclient.addoptions.php - * @param int[] $options (GEARMAN_CLIENT_NON_BLOCKING, GEARMAN_CLIENT_UNBUFFERED_RESULT, GEARMAN_CLIENT_FREE_TASKS) - * @return bool - */ - public function addOptions(array $options): bool - { - // Parse $options combination via bitwise OR operator - $options = array_reduce($options, function($carry, $item) - { - return $carry | $item; - }); - - return $this->client->addOptions($options); - } - - /** - * Registers callbacks for the client - * - * @return void - */ - private function registerCallbacks(): void - { - $this->client->setCompleteCallback([$this, 'callbackHandler']); - $this->client->setFailCallback([$this, 'callbackHandler']); - $this->client->setDataCallback([$this, 'callbackHandler']); - $this->client->setStatusCallback([$this, 'callbackHandler']); + $this->defined_servers = []; + $this->options = []; } @@ -106,31 +84,21 @@ * @link http://php.net/manual/en/gearmanclient.addserver.php * @param string $host (127.0.0.1) * @param int $port (default: 4730) - * @return bool - * @throws ServerException + * @return void */ - public function addServer(string $host='127.0.0.1', int $port=4730): bool + public function addServer(string $host, int $port): void { - if(!isset($this->server_cache[$host])) + if(!isset($this->defined_servers[$host])) { - $this->server_cache[$host] = []; + $this->defined_servers[$host] = []; } - if(in_array($port, $this->server_cache[$host])) + if(in_array($port, $this->defined_servers[$host])) { - return true; + return; } - $this->server_cache[$host][] = $port; - - try - { - return $this->client->addServer($host, $port); - } - catch(Exception $e) - { - throw new ServerException($e->getMessage(), $e->getCode(), $e); - } + $this->defined_servers[$host][] = $port; } /** @@ -138,11 +106,160 @@ * * @link http://php.net/manual/en/gearmanclient.addservers.php * @param array $servers (host:port, host:port, ...) + * @return void + */ + public function addServers(array $servers): void + { + foreach($servers as $server) + { + $server = explode(':', $server); + $this->addServer($server[0], $server[1]); + } + } + + /** + * Connects to the server(s) + * + * @return void + * @throws ConnectionException + */ + public function connect(): void + { + if($this->isConnected()) + return; + + $this->client = new GearmanClient(); + + // Parse $options combination via bitwise OR operator + $options = array_reduce($this->options, function($carry, $item) + { + return $carry | $item; + }); + + $this->client->addOptions($options); + + foreach($this->defined_servers as $host => $ports) + { + foreach($ports as $port) + { + try + { + $this->client->addServer($host, $port); + Log::debug('net.nosial.tamerlib', 'connected to gearman server: ' . $host . ':' . $port); + } + catch(Exception $e) + { + throw new ConnectionException('Failed to connect to Gearman server: ' . $host . ':' . $port, 0, $e); + } + } + } + + $this->client->setCompleteCallback([$this, 'callbackHandler']); + $this->client->setFailCallback([$this, 'callbackHandler']); + $this->client->setDataCallback([$this, 'callbackHandler']); + $this->client->setStatusCallback([$this, 'callbackHandler']); + } + + /** + * Disconnects from the server(s) + * + * @return void + */ + public function disconnect(): void + { + if(!$this->isConnected()) + return; + + Log::debug('net.nosial.tamerlib', 'disconnecting from gearman server(s)'); + $this->client->clearCallbacks(); + unset($this->client); + $this->client = null; + } + + /** + * Reconnects to the server(s) + * + * @return void + * @throws ConnectionException + */ + public function reconnect(): void + { + Console::outDebug('net.nosial.tamerlib', 'reconnecting to gearman server(s)'); + + $this->disconnect(); + $this->connect(); + } + + /** + * Returns the current status of the client + * + * @inheritDoc * @return bool */ - public function addServers(array $servers): bool + public function isConnected(): bool { - return $this->client->addServers(implode(',', $servers)); + if($this->client === null) + { + return false; + } + + return true; + } + + /** + * The automatic reconnect process + * + * @return void + */ + private function preformAutoreconf(): void + { + if($this->automatic_reconnect && $this->next_reconnect < time()) + { + try + { + $this->reconnect(); + } + catch (Exception $e) + { + Log::error('net.nosial.tamerlib', 'Failed to reconnect to Gearman server: ' . $e->getMessage()); + } + finally + { + $this->next_reconnect = time() + 1800; + } + } + } + + /** + * Adds client options + * + * @link http://php.net/manual/en/gearmanclient.addoptions.php + * @param int[] $options (GEARMAN_CLIENT_NON_BLOCKING, GEARMAN_CLIENT_UNBUFFERED_RESULT, GEARMAN_CLIENT_FREE_TASKS) + * @return void + */ + public function setOptions(array $options): void + { + $this->options = $options; + } + + /** + * Returns the current client options + * + * @return array + */ + public function getOptions(): array + { + return $this->options; + } + + /** + * Clears the current client options + * + * @return void + */ + public function clearOptions(): void + { + $this->options = []; } /** @@ -150,7 +267,6 @@ * * @param Closure $closure * @return void - * @throws ServerException */ public function doClosure(Closure $closure): void { @@ -164,15 +280,10 @@ * * @param Task $task * @return void - * @throws ServerException */ public function do(Task $task): void { - if($this->automatic_reconnect && time() > $this->next_reconnect) - { - $this->reconnect(); - $this->next_reconnect = time() + 1800; - } + $this->preformAutoreconf(); $this->tasks[] = $task; $job = new Job($task); @@ -192,7 +303,6 @@ $this->client->doBackground($task->getFunctionName(), msgpack_pack($job->toArray())); break; } - } /** @@ -200,15 +310,10 @@ * * @param Task $task * @return void - * @throws ServerException */ public function queue(Task $task): void { - if($this->automatic_reconnect && time() > $this->next_reconnect) - { - $this->reconnect(); - $this->next_reconnect = time() + 1800; - } + $this->preformAutoreconf(); $this->tasks[] = $task; $job = new Job($task); @@ -234,11 +339,10 @@ * Adds a closure task to the list of tasks to run * * @param Closure $closure - * @param $callback + * @param Closure|null $callback * @return void - * @throws ServerException */ - public function queueClosure(Closure $closure, $callback): void + public function queueClosure(Closure $closure, ?Closure $callback=null): void { $closure_task = new Task('tamer_closure', $closure, $callback); $closure_task->setClosure(true); @@ -247,15 +351,13 @@ /** * @return bool - * @throws ServerException */ public function run(): bool { - if($this->automatic_reconnect && time() > $this->next_reconnect) - { - $this->reconnect(); - $this->next_reconnect = time() + 1800; - } + if(!$this->isConnected()) + return false; + + $this->preformAutoreconf(); if(!$this->client->runTasks()) { @@ -275,21 +377,24 @@ { $job_result = JobResults::fromArray(msgpack_unpack($task->data())); $internal_task = $this->getTaskById($job_result->getId()); - $job_status = match ($task->returnCode()) - { - GEARMAN_WORK_EXCEPTION => JobStatus::Exception, - GEARMAN_WORK_FAIL => JobStatus::Failure, - default => JobStatus::Success, - }; + + Log::debug('net.nosial.tamerlib', 'callback for task ' . $internal_task->getId() . ' with status ' . $job_result->getStatus() . ' and data size ' . strlen($task->data()) . ' bytes'); try { - Log::debug('net.nosial.tamer', 'callback for task ' . $internal_task->getId() . ' with status ' . $job_status . ' and data size ' . strlen($task->data()) . ' bytes'); + if($internal_task->isClosure()) + { + // If the task is a closure, we need to run the callback with the closure's return value + // instead of the job result object + $internal_task->runCallback($job_result->getData()); + return; + } + $internal_task->runCallback($job_result); } catch(Exception $e) { - Log::error('net.nosial.tamer', 'Callback for task ' . $internal_task->getId() . ' failed with error: ' . $e->getMessage(), $e); + Log::error('net.nosial.tamerlib', 'Failed to run callback for task ' . $internal_task->getId() . ': ' . $e->getMessage(), $e); } finally { @@ -314,24 +419,6 @@ return null; } - /** - * @throws ServerException - */ - private function reconnect() - { - $this->client = new \GearmanClient(); - - foreach($this->server_cache as $host => $ports) - { - foreach($ports as $port) - { - $this->addServer($host, $port); - } - } - - $this->registerCallbacks(); - } - /** * Removes a task from the list of tasks * @@ -350,17 +437,17 @@ /** * @return bool */ - public function isAutomaticReconnect(): bool + public function automaticReconnectionEnabled(): bool { return $this->automatic_reconnect; } /** - * @param bool $automatic_reconnect + * @param bool $enable */ - public function setAutomaticReconnect(bool $automatic_reconnect): void + public function enableAutomaticReconnection(bool $enable): void { - $this->automatic_reconnect = $automatic_reconnect; + $this->automatic_reconnect = $enable; } /** @@ -370,13 +457,15 @@ { try { - $this->client->runTasks(); + $this->run(); } catch(Exception $e) { unset($e); } - - unset($this->client); + finally + { + $this->disconnect(); + } } } \ No newline at end of file diff --git a/src/Tamer/Protocols/Gearman/Worker.php b/src/Tamer/Protocols/Gearman/Worker.php index a08b1b6..8404b02 100644 --- a/src/Tamer/Protocols/Gearman/Worker.php +++ b/src/Tamer/Protocols/Gearman/Worker.php @@ -6,10 +6,11 @@ use Exception; use GearmanJob; + use GearmanWorker; + use LogLib\Log; use Opis\Closure\SerializableClosure; use Tamer\Abstracts\JobStatus; - use Tamer\Exceptions\ServerException; - use Tamer\Exceptions\WorkerException; + use Tamer\Exceptions\ConnectionException; use Tamer\Interfaces\WorkerProtocolInterface; use Tamer\Objects\Job; use Tamer\Objects\JobResults; @@ -17,92 +18,76 @@ class Worker implements WorkerProtocolInterface { /** - * @var \GearmanWorker|null + * The Gearman Worker Instance (if connected) + * + * @var GearmanWorker|null */ private $worker; /** + * The list of servers that have been added + * * @var array */ - private $server_cache; + private $defined_servers; /** + * Indicates if the worker should automatically reconnect to the server + * * @var bool */ private $automatic_reconnect; /** + * The Unix Timestamp of when the next reconnect should occur (if automatic_reconnect is true) + * * @var int */ private $next_reconnect; + /** + * The options to use when connecting to the server + * + * @var array + */ + private $options; + + /** + * Public Constructor with optional username and password + * + * @param string|null $username + * @param string|null $password + */ public function __construct(?string $username=null, ?string $password=null) { $this->worker = null; - $this->server_cache = []; + $this->defined_servers = []; $this->automatic_reconnect = false; $this->next_reconnect = time() + 1800; - - try - { - $this->reconnect(); - } - catch(Exception $e) - { - unset($e); - } + $this->options = []; } - /** - * @param array $options - * @return bool - * @inheritDoc - */ - public function addOptions(array $options): bool - { - if($this->worker == null) - { - return false; - } - - $options = array_map(function($option) - { - return constant($option); - }, $options); - - return $this->worker->addOptions(array_sum($options)); - } /** * Adds a server to the list of servers to use * * @link http://php.net/manual/en/gearmanworker.addserver.php * @param string $host ( * @param int $port (default: 4730) - * @return bool - * @throws ServerException + * @return void */ - public function addServer(string $host='127.0.0.1', int $port=4730): bool + public function addServer(string $host, int $port): void { - if(!isset($this->server_cache[$host])) + if(!isset($this->defined_servers[$host])) { - $this->server_cache[$host] = []; + $this->defined_servers[$host] = []; } - if(in_array($port, $this->server_cache[$host])) + if(in_array($port, $this->defined_servers[$host])) { - return true; + return; } - $this->server_cache[$host][] = $port; - - try - { - return $this->worker->addServer($host, $port); - } - catch(Exception $e) - { - throw new ServerException($e->getMessage(), $e->getCode(), $e); - } + $this->defined_servers[$host][] = $port; } /** @@ -111,7 +96,6 @@ * @link http://php.net/manual/en/gearmanworker.addservers.php * @param string[] $servers (host:port, host:port, ...) * @return void - * @throws ServerException */ public function addServers(array $servers): void { @@ -122,85 +106,42 @@ } } - /** - * Adds a function to the list of functions to call + * Connects to the server * - * @link http://php.net/manual/en/gearmanworker.addfunction.php - * @param string $function_name The name of the function to register with the job server - * @param callable $function The callback function to call when the job is received - * @param mixed|null $context (optional) The context to pass to the callback function * @return void + * @throws ConnectionException */ - public function addFunction(string $function_name, callable $function, mixed $context=null): void + public function connect(): void { - $this->worker->addFunction($function_name, function(GearmanJob $job) use ($function, $context) - { - $received_job = Job::fromArray(msgpack_unpack($job->workload())); + if($this->isConnected()) + return; - try - { - $result = $function($received_job, $context); - } - catch(Exception $e) - { - $job->sendFail(); - return; - } - - $job_results = new JobResults($received_job, JobStatus::Success, $result); - $job->sendComplete(msgpack_pack($job_results->toArray())); - - }); - } - - /** - * Removes a function from the list of functions to call - * - * @param string $function_name The name of the function to unregister - * @return void - */ - public function removeFunction(string $function_name): void - { - $this->worker->unregister($function_name); - } - - /** - * @return bool - */ - public function isAutomaticReconnect(): bool - { - return $this->automatic_reconnect; - } - - /** - * @param bool $automatic_reconnect - * @return void - */ - public function setAutomaticReconnect(bool $automatic_reconnect): void - { - $this->automatic_reconnect = $automatic_reconnect; - } - - /** - * @throws ServerException - */ - private function reconnect() - { - $this->worker = new \GearmanWorker(); + $this->worker = new GearmanWorker(); $this->worker->addOptions(GEARMAN_WORKER_GRAB_UNIQ); - foreach($this->server_cache as $host => $ports) + Log::debug('net.nosial.tamerlib', 'connecting to gearman server(s)'); + + foreach($this->defined_servers as $host => $ports) { foreach($ports as $port) { - $this->addServer($host, $port); + try + { + $this->worker->addServer($host, $port); + Log::debug('net.nosial.tamerlib', 'connected to gearman server: ' . $host . ':' . $port); + } + catch(Exception $e) + { + throw new ConnectionException('Failed to connect to Gearman server: ' . $host . ':' . $port, 0, $e); + } } } $this->worker->addFunction('tamer_closure', function(GearmanJob $job) { $received_job = Job::fromArray(msgpack_unpack($job->workload())); + Log::debug('net.nosial.tamerlib', 'received closure: ' . $received_job->getId()); try { @@ -217,9 +158,163 @@ $job_results = new JobResults($received_job, JobStatus::Success, $result); $job->sendComplete(msgpack_pack($job_results->toArray())); + Log::debug('net.nosial.tamerlib', 'completed closure: ' . $received_job->getId()); }); } + /** + * Disconnects from the server + * + * @return void + */ + public function disconnect(): void + { + if(!$this->isConnected()) + return; + + $this->worker->unregisterAll(); + unset($this->worker); + $this->worker = null; + } + + /** + * Reconnects to the server if the connection has been lost + * + * @return void + * @throws ConnectionException + */ + public function reconnect(): void + { + $this->disconnect(); + $this->connect(); + } + + /** + * Returns true if the worker is connected to the server + * + * @return bool + */ + public function isConnected(): bool + { + return $this->worker !== null; + } + + /** + * The automatic reconnect process + * + * @return void + */ + private function preformAutoreconf(): void + { + if($this->automatic_reconnect && $this->next_reconnect < time()) + { + try + { + $this->reconnect(); + } + catch (Exception $e) + { + Log::error('net.nosial.tamerlib', 'Failed to reconnect to Gearman server: ' . $e->getMessage()); + } + finally + { + $this->next_reconnect = time() + 1800; + } + } + } + + /** + * Sets the options to use when connecting to the server + * + * @param array $options + * @return bool + * @inheritDoc + */ + public function setOptions(array $options): void + { + $this->options = $options; + } + + /** + * Returns the options to use when connecting to the server + * + * @return array + */ + public function getOptions(): array + { + return $this->options; + } + + /** + * Clears the options to use when connecting to the server + * + * @return void + */ + public function clearOptions(): void + { + $this->options = []; + } + + /** + * @return bool + */ + public function automaticReconnectionEnabled(): bool + { + return $this->automatic_reconnect; + } + + /** + * @param bool $enable + * @return void + */ + public function enableAutomaticReconnection(bool $enable): void + { + $this->automatic_reconnect = $enable; + } + + /** + * Adds a function to the list of functions to call + * + * @link http://php.net/manual/en/gearmanworker.addfunction.php + * @param string $name The name of the function to register with the job server + * @param callable $callable The callback function to call when the job is received + * @return void + */ + public function addFunction(string $name, callable $callable): void + { + $this->worker->addFunction($name, function(GearmanJob $job) use ($callable) + { + $received_job = Job::fromArray(msgpack_unpack($job->workload())); + Log::debug('net.nosial.tamerlib', 'received job: ' . $received_job->getId()); + + try + { + $result = $callable($received_job); + } + catch(Exception $e) + { + $job->sendFail(); + unset($e); + return; + } + + $job_results = new JobResults($received_job, JobStatus::Success, $result); + $job->sendComplete(msgpack_pack($job_results->toArray())); + Log::debug('net.nosial.tamerlib', 'completed job: ' . $received_job->getId()); + }); + } + + /** + * Removes a function from the list of functions to call + * + * @param string $function_name The name of the function to unregister + * @return void + */ + public function removeFunction(string $function_name): void + { + $this->worker->unregister($function_name); + } + /** * Waits for a job and calls the appropriate callback function * @@ -228,26 +323,20 @@ * @param int $timeout (default: 500) The timeout in milliseconds (if $blocking is false) * @param bool $throw_errors (default: false) Whether to throw exceptions on errors * @return void Returns nothing - * @throws ServerException If the worker cannot connect to the server - * @throws WorkerException If the worker encounters an error while working if $throw_errors is true + * @throws ConnectionException */ public function work(bool $blocking=true, int $timeout=500, bool $throw_errors=false): void { - if($this->automatic_reconnect && (time() > $this->next_reconnect)) - { - $this->reconnect(); - $this->next_reconnect = time() + 1800; - } - $this->worker->setTimeout($timeout); while(true) { + @$this->preformAutoreconf(); @$this->worker->work(); if($this->worker->returnCode() == GEARMAN_COULD_NOT_CONNECT) { - throw new ServerException('Could not connect to Gearman server'); + throw new ConnectionException('Could not connect to Gearman server'); } if($this->worker->returnCode() == GEARMAN_TIMEOUT && !$blocking) @@ -257,8 +346,28 @@ if($this->worker->returnCode() != GEARMAN_SUCCESS && $throw_errors) { - throw new WorkerException('Gearman worker error: ' . $this->worker->error(), $this->worker->returnCode()); + Log::error('net.nosial.tamerlib', 'Gearman worker error: ' . $this->worker->error()); + } + + if($blocking) + { + usleep($timeout); } } } + + /** + * Executes all remaining tasks and closes the connection + */ + public function __destruct() + { + try + { + $this->disconnect(); + } + catch(Exception $e) + { + unset($e); + } + } } \ No newline at end of file diff --git a/src/Tamer/Protocols/RabbitMq/Client.php b/src/Tamer/Protocols/RabbitMq/Client.php index 2fca629..581bd1c 100644 --- a/src/Tamer/Protocols/RabbitMq/Client.php +++ b/src/Tamer/Protocols/RabbitMq/Client.php @@ -90,7 +90,7 @@ } } - public function addOptions(array $options): bool + public function setOptions(array $options): bool { $this->options = $options; return true; @@ -294,7 +294,7 @@ * * @return bool */ - public function isAutomaticReconnect(): bool + public function automaticReconnectionEnabled(): bool { return $this->automatic_reconnect; } @@ -302,12 +302,12 @@ /** * Enables or disables automatic reconnecting to the server * - * @param bool $automatic_reconnect + * @param bool $enable * @return void */ - public function setAutomaticReconnect(bool $automatic_reconnect): void + public function enableAutomaticReconnection(bool $enable): void { - $this->automatic_reconnect = $automatic_reconnect; + $this->automatic_reconnect = $enable; } private function reconnect() diff --git a/src/Tamer/Protocols/RabbitMq/Worker.php b/src/Tamer/Protocols/RabbitMq/Worker.php index 071ae55..e66f7d7 100644 --- a/src/Tamer/Protocols/RabbitMq/Worker.php +++ b/src/Tamer/Protocols/RabbitMq/Worker.php @@ -119,7 +119,7 @@ /** * @inheritDoc */ - public function addOptions(array $options): bool + public function setOptions(array $options): bool { $this->options = $options; return true; @@ -128,7 +128,7 @@ /** * @inheritDoc */ - public function isAutomaticReconnect(): bool + public function automaticReconnectionEnabled(): bool { return $this->automatic_reconnect; } @@ -136,18 +136,18 @@ /** * @inheritDoc */ - public function setAutomaticReconnect(bool $automatic_reconnect): void + public function enableAutomaticReconnection(bool $enable): void { - $this->automatic_reconnect = $automatic_reconnect; + $this->automatic_reconnect = $enable; } /** * @inheritDoc */ - public function addFunction(string $function_name, callable $function, mixed $context = null): void + public function addFunction(string $name, callable $callable, mixed $context = null): void { - $this->functions[$function_name] = [ - 'function' => $function, + $this->functions[$name] = [ + 'function' => $callable, 'context' => $context ]; } diff --git a/src/Tamer/Tamer.php b/src/Tamer/Tamer.php new file mode 100644 index 0000000..72a6010 --- /dev/null +++ b/src/Tamer/Tamer.php @@ -0,0 +1,336 @@ +addServers($servers); + self::$client->connect(); + } + elseif(self::$mode === Mode::Worker) + { + self::$worker = Functions::createWorker($protocol, $username, $password); + self::$worker->addServers($servers); + self::$worker->connect(); + } + else + { + throw new InvalidArgumentException(sprintf('Invalid mode: %s', $mode)); + } + + self::$connected = true; + } + + + /** + * Disconnects from the server + * + * @return void + * @throws ConnectionException + */ + public static function disconnect(): void + { + if (!self::$connected) + { + throw new ConnectionException('Tamer is not connected to the server'); + } + + if (self::$mode === Mode::Client) + { + self::$client->disconnect(); + } + else + { + self::$worker->disconnect(); + } + + self::$connected = false; + } + + /** + * Reconnects to the server + * + * @return void + * @throws ConnectionException + */ + public static function reconnect(): void + { + if (self::$mode === Mode::Client) + { + self::$client->reconnect(); + } + else + { + self::$worker->reconnect(); + } + } + + /** + * Adds a task to the queue to be executed by the worker + * + * @param Task $task + * @return void + */ + public static function do(Task $task): void + { + if (self::$mode === Mode::Client) + { + self::$client->do($task); + } + else + { + throw new InvalidArgumentException('Tamer is not running in client mode'); + } + } + + /** + * Executes a closure operation in the background (does not return a result) + * + * @param Closure $closure The closure operation to perform (remote) + * @return void + */ + public static function doClosure(Closure $closure): void + { + if (self::$mode === Mode::Client) + { + self::$client->doClosure($closure); + } + else + { + throw new InvalidArgumentException('Tamer is not running in client mode'); + } + } + + /** + * Queues a task to be processed in parallel (returns a result handled by a callback) + * + * @param Task $task + * @return void + */ + public static function queue(Task $task): void + { + if (self::$mode === Mode::Client) + { + self::$client->queue($task); + } + else + { + throw new InvalidArgumentException('Tamer is not running in client mode'); + } + } + + /** + * Queues a closure to be processed in parallel (returns a result handled by a callback) + * + * @param Closure $closure The closure operation to perform (remote) + * @param Closure|null $callback The closure to call when the operation is complete (local) + * @return void + */ + public static function queueClosure(Closure $closure, ?Closure $callback=null): void + { + if (self::$mode === Mode::Client) + { + self::$client->queueClosure($closure, $callback); + } + else + { + throw new InvalidArgumentException('Tamer is not running in client mode'); + } + } + + /** + * Executes all tasks in the queue and waits for them to complete + * + * @return bool + */ + public static function run(): bool + { + if (self::$mode === Mode::Client) + { + return self::$client->run(); + } + else + { + throw new InvalidArgumentException('Tamer is not running in client mode'); + } + } + + /** + * Registers a function to the worker + * + * @param string $name The name of the function to add + * @param callable $callable The function to add + * @return void + */ + public static function addFunction(string $name, callable $callable): void + { + if (self::$mode === Mode::Worker) + { + self::$worker->addFunction($name, $callable); + } + else + { + throw new InvalidArgumentException('Tamer is not running in worker mode'); + } + } + + /** + * Removes a function from the worker + * + * @param string $function_name The name of the function to remove + * @return void + */ + public static function removeFunction(string $function_name): void + { + if (self::$mode === Mode::Worker) + { + self::$worker->removeFunction($function_name); + } + else + { + throw new InvalidArgumentException('Tamer is not running in worker mode'); + } + } + + /** + * Works a job from the queue (blocking or non-blocking) + * + * @param bool $blocking (optional) Whether to block until a job is available + * @param int $timeout (optional) The timeout to use when blocking + * @param bool $throw_errors (optional) Whether to throw errors or not + * @return void + */ + public static function work(bool $blocking=true, int $timeout=500, bool $throw_errors=false): void + { + if (self::$mode === Mode::Worker) + { + self::$worker->work($blocking, $timeout, $throw_errors); + } + else + { + throw new InvalidArgumentException('Tamer is not running in worker mode'); + } + } + + /** + * @return string + */ + public static function getProtocol(): string + { + return self::$protocol; + } + + /** + * @return ClientProtocolInterface|null + */ + public static function getClient(): ?ClientProtocolInterface + { + return self::$client; + } + + /** + * @return WorkerProtocolInterface|null + */ + public static function getWorker(): ?WorkerProtocolInterface + { + return self::$worker; + } + + /** + * @return string + */ + public static function getMode(): string + { + return self::$mode; + } + + /** + * @return bool + */ + public static function isConnected(): bool + { + return self::$connected; + } + } \ No newline at end of file diff --git a/tests/rabbitmq_client.php b/tests/rabbitmq_client.php index cab644d..5e3efb5 100644 --- a/tests/rabbitmq_client.php +++ b/tests/rabbitmq_client.php @@ -1,6 +1,6 @@ do(new Task('sleep', '5')); + $client->do(Task::create('sleep', '5') + ->setPriority(TaskPriority::High) + ); } diff --git a/tests/tamer_client.php b/tests/tamer_client.php new file mode 100644 index 0000000..8033030 --- /dev/null +++ b/tests/tamer_client.php @@ -0,0 +1,57 @@ +getData()} seconds \n"; + })); + + $a = microtime(true); + Tamer::run(); + $b = microtime(true); + + echo "Took " . ($b - $a) . " seconds \n"; \ No newline at end of file diff --git a/tests/tamer_worker.php b/tests/tamer_worker.php new file mode 100644 index 0000000..f7bb22b --- /dev/null +++ b/tests/tamer_worker.php @@ -0,0 +1,20 @@ +getData()); + return $job->getData(); + }); + + Tamer::work(); \ No newline at end of file