From 4e474b34493d593d37c64e14cb37ecb91f31814f Mon Sep 17 00:00:00 2001 From: Netkas Date: Sun, 5 Feb 2023 21:00:05 -0500 Subject: [PATCH] Refactored RabbitMq Client --- src/TamerLib/Protocols/RabbitMq/Client.php | 377 ++++++++++-------- .../Protocols/RabbitMq/Connection.php | 216 ++++++++++ 2 files changed, 426 insertions(+), 167 deletions(-) create mode 100644 src/TamerLib/Protocols/RabbitMq/Connection.php diff --git a/src/TamerLib/Protocols/RabbitMq/Client.php b/src/TamerLib/Protocols/RabbitMq/Client.php index e07fac8..622f350 100644 --- a/src/TamerLib/Protocols/RabbitMq/Client.php +++ b/src/TamerLib/Protocols/RabbitMq/Client.php @@ -6,11 +6,9 @@ use Closure; use Exception; - use PhpAmqpLib\Channel\AMQPChannel; - use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; - use TamerLib\Abstracts\TaskPriority; - use TamerLib\Exceptions\ServerException; + use TamerLib\Classes\Functions; + use TamerLib\Exceptions\ConnectionException; use TamerLib\Interfaces\ClientProtocolInterface; use TamerLib\Objects\Job; use TamerLib\Objects\JobResults; @@ -19,9 +17,11 @@ class Client implements ClientProtocolInterface { /** + * An array of servers to use + * * @var array */ - private $server_cache; + private $defined_servers; /** * Used for tracking the current execution of tasks and run callbacks on completion @@ -31,41 +31,43 @@ private $tasks; /** + * Whether to automatically reconnect to the server if the connection is lost + * * @var bool */ private $automatic_reconnect; /** - * @var int - */ - private $next_reconnect; - - /** + * (Optional) The username to use when connecting to the server + * * @var string|null */ private $username; /** + * (Optional) The password to use when connecting to the server + * * @var string|null */ private $password; /** + * (Optional) An array of options to use when connecting to the server + * * @var array */ private $options; /** - * @var AMQPStreamConnection|null + * An array of connections to use + * + * @var Connection[] */ - private $connection; + private $connections; /** - * @var AMQPChannel|null - */ - private $channel; - - /*** + * Public Constructor + * * @param string|null $username * @param string|null $password */ @@ -73,95 +75,201 @@ { $this->tasks = []; $this->automatic_reconnect = false; - $this->next_reconnect = time() + 1800; - $this->server_cache = []; + $this->defined_servers = []; $this->options = []; - $this->connection = null; $this->username = $username; $this->password = $password; - - try - { - $this->reconnect(); - } - catch(ServerException $e) - { - unset($e); - } + $this->connections = []; } - public function setOptions(array $options): bool + /** + * Adds a server to the list of servers to use + * + * @param string $host + * @param int $port + * @return void + */ + public function addServer(string $host, int $port): void { - $this->options = $options; - return true; - } - - public function addServer(string $host, int $port): bool - { - if(!isset($this->server_cache[$host])) + if(!isset($this->defined_servers[$host])) { - $this->server_cache[$host] = []; + $this->defined_servers[$host] = []; } - if(in_array($port, $this->server_cache[$host])) + if(in_array($port, $this->defined_servers[$host])) { - return true; + return; } - $this->server_cache[$host][] = $port; - $this->reconnect(); - - return true; + $this->defined_servers[$host][] = $port; } /** * Adds a list of servers to the list of servers to use * * @param array $servers (host:port, host:port, ...) - * @return bool + * @return void */ - public function addServers(array $servers): bool + public function addServers(array $servers): void { foreach($servers as $server) { $server = explode(':', $server); - $this->addServer($server[0], $server[1]); + $this->addServer($server[0], (int)$server[1]); + } + } + + /** + * Connects to the server(s) defined + * + * @return void + * @throws ConnectionException + */ + public function connect(): void + { + if($this->isConnected()) + return; + + if(count($this->defined_servers) === 0) + return; + + foreach($this->defined_servers as $host => $ports) + { + foreach($ports as $port) + { + $connection = new Connection($host, $port, $this->username, $this->password); + $connection->connect(); + + $this->connections[] = $connection; + } + } + } + + /** + * Disconnects from the server + * + * @return void + */ + public function disconnect(): void + { + if(!$this->isConnected()) + return; + + foreach($this->connections as $connection) + { + $connection->disconnect(); } - return true; + $this->connections = []; } /** - * Calculates the priority for a task based on the priority level + * Reconnects to the server * - * @param int $priority - * @return int + * @return void + * @throws ConnectionException */ - private static function calculatePriority(int $priority): int + public function reconnect(): void { - if($priority < TaskPriority::Low) - return 0; - - if($priority > TaskPriority::High) - return 255; - - return (int) round(($priority / TaskPriority::High) * 255); + $this->disconnect(); + $this->connect(); } /** + * Returns True if one or more connections are connected, False otherwise + * (Note, some connections may be disconnected, and this will still return True) + * + * @return bool + */ + public function isConnected(): bool + { + if(count($this->connections) === 0) + return false; + + foreach($this->connections as $connection) + { + if($connection->isConnected()) + return true; + } + + return false; + } + + /** + * Sets the options array + * + * @param array $options + * @return void + */ + public function setOptions(array $options): void + { + $this->options = $options; + } + + /** + * Returns the options array + * + * @return array + */ + public function getOptions(): array + { + return $this->options; + } + + /** + * Clears the options array + * + * @return void + */ + public function clearOptions(): void + { + $this->options = []; + } + + /** + * Returns True if the client is automatically reconnecting to the server + * + * @return bool + */ + public function automaticReconnectionEnabled(): bool + { + return $this->automatic_reconnect; + } + + /** + * Enables or disables automatic reconnecting to the server + * + * @param bool $enable + * @return void + */ + public function enableAutomaticReconnection(bool $enable): void + { + $this->automatic_reconnect = $enable; + } + + /** + * Runs a task in the background (Fire and Forget) + * * @param Task $task * @return void */ public function do(Task $task): void { - $job = new Job($task); + if(!$this->isConnected()) + return; + $job = new Job($task); $message = new AMQPMessage(msgpack_pack($job->toArray()), [ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, - 'priority' => self::calculatePriority($task->getPriority()), + 'correlation_id' => $task->getId(), + 'priority' => Functions::calculatePriority($task->getPriority()), ]); - $this->channel->basic_publish($message, '', 'tamer_queue'); + // Select random connection + $connection = $this->connections[array_rand($this->connections)]; + if($this->automatic_reconnect) + $connection->preformAutoreconf(); + $connection->getChannel()->basic_publish($message, '', 'tamer_queue'); } /** @@ -192,10 +300,10 @@ * Adds a closure task to the list of tasks to run * * @param Closure $closure - * @param $callback + * @param Closure|null $callback * @return void */ - public function queueClosure(Closure $closure, $callback): void + public function queueClosure(Closure $closure, ?Closure $callback=null): void { $closure_task = new Task('tamer_closure', $closure, $callback); $closure_task->setClosure(true); @@ -212,27 +320,33 @@ if(count($this->tasks) === 0) return false; + if(!$this->isConnected()) + return false; + + $this->preformAutoreconf(); $correlationIds = []; + $connection = $this->connections[array_rand($this->connections)]; + if($this->automatic_reconnect) + $connection->preformAutoreconf(); /** @var Task $task */ foreach($this->tasks as $task) { $correlationIds[] = $task->getId(); - $job = new Job($task); $message = new AMQPMessage(msgpack_pack($job->toArray()), [ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, 'correlation_id' => $task->getId(), 'reply_to' => 'tamer_queue', - 'priority' => self::calculatePriority($task->getPriority()), + 'priority' => Functions::calculatePriority($task->getPriority()), ]); - $this->channel->basic_publish($message, '', 'tamer_queue'); + $connection->getChannel()->basic_publish($message, '', 'tamer_queue'); } // Register callback for each task - $callback = function($msg) use (&$correlationIds) + $callback = function($msg) use (&$correlationIds, $connection) { $job_result = JobResults::fromArray(msgpack_unpack($msg->body)); $task = $this->getTaskById($job_result->getId()); @@ -248,31 +362,38 @@ // Remove the processed correlation_id $index = array_search($msg->get('correlation_id'), $correlationIds); - if ($index !== false) { - unset($correlationIds[$index]); - } - $this->channel->basic_ack($msg->delivery_info['delivery_tag']); + if ($index !== false) + { + unset($correlationIds[$index]); + $connection->getChannel()->basic_ack($msg->delivery_info['delivery_tag']); + } + else + { + $connection->getChannel()->basic_nack($msg->delivery_info['delivery_tag'], false, true); + } // Stop consuming when all tasks are processed if(count($correlationIds) === 0) { - $this->channel->basic_cancel($msg->delivery_info['consumer_tag']); + $connection->getChannel()->basic_cancel($msg->delivery_info['consumer_tag']); } }; - $this->channel->basic_consume('tamer_queue', '', false, false, false, false, $callback); + $connection->getChannel()->basic_consume('tamer_queue', '', false, false, false, false, $callback); // Start consuming messages - while(count($this->channel->callbacks)) + while(count($connection->getChannel()->callbacks)) { - $this->channel->wait(); + $connection->getChannel()->wait(); } return true; } /** + * Returns a task by its id + * * @param string $id * @return Task|null */ @@ -290,104 +411,19 @@ } /** - * Returns True if the client is automatically reconnecting to the server - * - * @return bool - */ - public function automaticReconnectionEnabled(): bool - { - return $this->automatic_reconnect; - } - - /** - * Enables or disables automatic reconnecting to the server - * - * @param bool $enable - * @return void - */ - public function enableAutomaticReconnection(bool $enable): void - { - $this->automatic_reconnect = $enable; - } - - private function reconnect() - { - $connections = []; - - if(count($this->server_cache) === 0) - return; - - foreach($this->server_cache as $host => $ports) - { - foreach($ports as $port) - { - $host = [ - 'host' => $host, - 'port' => $port - ]; - - if($this->username !== null) - $host['username'] = $this->username; - - if($this->password !== null) - $host['password'] = $this->password; - - $connections[] = $host; - } - } - - // Can only connect to one server for now, so we'll just use the first one - $selected_connection = $connections[0]; - $this->disconnect(); - $this->connection = new AMQPStreamConnection( - $selected_connection['host'], - $selected_connection['port'], - $selected_connection['username'] ?? null, - $selected_connection['password'] ?? null - ); - - $this->channel = $this->connection->channel(); - $this->channel->queue_declare('tamer_queue', false, true, false, false); - } - - /** - * Disconnects from the server + * The automatic reconnect process * * @return void */ - private function disconnect() + private function preformAutoreconf(): void { - try + if($this->automatic_reconnect) { - if(!is_null($this->channel)) + foreach($this->connections as $connection) { - $this->channel->close(); + $connection->preformAutoreconf(); } } - catch(Exception $e) - { - unset($e); - } - finally - { - $this->channel = null; - } - - try - { - if(!is_null($this->connection)) - { - $this->connection->close(); - } - } - catch(Exception $e) - { - unset($e); - } - finally - { - $this->connection = null; - } } /** @@ -395,7 +431,14 @@ */ public function __destruct() { - $this->disconnect(); + try + { + $this->disconnect(); + } + catch(Exception $e) + { + unset($e); + } } } \ No newline at end of file diff --git a/src/TamerLib/Protocols/RabbitMq/Connection.php b/src/TamerLib/Protocols/RabbitMq/Connection.php new file mode 100644 index 0000000..0ed425e --- /dev/null +++ b/src/TamerLib/Protocols/RabbitMq/Connection.php @@ -0,0 +1,216 @@ +id = uniqid(); + $this->host = $host; + $this->port = $port; + $this->username = $username; + $this->password = $password; + } + + /** + * @return string + */ + public function getId(): string + { + return $this->id; + } + + /** + * @return AMQPStreamConnection|null + */ + public function getConnection(): ?AMQPStreamConnection + { + return $this->connection; + } + + /** + * @return AMQPChannel|null + */ + public function getChannel(): ?AMQPChannel + { + return $this->channel; + } + + /** + * Returns True if the client is connected to the server + * + * @return bool + */ + public function isConnected(): bool + { + return $this->connection !== null; + } + + /** + * Establishes a connection to the server + * + * @return void + * @throws ConnectionException + */ + public function connect(): void + { + if($this->isConnected()) + { + return; + } + + try + { + $this->connection = new AMQPStreamConnection($this->host, $this->port, $this->username, $this->password); + $this->channel = $this->connection->channel(); + $this->channel->queue_declare('tamer_queue', false, true, false, false); + $this->next_reconnect = time() + 1800; + } + catch(Exception $e) + { + throw new ConnectionException(sprintf('Could not connect to RabbitMQ server: %s', $e->getMessage()), $e->getCode(), $e); + } + } + + /** + * Closes the connection to the server + * + * @return void + */ + public function disconnect(): void + { + if(!$this->isConnected()) + { + return; + } + + try + { + $this->channel?->close(); + } + catch(Exception $e) + { + unset($e); + } + + try + { + $this->connection?->close(); + } + catch(Exception $e) + { + unset($e); + } + + $this->channel = null; + $this->connection = null; + } + + /** + * Reconnects to the server + * + * @return void + * @throws ConnectionException + */ + public function reconnect(): void + { + $this->disconnect(); + $this->connect(); + } + + /** + * The automatic reconnect process + * + * @return void + */ + public function preformAutoreconf(): void + { + if($this->next_reconnect < time()) + { + try + { + $this->reconnect(); + } + catch (Exception $e) + { + Log::error('net.nosial.tamerlib', 'Could not reconnect to RabbitMQ server: %s', $e); + } + finally + { + $this->next_reconnect = time() + 1800; + } + } + } + + } \ No newline at end of file