From f88f453578073a3921ce7496ecff4e6e79caf61c Mon Sep 17 00:00:00 2001 From: Netkas Date: Wed, 1 Feb 2023 23:42:41 -0500 Subject: [PATCH] Progress on closures --- .idea/php.xml | 6 + .idea/tamer.iml | 1 + project.json | 6 + .../Interfaces/ClientProtocolInterface.php | 63 +++++++ src/Tamer/Objects/Job.php | 66 +++++++- src/Tamer/Objects/JobResults.php | 128 +++++++++++---- src/Tamer/Objects/Task.php | 22 +++ src/Tamer/Protocols/GearmanClient.php | 154 +++++++++--------- src/Tamer/Protocols/GearmanWorker.php | 50 ++++-- tests/client_example.php | 18 -- tests/gearman_client.php | 26 +++ tests/gearman_closure.php | 15 ++ tests/gearman_worker.php | 16 +- 13 files changed, 427 insertions(+), 144 deletions(-) delete mode 100644 tests/client_example.php create mode 100644 tests/gearman_client.php create mode 100644 tests/gearman_closure.php diff --git a/.idea/php.xml b/.idea/php.xml index e01b5fd..2c4b796 100644 --- a/.idea/php.xml +++ b/.idea/php.xml @@ -15,9 +15,15 @@ + + + + + + diff --git a/.idea/tamer.iml b/.idea/tamer.iml index 4348106..24c8dff 100644 --- a/.idea/tamer.iml +++ b/.idea/tamer.iml @@ -4,6 +4,7 @@ + diff --git a/project.json b/project.json index e57e387..26ada6b 100644 --- a/project.json +++ b/project.json @@ -40,6 +40,12 @@ "version": "latest", "source_type": "remote", "source": "nosial/libs.log=latest@n64" + }, + { + "name": "com.opis.closure", + "version": "latest", + "source_type": "remote", + "source": "opis/closure=latest@composer" } ], "configurations": [ diff --git a/src/Tamer/Interfaces/ClientProtocolInterface.php b/src/Tamer/Interfaces/ClientProtocolInterface.php index d0e2956..e03baff 100644 --- a/src/Tamer/Interfaces/ClientProtocolInterface.php +++ b/src/Tamer/Interfaces/ClientProtocolInterface.php @@ -2,7 +2,70 @@ namespace Tamer\Interfaces; + use Tamer\Objects\Task; + interface ClientProtocolInterface { + /** + * Adds options to the client (client specific) + * + * @param array $options + * @return bool + */ + public function addOptions(array $options): bool; + /** + * Adds a server to the list of servers to use + * + * @param string $host The host to connect to (eg; 127.0.0.1) + * @param int $port The port to connect to (eg; 4730) + * @return bool + */ + public function addServer(string $host, int $port): bool; + + /** + * Adds a list of servers to the list of servers to use + * + * @param array $servers An array of servers to connect to (eg; ['host:port', 'host:port', ...]) + * @return bool + */ + public function addServers(array $servers): bool; + + /** + * Processes a task in the background (does not return a result) + * + * @param Task $task The task to process + * @return void + */ + public function doBackground(Task $task): void; + + /** + * Queues a task to be processed in parallel (returns a result handled by a callback) + * + * @param Task $task + * @return void + */ + public function addTask(Task $task): void; + + /** + * Executes all tasks in the queue and waits for them to complete + * + * @return bool + */ + public function run(): bool; + + /** + * Returns True if the client is set to automatically reconnect to the server after a period of time + * + * @return bool + */ + public function isAutomaticReconnect(): bool; + + /** + * Enables or disables automatic reconnecting to the server after a period of time + * + * @param bool $automatic_reconnect + * @return void + */ + public function setAutomaticReconnect(bool $automatic_reconnect): void; } \ No newline at end of file diff --git a/src/Tamer/Objects/Job.php b/src/Tamer/Objects/Job.php index 36b9de0..8053815 100644 --- a/src/Tamer/Objects/Job.php +++ b/src/Tamer/Objects/Job.php @@ -27,14 +27,24 @@ */ private $data; - public function __construct(string $id, string $name, string $data) + /** + * Indicates if the data is a closure + * + * @var bool + */ + private $closure; + + public function __construct(Task $task) { - $this->id = $id; - $this->name = $name; - $this->data = $data; + $this->id = $task->getId(); + $this->name = $task->getFunctionName(); + $this->data = $task->getData(); + $this->closure = $task->isClosure(); } /** + * Returns the ID of the Job + * * @return string */ public function getId(): string @@ -43,6 +53,8 @@ } /** + * Returns the function name of the Job + * * @return string */ public function getName(): string @@ -51,10 +63,56 @@ } /** + * Returns the data of the Job + * * @return string */ public function getData(): string { return $this->data; } + + /** + * @return bool + */ + public function isClosure(): bool + { + return $this->closure; + } + + /** + * Returns an array representation of the Job + * + * @return array + */ + public function toArray(): array + { + return [ + 'id' => $this->id, + 'name' => $this->name, + 'data' => ($this->closure ? \Opis\Closure\serialize($this->data) : $this->data), + 'closure' => $this->closure + ]; + } + + /** + * Constructs a Job from an array + * + * @param array $data + * @return Job + */ + public static function fromArray(array $data): Job + { + $data = $data['data']; + + if($data['closure'] === true) + $data = \Opis\Closure\unserialize($data['data']); + + $job = new Job(new Task($data['name'], $data['data'])); + $job->id = $data['id']; + $job->closure = $data['closure']; + + return $job; + } + } \ No newline at end of file diff --git a/src/Tamer/Objects/JobResults.php b/src/Tamer/Objects/JobResults.php index 09d8f59..2f9049d 100644 --- a/src/Tamer/Objects/JobResults.php +++ b/src/Tamer/Objects/JobResults.php @@ -1,48 +1,69 @@ task = $task; - $this->status = $status; - $this->result = $result; + if($job !== null) + { + $this->id = $job->getId(); + $this->data = $results; + $this->status = $status; + } } /** - * @return Task + * Returns the ID of the Job + * + * @return string */ - public function getTask(): Task + public function getId(): string { - return $this->task; + return $this->id; + } + + + /** + * Returns the data of the Job + * + * @return string + */ + public function getData(): string + { + return $this->data; } /** * @return int + * @noinspection PhpUnused */ public function getStatus(): int { @@ -50,10 +71,59 @@ } /** - * @return string|null + * Returns an array representation of the Job + * + * @return array */ - public function getResult(): ?string + public function toArray(): array { - return $this->result; + return [ + 'id' => $this->id, + 'data' => $this->data, + 'status' => $this->status + ]; } + + /** + * Constructs a Job from an array + * + * @param array $data + * @return JobResults + */ + public static function fromArray(array $data): JobResults + { + $job = new JobResults(); + + $job->setId($data['id']); + $job->setData($data['data']); + $job->setStatus($data['status']); + + return $job; + } + + /** + * @param string $id + */ + protected function setId(string $id): void + { + $this->id = $id; + } + + /** + * @param string $data + */ + protected function setData(string $data): void + { + $this->data = $data; + } + + /** + * @param int|null $status + */ + protected function setStatus(?int $status): void + { + $this->status = $status; + } + + } \ No newline at end of file diff --git a/src/Tamer/Objects/Task.php b/src/Tamer/Objects/Task.php index 4cfcbf5..54152ce 100644 --- a/src/Tamer/Objects/Task.php +++ b/src/Tamer/Objects/Task.php @@ -33,6 +33,11 @@ */ private $callback; + /** + * @var bool + */ + private $closure; + /** * Public Constructor * @@ -47,6 +52,7 @@ $this->id = uniqid(); $this->priority = TaskPriority::Normal; $this->callback = $callback; + $this->closure = false; } /** @@ -147,4 +153,20 @@ call_user_func($this->callback, $result); } } + + /** + * @return bool + */ + public function isClosure(): bool + { + return $this->closure; + } + + /** + * @param bool $closure + */ + public function setClosure(bool $closure): void + { + $this->closure = $closure; + } } \ No newline at end of file diff --git a/src/Tamer/Protocols/GearmanClient.php b/src/Tamer/Protocols/GearmanClient.php index 1abe10a..37ec3ce 100644 --- a/src/Tamer/Protocols/GearmanClient.php +++ b/src/Tamer/Protocols/GearmanClient.php @@ -7,10 +7,12 @@ use Exception; use GearmanTask; use LogLib\Log; + use Opis\Closure\SerializableClosure; use Tamer\Abstracts\JobStatus; use Tamer\Abstracts\TaskPriority; use Tamer\Exceptions\ServerException; use Tamer\Interfaces\ClientProtocolInterface; + use Tamer\Objects\Job; use Tamer\Objects\JobResults; use Tamer\Objects\Task; @@ -51,6 +53,7 @@ $this->tasks = []; $this->automatic_reconnect = false; $this->next_reconnect = time() + 1800; + $this->server_cache = []; try { @@ -80,6 +83,11 @@ return $this->client->addOptions($options); } + /** + * Registers callbacks for the client + * + * @return void + */ private function registerCallbacks(): void { $this->client->setCompleteCallback([$this, 'callbackHandler']); @@ -134,14 +142,28 @@ return $this->client->addServers(implode(',', $servers)); } + /** + * Executes a closure in the background + * + * @param callable $function + * @return void + * @throws ServerException + */ + public function closure(callable $function): void + { + $closure_task = new Task('tamer_closure', \Opis\Closure\serialize(new SerializableClosure($function))); + $closure_task->setClosure(true); + $this->doBackground($closure_task); + } + /** * Processes a task in the background * * @param Task $task - * @return bool + * @return void * @throws ServerException */ - public function doBackground(Task $task): bool + public function doBackground(Task $task): void { if($this->automatic_reconnect && time() > $this->next_reconnect) { @@ -150,29 +172,34 @@ } $this->tasks[] = $task; + $job = new Job($task); switch($task->getPriority()) { case TaskPriority::High: - return $this->client->doHighBackground($task->getFunctionName(), $task->getData(), $task->getId()); + $this->client->doHighBackground($task->getFunctionName(), msgpack_pack($job->toArray())); + break; case TaskPriority::Low: - return $this->client->doLowBackground($task->getFunctionName(), $task->getData(), $task->getId()); + $this->client->doLowBackground($task->getFunctionName(), msgpack_pack($job->toArray())); + break; default: case TaskPriority::Normal: - return $this->client->doBackground($task->getFunctionName(), $task->getData(), $task->getId()); + $this->client->doBackground($task->getFunctionName(), msgpack_pack($job->toArray())); + break; } + } /** - * Processes a task in the foreground + * Adds a task to the list of tasks to run * * @param Task $task - * @return JobResults + * @return void * @throws ServerException */ - public function do(Task $task): JobResults + public function addTask(Task $task): void { if($this->automatic_reconnect && time() > $this->next_reconnect) { @@ -181,85 +208,45 @@ } $this->tasks[] = $task; + $job = new Job($task); switch($task->getPriority()) { case TaskPriority::High: - return new JobResults($task, JobStatus::Success, $this->client->doHigh($task->getFunctionName(), $task->getData(), $task->getId())); + $this->client->addTaskHigh($task->getFunctionName(), msgpack_pack($job->toArray())); + break; case TaskPriority::Low: - return new JobResults($task, JobStatus::Success, $this->client->doLow($task->getFunctionName(), $task->getData(), $task->getId())); + $this->client->addTaskLow($task->getFunctionName(), msgpack_pack($job->toArray())); + break; default: case TaskPriority::Normal: - return new JobResults($task, JobStatus::Success, $this->client->doNormal($task->getFunctionName(), $task->getData(), $task->getId())); + $this->client->addTask($task->getFunctionName(), msgpack_pack($job->toArray())); + break; } } - public function addTask(Task $task): ClientProtocolInterface + /** + * Adds a closure task to the list of tasks to run + * + * @param callable $function + * @param $callback + * @return void + * @throws ServerException + */ + public function addClosureTask(callable $function, $callback): void { - if($this->automatic_reconnect && time() > $this->next_reconnect) - { - $this->reconnect(); - $this->next_reconnect = time() + 1800; - } - - $this->tasks[] = $task; - - switch($task->getPriority()) - { - case TaskPriority::High: - $this->client->addTaskHigh($task->getFunctionName(), $task->getData(), $task->getId()); - break; - - case TaskPriority::Low: - $this->client->addTaskLow($task->getFunctionName(), $task->getData(), $task->getId()); - break; - - default: - case TaskPriority::Normal: - $this->client->addTask($task->getFunctionName(), $task->getData(), $task->getId()); - break; - } - - return $this; - } - - - public function addBackgroundTask(Task $task): ClientProtocolInterface - { - if($this->automatic_reconnect && time() > $this->next_reconnect) - { - $this->reconnect(); - $this->next_reconnect = time() + 1800; - } - - $this->tasks[] = $task; - - switch($task->getPriority()) - { - case TaskPriority::High: - $this->client->addTaskHighBackground($task->getFunctionName(), $task->getData(), $task->getId()); - break; - - case TaskPriority::Low: - $this->client->addTaskLowBackground($task->getFunctionName(), $task->getData(), $task->getId()); - break; - - default: - case TaskPriority::Normal: - $this->client->addTaskBackground($task->getFunctionName(), $task->getData(), $task->getId()); - break; - } - - return $this; + $closure_task = new Task('tamer_closure', \Opis\Closure\serialize(new SerializableClosure($function)), $callback); + $closure_task->setClosure(true); + $this->addTask($closure_task); } /** * @return bool * @throws ServerException */ - public function doTasks(): bool + public function run(): bool { if($this->automatic_reconnect && time() > $this->next_reconnect) { @@ -283,7 +270,8 @@ */ public function callbackHandler(GearmanTask $task): void { - $internal_task = $this->getTaskById($task->unique()); + $job_result = JobResults::fromArray(msgpack_unpack($task->data())); + $internal_task = $this->getTaskById($job_result->getId()); $job_status = match ($task->returnCode()) { GEARMAN_WORK_EXCEPTION => JobStatus::Exception, @@ -291,12 +279,10 @@ default => JobStatus::Success, }; - $job_results = new JobResults($internal_task, $job_status, ($task->data() ?? null)); - try { Log::debug('net.nosial.tamer', 'callback for task ' . $internal_task->getId() . ' with status ' . $job_status . ' and data size ' . strlen($task->data()) . ' bytes'); - $internal_task->runCallback($job_results); + $internal_task->runCallback($job_result); } catch(Exception $e) { @@ -314,8 +300,6 @@ */ private function getTaskById(string $id): ?Task { - var_dump($this->tasks); - var_dump($id); foreach($this->tasks as $task) { if($task->getId() === $id) @@ -349,16 +333,15 @@ * Removes a task from the list of tasks * * @param Task $task - * @return ClientProtocolInterface + * @return void */ - private function removeTask(Task $task): ClientProtocolInterface + private function removeTask(Task $task): void { $this->tasks = array_filter($this->tasks, function($item) use ($task) { return $item->getId() !== $task->getId(); }); - return $this; } /** @@ -376,4 +359,21 @@ { $this->automatic_reconnect = $automatic_reconnect; } + + /** + * Executes all remaining tasks and closes the connection + */ + public function __destruct() + { + try + { + $this->client->runTasks(); + } + catch(Exception $e) + { + unset($e); + } + + unset($this->client); + } } \ No newline at end of file diff --git a/src/Tamer/Protocols/GearmanWorker.php b/src/Tamer/Protocols/GearmanWorker.php index 2e3d359..b5a4802 100644 --- a/src/Tamer/Protocols/GearmanWorker.php +++ b/src/Tamer/Protocols/GearmanWorker.php @@ -1,13 +1,18 @@ worker = null; $this->server_cache = []; $this->automatic_reconnect = false; $this->next_reconnect = time() + 1800; - $this->current_job = null; try { @@ -120,10 +119,21 @@ { $this->worker->addFunction($function_name, function(GearmanJob $job) use ($function, $context) { - $this->current_job = $job; - $job = new Job($job->unique(), $job->handle(), $job->workload()); - $function($job, $context); - $this->current_job = null; + $received_job = Job::fromArray(msgpack_unpack($job->workload())); + + try + { + $result = $function($received_job, $context); + } + catch(Exception $e) + { + $job->sendFail(); + return; + } + + $job_results = new JobResults($received_job, JobStatus::Success, $result); + $job->sendComplete(msgpack_pack($job_results->toArray())); + }); return $this; } @@ -173,6 +183,26 @@ $this->addServer($host, $port); } } + + $this->worker->addFunction('tamer_closure', function(GearmanJob $job) + { + $received_job = Job::fromArray(msgpack_unpack($job->workload())); + + try + { + /** @var SerializableClosure $closure */ + $closure = $received_job->getData(); + $result = $closure->getClosure()->__invoke($received_job); + } + catch(Exception $e) + { + $job->sendFail(); + return; + } + + $job_results = new JobResults($received_job, JobStatus::Success, $result); + $job->sendComplete(msgpack_pack($job_results->toArray())); + }); } /** diff --git a/tests/client_example.php b/tests/client_example.php deleted file mode 100644 index 4a379eb..0000000 --- a/tests/client_example.php +++ /dev/null @@ -1,18 +0,0 @@ -addServer(); - - $client->addTask(new Task('sleep', '5', function(Job $job) { - echo "Task {$job->getId()} completed with data: {$job->getData()} \n"; - })); - - - $client->doTasks(); \ No newline at end of file diff --git a/tests/gearman_client.php b/tests/gearman_client.php new file mode 100644 index 0000000..7305c05 --- /dev/null +++ b/tests/gearman_client.php @@ -0,0 +1,26 @@ +addServer(); + + $client->doBackground(new Task('sleep', '5')); + + + $client->addTask(new Task('sleep', '5', function(JobResults $job) { + echo "Task {$job->getId()} completed with data: {$job->getData()} \n"; + })); + + + $client->addTask(new Task('sleep', '5', function(JobResults $job) { + echo "Task {$job->getId()} completed with data: {$job->getData()} \n"; + })); + + + $client->run(); \ No newline at end of file diff --git a/tests/gearman_closure.php b/tests/gearman_closure.php new file mode 100644 index 0000000..4aa3acd --- /dev/null +++ b/tests/gearman_closure.php @@ -0,0 +1,15 @@ +addServer(); + + $client->closure(function () { + echo "This function was sent from a client, it should be executed on the worker"; + }); \ No newline at end of file diff --git a/tests/gearman_worker.php b/tests/gearman_worker.php index 7783f89..5a48584 100644 --- a/tests/gearman_worker.php +++ b/tests/gearman_worker.php @@ -1,17 +1,21 @@ addServer(); - $worker->addFunction('sleep', function($task) { - var_dump(get_class($task)); - echo "Task {$task->getId()} started with data: {$task->getData()} \n"; - sleep($task->getData()); - echo "Task {$task->getId()} completed with data: {$task->getData()} \n"; + $worker->addFunction('sleep', function($job) { + /** @var Job $job */ + var_dump(get_class($job)); + echo "Task {$job->getId()} started with data: {$job->getData()} \n"; + sleep($job->getData()); + echo "Task {$job->getId()} completed with data: {$job->getData()} \n"; + + return $job->getData(); });