From 9084ad8ca5247d8b74cd3e4b917e711be792be83 Mon Sep 17 00:00:00 2001 From: Netkas Date: Thu, 9 Feb 2023 15:31:44 -0500 Subject: [PATCH] Refactored Worker for RabbitMQ (Needs more work) --- src/TamerLib/Protocols/RabbitMq/Worker.php | 402 ++++++++++++--------- 1 file changed, 233 insertions(+), 169 deletions(-) diff --git a/src/TamerLib/Protocols/RabbitMq/Worker.php b/src/TamerLib/Protocols/RabbitMq/Worker.php index c13da5b..d8f41a8 100644 --- a/src/TamerLib/Protocols/RabbitMq/Worker.php +++ b/src/TamerLib/Protocols/RabbitMq/Worker.php @@ -5,10 +5,10 @@ namespace TamerLib\Protocols\RabbitMq; use Exception; - use PhpAmqpLib\Channel\AMQPChannel; - use PhpAmqpLib\Connection\AMQPStreamConnection; + use LogLib\Log; use PhpAmqpLib\Message\AMQPMessage; use TamerLib\Abstracts\JobStatus; + use TamerLib\Exceptions\ConnectionException; use TamerLib\Interfaces\WorkerProtocolInterface; use TamerLib\Objects\Job; use TamerLib\Objects\JobResults; @@ -16,9 +16,11 @@ class Worker implements WorkerProtocolInterface { /** + * An array of defined servers to use + * * @var array */ - private $server_cache; + private $defined_servers; /** * @var false @@ -26,107 +28,202 @@ private $automatic_reconnect; /** - * @var int - */ - private $next_reconnect; - - /** + * An array of functions that the worker handles + * * @var array */ private $functions; /** + * (Optional) The username to use when connecting to the server (if required) + * * @var string|null */ private $username; /** + * (Optional) The password to use when connecting to the server + * * @var string|null */ private $password; /** - * @var AMQPStreamConnection|null + * A array of active connections + * + * @var Connection[] */ - private $connection; - - /** - * @var AMQPChannel|null - */ - private $channel; + private $connections; /** * @var array */ private $options; - /** - * @inheritDoc + * Public Constructor with optional username and password + * + * @param string|null $username + * @param string|null $password */ public function __construct(?string $username = null, ?string $password = null) { - $this->server_cache = []; + $this->defined_servers = []; $this->functions = []; - $this->automatic_reconnect = false; - $this->next_reconnect = time() + 1800; + $this->automatic_reconnect = true; $this->username = $username; $this->password = $password; - - try - { - $this->reconnect(); - } - catch(Exception $e) - { - unset($e); - } } /** - * @inheritDoc + * Adds a server to the list of servers to use + * + * @param string $host + * @param int $port + * @return void */ - public function addServer(string $host, int $port): bool + public function addServer(string $host, int $port): void { - if(!isset($this->server_cache[$host])) + if(!isset($this->defined_servers[$host])) { - $this->server_cache[$host] = []; + $this->defined_servers[$host] = []; } - if(in_array($port, $this->server_cache[$host])) + if(in_array($port, $this->defined_servers[$host])) { - return true; + return; } - $this->server_cache[$host][] = $port; - $this->reconnect(); - - return true; + $this->defined_servers[$host][] = $port; } /** - * @inheritDoc + * Adds an array of servers to the list of servers to use + * + * @param array $servers (eg; [host:port, host:port, ...]) + * @return void */ public function addServers(array $servers): void { foreach($servers as $server) { $server = explode(':', $server); - $this->addServer($server[0], $server[1]); + $this->addServer($server[0], (int)$server[1]); } } /** - * @inheritDoc + * Establishes a connection to the server (or servers) + * + * @return void + * @noinspection DuplicatedCode + * @throws ConnectionException */ - public function setOptions(array $options): bool + public function connect(): void { - $this->options = $options; - return true; + if($this->isConnected()) + return; + + if(count($this->defined_servers) === 0) + return; + + foreach($this->defined_servers as $host => $ports) + { + foreach($ports as $port) + { + $connection = new Connection($host, $port, $this->username, $this->password); + $connection->connect(); + + $this->connections[] = $connection; + } + } } /** - * @inheritDoc + * Disconnects from the server + * + * @return void + */ + public function disconnect(): void + { + if(!$this->isConnected()) + return; + + foreach($this->connections as $connection) + { + $connection->disconnect(); + } + + $this->connections = []; + } + + /** + * Reconnects to the server (or servers) + * + * @return void + * @throws ConnectionException + */ + public function reconnect(): void + { + $this->disconnect(); + $this->connect(); + } + + /** + * Returns True if one or more connections are connected, False otherwise + * (Note, some connections may be disconnected, and this will still return True) + * + * @return bool + */ + public function isConnected(): bool + { + if(count($this->connections) === 0) + return false; + + foreach($this->connections as $connection) + { + if($connection->isConnected()) + return true; + } + + return false; + } + + /** + * Sets the options to use for this client + * + * @param array $options + * @return void + */ + public function setOptions(array $options): void + { + $this->options = $options; + } + + /** + * Returns the current options for this client + * + * @return array + */ + public function getOptions(): array + { + return $this->options; + } + + /** + * Clears the current options for this client + * + * @return void + */ + public function clearOptions(): void + { + $this->options = []; + } + + /** + * Returns True if automatic reconnection is enabled, False otherwise + * + * @return bool */ public function automaticReconnectionEnabled(): bool { @@ -134,7 +231,10 @@ } /** - * @inheritDoc + * Enables or disables automatic reconnection + * + * @param bool $enable + * @return void */ public function enableAutomaticReconnection(bool $enable): void { @@ -142,7 +242,12 @@ } /** - * @inheritDoc + * Registers a new function to the worker to handle + * + * @param string $name + * @param callable $callable + * @param mixed|null $context + * @return void */ public function addFunction(string $name, callable $callable, mixed $context = null): void { @@ -153,7 +258,10 @@ } /** - * @inheritDoc + * Removes an existing function from the worker + * + * @param string $function_name + * @return void */ public function removeFunction(string $function_name): void { @@ -161,61 +269,90 @@ } /** - * @inheritDoc + * Processes a job if there's one available + * + * @param bool $blocking + * @param int $timeout + * @param bool $throw_errors + * @return void */ public function work(bool $blocking = true, int $timeout = 500, bool $throw_errors = false): void { - $callback = function($message) use ($throw_errors) - { - var_dump($message->body); - $job = Job::fromArray(msgpack_unpack($message->body)); + if(!$this->isConnected()) + return; - $job_results = new JobResults($job, JobStatus::Success, 'Hello from worker!'); + // Select a random connection + $connection = $this->connections[array_rand($this->connections)]; + + $callback = function($message) use ($throw_errors, $connection) + { + $received_job = Job::fromArray(msgpack_unpack($message->body)); + + if($received_job->isClosure()) + { + Log::debug('net.nosial.tamerlib', 'received closure: ' . $received_job->getId()); + + try + { + // TODO: Check back on this, looks weird. + $closure = $received_job->getData(); + $result = $closure($received_job); + } + catch(Exception $e) + { + unset($e); + + // Do not requeue the job, it's a closure + $connection->getChannel()->basic_nack($message->delivery_info['delivery_tag']); + return; + } + + $job_results = new JobResults($received_job, JobStatus::Success, $result); + $connection->getChannel->basic_publish( + new AMQPMessage(msgpack_pack($job_results->toArray()), ['correlation_id' => $received_job->getId()]) + ); + $connection->getChannel()->basic_ack($message->delivery_info['delivery_tag']); + return; + } + + if(!isset($this->functions[$received_job->getName()])) + { + Log::debug('net.nosial.tamerlib', 'received unknown function: ' . $received_job->getId()); + $connection->getChannel()->basic_nack($message->delivery_info['delivery_tag'], false, true); + return; + } + + Log::debug('net.nosial.tamerlib', 'received function: ' . $received_job->getId()); + $function = $this->functions[$received_job->getName()]; + $callback = $function['function']; try { - // Return $job_results - $this->channel->basic_publish( - new AMQPMessage( - msgpack_pack($job_results->toArray()), - [ - 'correlation_id' => $job->getId() - ] - ) - ); - - $this->channel->basic_ack($message->delivery_info['delivery_tag']); + $result = $callback($received_job->getData(), $function['context']); } - catch (Exception $e) + catch(Exception $e) { - if ($throw_errors) - { - throw $e; - } + unset($e); - $job_results = new JobResults($job, JobStatus::Exception, $e->getMessage()); - - // Return $job_results - $this->channel->basic_publish( - new AMQPMessage( - msgpack_pack($job_results->toArray()), - [ - 'correlation_id' => $job->getId() - ] - ) - ); - - $this->channel->basic_ack($message->delivery_info['delivery_tag']); + // Do not requeue the job, it's a closure + $connection->getChannel()->basic_nack($message->delivery_info['delivery_tag']); + return; } + + $job_results = new JobResults($received_job, JobStatus::Success, $result); + $connection->getChannel->basic_publish( + new AMQPMessage(msgpack_pack($job_results->toArray()), ['correlation_id' => $received_job->getId()]) + ); + $connection->getChannel()->basic_ack($message->delivery_info['delivery_tag']); }; - $this->channel->basic_consume('tamer_queue', '', false, false, false, false, $callback); + $connection->getChannel()->basic_consume('tamer_queue', '', false, false, false, false, $callback); if ($blocking) { while(true) { - $this->channel->wait(); + $connection->getChannel()->wait(); } } else @@ -227,98 +364,25 @@ { break; } - - $this->channel->wait(); + $connection->getChannel()->wait(); } } } - private function reconnect() - { - $connections = []; - - if(count($this->server_cache) === 0) - return; - - foreach($this->server_cache as $host => $ports) - { - foreach($ports as $port) - { - $host = [ - 'host' => $host, - 'port' => $port - ]; - - if($this->username !== null) - $host['username'] = $this->username; - - if($this->password !== null) - $host['password'] = $this->password; - - $connections[] = $host; - } - } - - // Can only connect to one server for now, so we'll just use the first one - $selected_connection = $connections[0]; - $this->disconnect(); - $this->connection = new AMQPStreamConnection( - $selected_connection['host'], - $selected_connection['port'], - $selected_connection['username'] ?? null, - $selected_connection['password'] ?? null - ); - - $this->channel = $this->connection->channel(); - $this->channel->queue_declare('tamer_queue', false, true, false, false); - } - - /** - * Disconnects from the server - * - * @return void - */ - private function disconnect() - { - try - { - if(!is_null($this->channel)) - { - $this->channel->close(); - } - } - catch(Exception $e) - { - unset($e); - } - finally - { - $this->channel = null; - } - - try - { - if(!is_null($this->connection)) - { - $this->connection->close(); - } - } - catch(Exception $e) - { - unset($e); - } - finally - { - $this->connection = null; - } - } - /** * Disconnects from the server when the object is destroyed */ public function __destruct() { - $this->disconnect(); + try + { + $this->disconnect(); + } + catch(Exception $e) + { + unset($e); + // Ignore + } } } \ No newline at end of file