From 6d8d4a75a45756ed99ac0772a06cb169402cab0a Mon Sep 17 00:00:00 2001 From: Netkas Date: Thu, 2 Feb 2023 21:09:45 -0500 Subject: [PATCH] Implemented RabbitMQ and Refactored Client & Worker (WIP) --- project.json | 6 + .../Interfaces/ClientProtocolInterface.php | 70 ++- .../Interfaces/WorkerProtocolInterface.php | 82 +++- .../{GearmanClient.php => Gearman/Client.php} | 29 +- .../{GearmanWorker.php => Gearman/Worker.php} | 48 ++- src/Tamer/Protocols/RabbitMq/Client.php | 401 ++++++++++++++++++ src/Tamer/Protocols/RabbitMq/Worker.php | 324 ++++++++++++++ src/Tamer/Protocols/RabbitMqClient.php | 97 ----- tests/gearman_client.php | 14 +- tests/gearman_closure.php | 14 +- tests/gearman_worker.php | 6 +- tests/rabbitmq_client.php | 18 + tests/rabbitmq_worker.php | 27 ++ 13 files changed, 966 insertions(+), 170 deletions(-) rename src/Tamer/Protocols/{GearmanClient.php => Gearman/Client.php} (92%) rename src/Tamer/Protocols/{GearmanWorker.php => Gearman/Worker.php} (88%) create mode 100644 src/Tamer/Protocols/RabbitMq/Client.php create mode 100644 src/Tamer/Protocols/RabbitMq/Worker.php delete mode 100644 src/Tamer/Protocols/RabbitMqClient.php create mode 100644 tests/rabbitmq_client.php create mode 100644 tests/rabbitmq_worker.php diff --git a/project.json b/project.json index 26ada6b..d90d02c 100644 --- a/project.json +++ b/project.json @@ -46,6 +46,12 @@ "version": "latest", "source_type": "remote", "source": "opis/closure=latest@composer" + }, + { + "name": "com.php_amqplib.php_amqplib", + "version": "latest", + "source_type": "remote", + "source": "php-amqplib/php-amqplib=latest@composer" } ], "configurations": [ diff --git a/src/Tamer/Interfaces/ClientProtocolInterface.php b/src/Tamer/Interfaces/ClientProtocolInterface.php index e03baff..56188e7 100644 --- a/src/Tamer/Interfaces/ClientProtocolInterface.php +++ b/src/Tamer/Interfaces/ClientProtocolInterface.php @@ -2,17 +2,18 @@ namespace Tamer\Interfaces; + use Closure; use Tamer\Objects\Task; interface ClientProtocolInterface { /** - * Adds options to the client (client specific) + * Public Constructor with optional username and password * - * @param array $options - * @return bool + * @param string|null $username (optional) The username to use when connecting to the server (if required) + * @param string|null $password (optional) The password to use when connecting to the server */ - public function addOptions(array $options): bool; + public function __construct(?string $username=null, ?string $password=null); /** * Adds a server to the list of servers to use @@ -32,27 +33,12 @@ public function addServers(array $servers): bool; /** - * Processes a task in the background (does not return a result) - * - * @param Task $task The task to process - * @return void - */ - public function doBackground(Task $task): void; - - /** - * Queues a task to be processed in parallel (returns a result handled by a callback) - * - * @param Task $task - * @return void - */ - public function addTask(Task $task): void; - - /** - * Executes all tasks in the queue and waits for them to complete + * Adds options to the client (client specific) * + * @param array $options * @return bool */ - public function run(): bool; + public function addOptions(array $options): bool; /** * Returns True if the client is set to automatically reconnect to the server after a period of time @@ -68,4 +54,44 @@ * @return void */ public function setAutomaticReconnect(bool $automatic_reconnect): void; + + /** + * Processes a task in the background (does not return a result) + * + * @param Task $task The task to process + * @return void + */ + public function do(Task $task): void; + + /** + * Executes a closure operation in the background (does not return a result) + * + * @param Closure $closure The closure operation to perform (remote) + * @return void + */ + public function doClosure(Closure $closure): void; + + /** + * Queues a task to be processed in parallel (returns a result handled by a callback) + * + * @param Task $task + * @return void + */ + public function queue(Task $task): void; + + /** + * Queues a closure to be processed in parallel (returns a result handled by a callback) + * + * @param Closure $closure The closure operation to perform (remote) + * @param Closure $callback The closure to call when the operation is complete (local) + * @return void + */ + public function queueClosure(Closure $closure, Closure $callback): void; + + /** + * Executes all tasks in the queue and waits for them to complete + * + * @return bool + */ + public function run(): bool; } \ No newline at end of file diff --git a/src/Tamer/Interfaces/WorkerProtocolInterface.php b/src/Tamer/Interfaces/WorkerProtocolInterface.php index 3107e1b..8d56b6d 100644 --- a/src/Tamer/Interfaces/WorkerProtocolInterface.php +++ b/src/Tamer/Interfaces/WorkerProtocolInterface.php @@ -1,8 +1,82 @@ client = null; $this->tasks = []; @@ -145,15 +148,15 @@ /** * Executes a closure in the background * - * @param Closure $function + * @param Closure $closure * @return void * @throws ServerException */ - public function closure(Closure $function): void + public function doClosure(Closure $closure): void { - $closure_task = new Task('tamer_closure', $function); + $closure_task = new Task('tamer_closure', $closure); $closure_task->setClosure(true); - $this->doBackground($closure_task); + $this->do($closure_task); } /** @@ -163,7 +166,7 @@ * @return void * @throws ServerException */ - public function doBackground(Task $task): void + public function do(Task $task): void { if($this->automatic_reconnect && time() > $this->next_reconnect) { @@ -199,7 +202,7 @@ * @return void * @throws ServerException */ - public function addTask(Task $task): void + public function queue(Task $task): void { if($this->automatic_reconnect && time() > $this->next_reconnect) { @@ -230,16 +233,16 @@ /** * Adds a closure task to the list of tasks to run * - * @param Closure $function + * @param Closure $closure * @param $callback * @return void * @throws ServerException */ - public function addClosureTask(Closure $function, $callback): void + public function queueClosure(Closure $closure, $callback): void { - $closure_task = new Task('tamer_closure', $function, $callback); + $closure_task = new Task('tamer_closure', $closure, $callback); $closure_task->setClosure(true); - $this->addTask($closure_task); + $this->queue($closure_task); } /** diff --git a/src/Tamer/Protocols/GearmanWorker.php b/src/Tamer/Protocols/Gearman/Worker.php similarity index 88% rename from src/Tamer/Protocols/GearmanWorker.php rename to src/Tamer/Protocols/Gearman/Worker.php index 17acdaa..a08b1b6 100644 --- a/src/Tamer/Protocols/GearmanWorker.php +++ b/src/Tamer/Protocols/Gearman/Worker.php @@ -2,7 +2,7 @@ /** @noinspection PhpMissingFieldTypeInspection */ - namespace Tamer\Protocols; + namespace Tamer\Protocols\Gearman; use Exception; use GearmanJob; @@ -14,7 +14,7 @@ use Tamer\Objects\Job; use Tamer\Objects\JobResults; - class GearmanWorker implements WorkerProtocolInterface + class Worker implements WorkerProtocolInterface { /** * @var \GearmanWorker|null @@ -36,7 +36,7 @@ */ private $next_reconnect; - public function __construct() + public function __construct(?string $username=null, ?string $password=null) { $this->worker = null; $this->server_cache = []; @@ -53,6 +53,25 @@ } } + /** + * @param array $options + * @return bool + * @inheritDoc + */ + public function addOptions(array $options): bool + { + if($this->worker == null) + { + return false; + } + + $options = array_map(function($option) + { + return constant($option); + }, $options); + + return $this->worker->addOptions(array_sum($options)); + } /** * Adds a server to the list of servers to use * @@ -91,18 +110,16 @@ * * @link http://php.net/manual/en/gearmanworker.addservers.php * @param string[] $servers (host:port, host:port, ...) - * @return WorkerProtocolInterface + * @return void * @throws ServerException */ - public function addServers(array $servers): self + public function addServers(array $servers): void { foreach($servers as $server) { $server = explode(':', $server); $this->addServer($server[0], $server[1]); } - - return $this; } @@ -113,9 +130,9 @@ * @param string $function_name The name of the function to register with the job server * @param callable $function The callback function to call when the job is received * @param mixed|null $context (optional) The context to pass to the callback function - * @return WorkerProtocolInterface + * @return void */ - public function addFunction(string $function_name, callable $function, mixed $context=null): self + public function addFunction(string $function_name, callable $function, mixed $context=null): void { $this->worker->addFunction($function_name, function(GearmanJob $job) use ($function, $context) { @@ -135,19 +152,17 @@ $job->sendComplete(msgpack_pack($job_results->toArray())); }); - return $this; } /** * Removes a function from the list of functions to call * * @param string $function_name The name of the function to unregister - * @return WorkerProtocolInterface + * @return void */ - public function removeFunction(string $function_name): self + public function removeFunction(string $function_name): void { $this->worker->unregister($function_name); - return $this; } /** @@ -160,12 +175,11 @@ /** * @param bool $automatic_reconnect - * @return WorkerProtocolInterface + * @return void */ - public function setAutomaticReconnect(bool $automatic_reconnect): self + public function setAutomaticReconnect(bool $automatic_reconnect): void { $this->automatic_reconnect = $automatic_reconnect; - return $this; } /** @@ -211,7 +225,7 @@ * * @link http://php.net/manual/en/gearmanworker.work.php * @param bool $blocking (default: true) Whether to block until a job is received - * @param int $timeout (default: 500) The timeout in milliseconds + * @param int $timeout (default: 500) The timeout in milliseconds (if $blocking is false) * @param bool $throw_errors (default: false) Whether to throw exceptions on errors * @return void Returns nothing * @throws ServerException If the worker cannot connect to the server diff --git a/src/Tamer/Protocols/RabbitMq/Client.php b/src/Tamer/Protocols/RabbitMq/Client.php new file mode 100644 index 0000000..2fca629 --- /dev/null +++ b/src/Tamer/Protocols/RabbitMq/Client.php @@ -0,0 +1,401 @@ +tasks = []; + $this->automatic_reconnect = false; + $this->next_reconnect = time() + 1800; + $this->server_cache = []; + $this->options = []; + $this->connection = null; + $this->username = $username; + $this->password = $password; + + try + { + $this->reconnect(); + } + catch(ServerException $e) + { + unset($e); + } + } + + public function addOptions(array $options): bool + { + $this->options = $options; + return true; + } + + public function addServer(string $host, int $port): bool + { + if(!isset($this->server_cache[$host])) + { + $this->server_cache[$host] = []; + } + + if(in_array($port, $this->server_cache[$host])) + { + return true; + } + + $this->server_cache[$host][] = $port; + $this->reconnect(); + + return true; + } + + /** + * Adds a list of servers to the list of servers to use + * + * @param array $servers (host:port, host:port, ...) + * @return bool + */ + public function addServers(array $servers): bool + { + foreach($servers as $server) + { + $server = explode(':', $server); + $this->addServer($server[0], $server[1]); + } + + return true; + } + + /** + * Calculates the priority for a task based on the priority level + * + * @param int $priority + * @return int + */ + private static function calculatePriority(int $priority): int + { + if($priority < TaskPriority::Low) + return 0; + + if($priority > TaskPriority::High) + return 255; + + return (int) round(($priority / TaskPriority::High) * 255); + } + + /** + * @param Task $task + * @return void + */ + public function do(Task $task): void + { + $job = new Job($task); + + $message = new AMQPMessage(msgpack_pack($job->toArray()), [ + 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, + 'priority' => self::calculatePriority($task->getPriority()), + ]); + + $this->channel->basic_publish($message, '', 'tamer_queue'); + } + + /** + * Executes a closure in the background + * + * @param Closure $closure + * @return void + */ + public function doClosure(Closure $closure): void + { + $closure_task = new Task('tamer_closure', $closure); + $closure_task->setClosure(true); + $this->do($closure_task); + } + + /** + * Queues a task to be executed + * + * @param Task $task + * @return void + */ + public function queue(Task $task): void + { + $this->tasks[] = $task; + } + + /** + * Adds a closure task to the list of tasks to run + * + * @param Closure $closure + * @param $callback + * @return void + */ + public function queueClosure(Closure $closure, $callback): void + { + $closure_task = new Task('tamer_closure', $closure, $callback); + $closure_task->setClosure(true); + $this->queue($closure_task); + } + + /** + * Executes all the tasks that has been added + * + * @return bool + */ + public function run(): bool + { + if(count($this->tasks) === 0) + return false; + + $correlationIds = []; + + /** @var Task $task */ + foreach($this->tasks as $task) + { + $correlationIds[] = $task->getId(); + + $job = new Job($task); + + $message = new AMQPMessage(msgpack_pack($job->toArray()), [ + 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, + 'correlation_id' => $task->getId(), + 'reply_to' => 'tamer_queue', + 'priority' => self::calculatePriority($task->getPriority()), + ]); + + $this->channel->basic_publish($message, '', 'tamer_queue'); + } + + // Register callback for each task + $callback = function($msg) use (&$correlationIds) + { + $job_result = JobResults::fromArray(msgpack_unpack($msg->body)); + $task = $this->getTaskById($job_result->getId()); + + try + { + $task->runCallback($job_result); + } + catch(Exception $e) + { + echo $e->getMessage(); + } + + // Remove the processed correlation_id + $index = array_search($msg->get('correlation_id'), $correlationIds); + if ($index !== false) { + unset($correlationIds[$index]); + } + + $this->channel->basic_ack($msg->delivery_info['delivery_tag']); + + // Stop consuming when all tasks are processed + if(count($correlationIds) === 0) + { + $this->channel->basic_cancel($msg->delivery_info['consumer_tag']); + } + }; + + $this->channel->basic_consume('tamer_queue', '', false, false, false, false, $callback); + + // Start consuming messages + while(count($this->channel->callbacks)) + { + $this->channel->wait(); + } + + return true; + } + + /** + * @param string $id + * @return Task|null + */ + private function getTaskById(string $id): ?Task + { + foreach($this->tasks as $task) + { + if($task->getId() === $id) + { + return $task; + } + } + + return null; + } + + /** + * Returns True if the client is automatically reconnecting to the server + * + * @return bool + */ + public function isAutomaticReconnect(): bool + { + return $this->automatic_reconnect; + } + + /** + * Enables or disables automatic reconnecting to the server + * + * @param bool $automatic_reconnect + * @return void + */ + public function setAutomaticReconnect(bool $automatic_reconnect): void + { + $this->automatic_reconnect = $automatic_reconnect; + } + + private function reconnect() + { + $connections = []; + + if(count($this->server_cache) === 0) + return; + + foreach($this->server_cache as $host => $ports) + { + foreach($ports as $port) + { + $host = [ + 'host' => $host, + 'port' => $port + ]; + + if($this->username !== null) + $host['username'] = $this->username; + + if($this->password !== null) + $host['password'] = $this->password; + + $connections[] = $host; + } + } + + // Can only connect to one server for now, so we'll just use the first one + $selected_connection = $connections[0]; + $this->disconnect(); + $this->connection = new AMQPStreamConnection( + $selected_connection['host'], + $selected_connection['port'], + $selected_connection['username'] ?? null, + $selected_connection['password'] ?? null + ); + + $this->channel = $this->connection->channel(); + $this->channel->queue_declare('tamer_queue', false, true, false, false); + } + + /** + * Disconnects from the server + * + * @return void + */ + private function disconnect() + { + try + { + if(!is_null($this->channel)) + { + $this->channel->close(); + } + } + catch(Exception $e) + { + unset($e); + } + finally + { + $this->channel = null; + } + + try + { + if(!is_null($this->connection)) + { + $this->connection->close(); + } + } + catch(Exception $e) + { + unset($e); + } + finally + { + $this->connection = null; + } + } + + /** + * Disconnects from the server when the object is destroyed + */ + public function __destruct() + { + $this->disconnect(); + } + + } \ No newline at end of file diff --git a/src/Tamer/Protocols/RabbitMq/Worker.php b/src/Tamer/Protocols/RabbitMq/Worker.php new file mode 100644 index 0000000..071ae55 --- /dev/null +++ b/src/Tamer/Protocols/RabbitMq/Worker.php @@ -0,0 +1,324 @@ +server_cache = []; + $this->functions = []; + $this->automatic_reconnect = false; + $this->next_reconnect = time() + 1800; + $this->username = $username; + $this->password = $password; + + try + { + $this->reconnect(); + } + catch(Exception $e) + { + unset($e); + } + } + + /** + * @inheritDoc + */ + public function addServer(string $host, int $port): bool + { + if(!isset($this->server_cache[$host])) + { + $this->server_cache[$host] = []; + } + + if(in_array($port, $this->server_cache[$host])) + { + return true; + } + + $this->server_cache[$host][] = $port; + $this->reconnect(); + + return true; + } + + /** + * @inheritDoc + */ + public function addServers(array $servers): void + { + foreach($servers as $server) + { + $server = explode(':', $server); + $this->addServer($server[0], $server[1]); + } + } + + /** + * @inheritDoc + */ + public function addOptions(array $options): bool + { + $this->options = $options; + return true; + } + + /** + * @inheritDoc + */ + public function isAutomaticReconnect(): bool + { + return $this->automatic_reconnect; + } + + /** + * @inheritDoc + */ + public function setAutomaticReconnect(bool $automatic_reconnect): void + { + $this->automatic_reconnect = $automatic_reconnect; + } + + /** + * @inheritDoc + */ + public function addFunction(string $function_name, callable $function, mixed $context = null): void + { + $this->functions[$function_name] = [ + 'function' => $function, + 'context' => $context + ]; + } + + /** + * @inheritDoc + */ + public function removeFunction(string $function_name): void + { + unset($this->functions[$function_name]); + } + + /** + * @inheritDoc + */ + public function work(bool $blocking = true, int $timeout = 500, bool $throw_errors = false): void + { + $callback = function($message) use ($throw_errors) + { + var_dump($message->body); + $job = Job::fromArray(msgpack_unpack($message->body)); + + $job_results = new JobResults($job, JobStatus::Success, 'Hello from worker!'); + + try + { + // Return $job_results + $this->channel->basic_publish( + new AMQPMessage( + msgpack_pack($job_results->toArray()), + [ + 'correlation_id' => $job->getId() + ] + ) + ); + + $this->channel->basic_ack($message->delivery_info['delivery_tag']); + } + catch (Exception $e) + { + if ($throw_errors) + { + throw $e; + } + + $job_results = new JobResults($job, JobStatus::Exception, $e->getMessage()); + + // Return $job_results + $this->channel->basic_publish( + new AMQPMessage( + msgpack_pack($job_results->toArray()), + [ + 'correlation_id' => $job->getId() + ] + ) + ); + + $this->channel->basic_ack($message->delivery_info['delivery_tag']); + } + }; + + $this->channel->basic_consume('tamer_queue', '', false, false, false, false, $callback); + + if ($blocking) + { + while(true) + { + $this->channel->wait(); + } + } + else + { + $start = microtime(true); + while (true) + { + if (microtime(true) - $start >= $timeout / 1000) + { + break; + } + + $this->channel->wait(); + } + } + } + + private function reconnect() + { + $connections = []; + + if(count($this->server_cache) === 0) + return; + + foreach($this->server_cache as $host => $ports) + { + foreach($ports as $port) + { + $host = [ + 'host' => $host, + 'port' => $port + ]; + + if($this->username !== null) + $host['username'] = $this->username; + + if($this->password !== null) + $host['password'] = $this->password; + + $connections[] = $host; + } + } + + // Can only connect to one server for now, so we'll just use the first one + $selected_connection = $connections[0]; + $this->disconnect(); + $this->connection = new AMQPStreamConnection( + $selected_connection['host'], + $selected_connection['port'], + $selected_connection['username'] ?? null, + $selected_connection['password'] ?? null + ); + + $this->channel = $this->connection->channel(); + $this->channel->queue_declare('tamer_queue', false, true, false, false); + } + + /** + * Disconnects from the server + * + * @return void + */ + private function disconnect() + { + try + { + if(!is_null($this->channel)) + { + $this->channel->close(); + } + } + catch(Exception $e) + { + unset($e); + } + finally + { + $this->channel = null; + } + + try + { + if(!is_null($this->connection)) + { + $this->connection->close(); + } + } + catch(Exception $e) + { + unset($e); + } + finally + { + $this->connection = null; + } + } + + /** + * Disconnects from the server when the object is destroyed + */ + public function __destruct() + { + $this->disconnect(); + } + + } \ No newline at end of file diff --git a/src/Tamer/Protocols/RabbitMqClient.php b/src/Tamer/Protocols/RabbitMqClient.php deleted file mode 100644 index 5ff4ecd..0000000 --- a/src/Tamer/Protocols/RabbitMqClient.php +++ /dev/null @@ -1,97 +0,0 @@ -client = null; - $this->tasks = []; - $this->automatic_reconnect = false; - $this->next_reconnect = time() + 1800; - $this->server_cache = []; - - try - { - $this->reconnect(); - } - catch(ServerException $e) - { - unset($e); - } - } - - public function addOptions(array $options): bool - { - // TODO: Implement addOptions() method. - } - - public function addServer(string $host, int $port): bool - { - // TODO: Implement addServer() method. - } - - public function addServers(array $servers): bool - { - // TODO: Implement addServers() method. - } - - public function doBackground(Task $task): void - { - // TODO: Implement doBackground() method. - } - - public function addTask(Task $task): void - { - // TODO: Implement addTask() method. - } - - public function run(): bool - { - // TODO: Implement run() method. - } - - public function isAutomaticReconnect(): bool - { - // TODO: Implement isAutomaticReconnect() method. - } - - public function setAutomaticReconnect(bool $automatic_reconnect): void - { - // TODO: Implement setAutomaticReconnect() method. - } - } \ No newline at end of file diff --git a/tests/gearman_client.php b/tests/gearman_client.php index 7305c05..8fdee96 100644 --- a/tests/gearman_client.php +++ b/tests/gearman_client.php @@ -2,23 +2,23 @@ require 'ncc'; - use Tamer\Objects\JobResults; - use Tamer\Objects\Task; +use Tamer\Objects\JobResults; +use Tamer\Objects\Task; - import('net.nosial.tamerlib', 'latest'); +import('net.nosial.tamerlib', 'latest'); - $client = new \Tamer\Protocols\GearmanClient(); + $client = new \Tamer\Protocols\Gearman\Client(); $client->addServer(); - $client->doBackground(new Task('sleep', '5')); + $client->do(new Task('sleep', '5')); - $client->addTask(new Task('sleep', '5', function(JobResults $job) { + $client->queue(new Task('sleep', '5', function(JobResults $job) { echo "Task {$job->getId()} completed with data: {$job->getData()} \n"; })); - $client->addTask(new Task('sleep', '5', function(JobResults $job) { + $client->queue(new Task('sleep', '5', function(JobResults $job) { echo "Task {$job->getId()} completed with data: {$job->getData()} \n"; })); diff --git a/tests/gearman_closure.php b/tests/gearman_closure.php index 4aa3acd..d0a72eb 100644 --- a/tests/gearman_closure.php +++ b/tests/gearman_closure.php @@ -2,14 +2,14 @@ require 'ncc'; - use Tamer\Objects\JobResults; - use Tamer\Objects\Task; +import('net.nosial.tamerlib', 'latest'); - import('net.nosial.tamerlib', 'latest'); - - $client = new \Tamer\Protocols\GearmanClient(); + $client = new \Tamer\Protocols\Gearman\Client(); $client->addServer(); - $client->closure(function () { - echo "This function was sent from a client, it should be executed on the worker"; + $client->doClosure(function () { + require 'ncc'; + import('net.nosial.loglib', 'latest'); + + \LogLib\Log::info('gearman_closure.php', 'closure'); }); \ No newline at end of file diff --git a/tests/gearman_worker.php b/tests/gearman_worker.php index 5a48584..e998220 100644 --- a/tests/gearman_worker.php +++ b/tests/gearman_worker.php @@ -2,10 +2,10 @@ require 'ncc'; - use Tamer\Objects\Job; +use Tamer\Objects\Job; - import('net.nosial.tamerlib', 'latest'); - $worker = new \Tamer\Protocols\GearmanWorker(); +import('net.nosial.tamerlib', 'latest'); + $worker = new \Tamer\Protocols\Gearman\Worker(); $worker->addServer(); $worker->addFunction('sleep', function($job) { diff --git a/tests/rabbitmq_client.php b/tests/rabbitmq_client.php new file mode 100644 index 0000000..cab644d --- /dev/null +++ b/tests/rabbitmq_client.php @@ -0,0 +1,18 @@ +addServer('127.0.0.1', 5672); + + // Loop through 10 tasks + + for($i = 0; $i < 500; $i++) + { + $client->do(new Task('sleep', '5')); + } diff --git a/tests/rabbitmq_worker.php b/tests/rabbitmq_worker.php new file mode 100644 index 0000000..5d482c3 --- /dev/null +++ b/tests/rabbitmq_worker.php @@ -0,0 +1,27 @@ +addServer('127.0.0.1', 5672); + + $worker->addFunction('sleep', function($job) { + /** @var Job $job */ + var_dump(get_class($job)); + echo "Task {$job->getId()} started with data: {$job->getData()} \n"; + sleep($job->getData()); + echo "Task {$job->getId()} completed with data: {$job->getData()} \n"; + + return $job->getData(); + }); + + + + while(true) + { + echo "Waiting for job... \n"; + $worker->work(); + } \ No newline at end of file