From d15da30813c3277d3a6479a73cfbae312bfbc2b0 Mon Sep 17 00:00:00 2001 From: Netkas Date: Thu, 2 Feb 2023 16:55:19 -0500 Subject: [PATCH] Completed Closure implementation (Still a wip) --- src/Tamer/Objects/Job.php | 22 ++++-- src/Tamer/Objects/Task.php | 15 +++-- src/Tamer/Protocols/GearmanClient.php | 14 ++-- src/Tamer/Protocols/GearmanWorker.php | 3 +- src/Tamer/Protocols/RabbitMqClient.php | 93 +++++++++++++++++++++++++- 5 files changed, 123 insertions(+), 24 deletions(-) diff --git a/src/Tamer/Objects/Job.php b/src/Tamer/Objects/Job.php index 8053815..7f6680b 100644 --- a/src/Tamer/Objects/Job.php +++ b/src/Tamer/Objects/Job.php @@ -4,6 +4,10 @@ namespace Tamer\Objects; + use Closure; + use Opis\Closure\SerializableClosure; + use function unserialize; + class Job { /** @@ -23,7 +27,7 @@ /** * The data to be passed to the function * - * @var string + * @var string|Closure|null */ private $data; @@ -65,9 +69,9 @@ /** * Returns the data of the Job * - * @return string + * @return string|Closure|null */ - public function getData(): string + public function getData(): Closure|string|null { return $this->data; } @@ -90,7 +94,7 @@ return [ 'id' => $this->id, 'name' => $this->name, - 'data' => ($this->closure ? \Opis\Closure\serialize($this->data) : $this->data), + 'data' => ($this->closure ? serialize(new SerializableClosure($this->data)) : $this->data), 'closure' => $this->closure ]; } @@ -103,12 +107,16 @@ */ public static function fromArray(array $data): Job { - $data = $data['data']; + $job_data = $data['data']; if($data['closure'] === true) - $data = \Opis\Closure\unserialize($data['data']); + { + /** @var SerializableClosure $job_data */ + $job_data = unserialize($data['data']); + $job_data = $job_data->getClosure(); + } - $job = new Job(new Task($data['name'], $data['data'])); + $job = new Job(new Task($data['name'], $job_data)); $job->id = $data['id']; $job->closure = $data['closure']; diff --git a/src/Tamer/Objects/Task.php b/src/Tamer/Objects/Task.php index 54152ce..c4e8a1d 100644 --- a/src/Tamer/Objects/Task.php +++ b/src/Tamer/Objects/Task.php @@ -2,6 +2,7 @@ namespace Tamer\Objects; + use Closure; use InvalidArgumentException; use Tamer\Abstracts\TaskPriority; use Tamer\Classes\Validate; @@ -11,7 +12,7 @@ /** * @var string */ - private string $id; + private $id; /** * @var string @@ -19,9 +20,9 @@ private string $function_name; /** - * @var string + * @var string|Closure|null */ - private string $data; + private string|null|Closure $data; /** * @var int @@ -42,10 +43,10 @@ * Public Constructor * * @param string $function_name - * @param string $data + * @param string|Closure|null $data * @param callable|null $callback */ - public function __construct(string $function_name, string $data, callable $callback=null) + public function __construct(string $function_name, string|Closure|null $data, callable $callback=null) { $this->function_name = $function_name; $this->data = $data; @@ -80,9 +81,9 @@ /** * Returns the arguments for the task * - * @return string + * @return string|Closure|null */ - public function getData(): string + public function getData(): string|null|Closure { return $this->data; } diff --git a/src/Tamer/Protocols/GearmanClient.php b/src/Tamer/Protocols/GearmanClient.php index 37ec3ce..75cc96c 100644 --- a/src/Tamer/Protocols/GearmanClient.php +++ b/src/Tamer/Protocols/GearmanClient.php @@ -4,10 +4,10 @@ namespace Tamer\Protocols; + use Closure; use Exception; use GearmanTask; use LogLib\Log; - use Opis\Closure\SerializableClosure; use Tamer\Abstracts\JobStatus; use Tamer\Abstracts\TaskPriority; use Tamer\Exceptions\ServerException; @@ -145,13 +145,13 @@ /** * Executes a closure in the background * - * @param callable $function + * @param Closure $function * @return void * @throws ServerException */ - public function closure(callable $function): void + public function closure(Closure $function): void { - $closure_task = new Task('tamer_closure', \Opis\Closure\serialize(new SerializableClosure($function))); + $closure_task = new Task('tamer_closure', $function); $closure_task->setClosure(true); $this->doBackground($closure_task); } @@ -230,14 +230,14 @@ /** * Adds a closure task to the list of tasks to run * - * @param callable $function + * @param Closure $function * @param $callback * @return void * @throws ServerException */ - public function addClosureTask(callable $function, $callback): void + public function addClosureTask(Closure $function, $callback): void { - $closure_task = new Task('tamer_closure', \Opis\Closure\serialize(new SerializableClosure($function)), $callback); + $closure_task = new Task('tamer_closure', $function, $callback); $closure_task->setClosure(true); $this->addTask($closure_task); } diff --git a/src/Tamer/Protocols/GearmanWorker.php b/src/Tamer/Protocols/GearmanWorker.php index b5a4802..17acdaa 100644 --- a/src/Tamer/Protocols/GearmanWorker.php +++ b/src/Tamer/Protocols/GearmanWorker.php @@ -192,11 +192,12 @@ { /** @var SerializableClosure $closure */ $closure = $received_job->getData(); - $result = $closure->getClosure()->__invoke($received_job); + $result = $closure($received_job); } catch(Exception $e) { $job->sendFail(); + unset($e); return; } diff --git a/src/Tamer/Protocols/RabbitMqClient.php b/src/Tamer/Protocols/RabbitMqClient.php index 4072ed0..5ff4ecd 100644 --- a/src/Tamer/Protocols/RabbitMqClient.php +++ b/src/Tamer/Protocols/RabbitMqClient.php @@ -2,7 +2,96 @@ namespace Tamer\Protocols; - class RabbitMqClient - { + use Tamer\Exceptions\ServerException; + use Tamer\Interfaces\ClientProtocolInterface; + use Tamer\Objects\Task; + class RabbitMqClient implements ClientProtocolInterface + { + /** + * @var \R|null $client + */ + private $client; + + /** + * @var array + */ + private $server_cache; + + /** + * Used for tracking the current execution of tasks and run callbacks on completion + * + * @var Task[] + */ + private $tasks; + + /** + * @var bool + */ + private $automatic_reconnect; + + /** + * @var int + */ + private $next_reconnect; + + /** + */ + public function __construct() + { + $this->client = null; + $this->tasks = []; + $this->automatic_reconnect = false; + $this->next_reconnect = time() + 1800; + $this->server_cache = []; + + try + { + $this->reconnect(); + } + catch(ServerException $e) + { + unset($e); + } + } + + public function addOptions(array $options): bool + { + // TODO: Implement addOptions() method. + } + + public function addServer(string $host, int $port): bool + { + // TODO: Implement addServer() method. + } + + public function addServers(array $servers): bool + { + // TODO: Implement addServers() method. + } + + public function doBackground(Task $task): void + { + // TODO: Implement doBackground() method. + } + + public function addTask(Task $task): void + { + // TODO: Implement addTask() method. + } + + public function run(): bool + { + // TODO: Implement run() method. + } + + public function isAutomaticReconnect(): bool + { + // TODO: Implement isAutomaticReconnect() method. + } + + public function setAutomaticReconnect(bool $automatic_reconnect): void + { + // TODO: Implement setAutomaticReconnect() method. + } } \ No newline at end of file