Refactored RabbitMq Client
This commit is contained in:
parent
69c2d0628d
commit
4e474b3449
2 changed files with 426 additions and 167 deletions
|
@ -6,11 +6,9 @@
|
|||
|
||||
use Closure;
|
||||
use Exception;
|
||||
use PhpAmqpLib\Channel\AMQPChannel;
|
||||
use PhpAmqpLib\Connection\AMQPStreamConnection;
|
||||
use PhpAmqpLib\Message\AMQPMessage;
|
||||
use TamerLib\Abstracts\TaskPriority;
|
||||
use TamerLib\Exceptions\ServerException;
|
||||
use TamerLib\Classes\Functions;
|
||||
use TamerLib\Exceptions\ConnectionException;
|
||||
use TamerLib\Interfaces\ClientProtocolInterface;
|
||||
use TamerLib\Objects\Job;
|
||||
use TamerLib\Objects\JobResults;
|
||||
|
@ -19,9 +17,11 @@
|
|||
class Client implements ClientProtocolInterface
|
||||
{
|
||||
/**
|
||||
* An array of servers to use
|
||||
*
|
||||
* @var array
|
||||
*/
|
||||
private $server_cache;
|
||||
private $defined_servers;
|
||||
|
||||
/**
|
||||
* Used for tracking the current execution of tasks and run callbacks on completion
|
||||
|
@ -31,41 +31,43 @@
|
|||
private $tasks;
|
||||
|
||||
/**
|
||||
* Whether to automatically reconnect to the server if the connection is lost
|
||||
*
|
||||
* @var bool
|
||||
*/
|
||||
private $automatic_reconnect;
|
||||
|
||||
/**
|
||||
* @var int
|
||||
*/
|
||||
private $next_reconnect;
|
||||
|
||||
/**
|
||||
* (Optional) The username to use when connecting to the server
|
||||
*
|
||||
* @var string|null
|
||||
*/
|
||||
private $username;
|
||||
|
||||
/**
|
||||
* (Optional) The password to use when connecting to the server
|
||||
*
|
||||
* @var string|null
|
||||
*/
|
||||
private $password;
|
||||
|
||||
/**
|
||||
* (Optional) An array of options to use when connecting to the server
|
||||
*
|
||||
* @var array
|
||||
*/
|
||||
private $options;
|
||||
|
||||
/**
|
||||
* @var AMQPStreamConnection|null
|
||||
* An array of connections to use
|
||||
*
|
||||
* @var Connection[]
|
||||
*/
|
||||
private $connection;
|
||||
private $connections;
|
||||
|
||||
/**
|
||||
* @var AMQPChannel|null
|
||||
*/
|
||||
private $channel;
|
||||
|
||||
/***
|
||||
* Public Constructor
|
||||
*
|
||||
* @param string|null $username
|
||||
* @param string|null $password
|
||||
*/
|
||||
|
@ -73,95 +75,201 @@
|
|||
{
|
||||
$this->tasks = [];
|
||||
$this->automatic_reconnect = false;
|
||||
$this->next_reconnect = time() + 1800;
|
||||
$this->server_cache = [];
|
||||
$this->defined_servers = [];
|
||||
$this->options = [];
|
||||
$this->connection = null;
|
||||
$this->username = $username;
|
||||
$this->password = $password;
|
||||
|
||||
try
|
||||
{
|
||||
$this->reconnect();
|
||||
}
|
||||
catch(ServerException $e)
|
||||
{
|
||||
unset($e);
|
||||
}
|
||||
$this->connections = [];
|
||||
}
|
||||
|
||||
public function setOptions(array $options): bool
|
||||
/**
|
||||
* Adds a server to the list of servers to use
|
||||
*
|
||||
* @param string $host
|
||||
* @param int $port
|
||||
* @return void
|
||||
*/
|
||||
public function addServer(string $host, int $port): void
|
||||
{
|
||||
$this->options = $options;
|
||||
return true;
|
||||
}
|
||||
|
||||
public function addServer(string $host, int $port): bool
|
||||
{
|
||||
if(!isset($this->server_cache[$host]))
|
||||
if(!isset($this->defined_servers[$host]))
|
||||
{
|
||||
$this->server_cache[$host] = [];
|
||||
$this->defined_servers[$host] = [];
|
||||
}
|
||||
|
||||
if(in_array($port, $this->server_cache[$host]))
|
||||
if(in_array($port, $this->defined_servers[$host]))
|
||||
{
|
||||
return true;
|
||||
return;
|
||||
}
|
||||
|
||||
$this->server_cache[$host][] = $port;
|
||||
$this->reconnect();
|
||||
|
||||
return true;
|
||||
$this->defined_servers[$host][] = $port;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a list of servers to the list of servers to use
|
||||
*
|
||||
* @param array $servers (host:port, host:port, ...)
|
||||
* @return bool
|
||||
* @return void
|
||||
*/
|
||||
public function addServers(array $servers): bool
|
||||
public function addServers(array $servers): void
|
||||
{
|
||||
foreach($servers as $server)
|
||||
{
|
||||
$server = explode(':', $server);
|
||||
$this->addServer($server[0], $server[1]);
|
||||
$this->addServer($server[0], (int)$server[1]);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Connects to the server(s) defined
|
||||
*
|
||||
* @return void
|
||||
* @throws ConnectionException
|
||||
*/
|
||||
public function connect(): void
|
||||
{
|
||||
if($this->isConnected())
|
||||
return;
|
||||
|
||||
if(count($this->defined_servers) === 0)
|
||||
return;
|
||||
|
||||
foreach($this->defined_servers as $host => $ports)
|
||||
{
|
||||
foreach($ports as $port)
|
||||
{
|
||||
$connection = new Connection($host, $port, $this->username, $this->password);
|
||||
$connection->connect();
|
||||
|
||||
$this->connections[] = $connection;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Disconnects from the server
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function disconnect(): void
|
||||
{
|
||||
if(!$this->isConnected())
|
||||
return;
|
||||
|
||||
foreach($this->connections as $connection)
|
||||
{
|
||||
$connection->disconnect();
|
||||
}
|
||||
|
||||
return true;
|
||||
$this->connections = [];
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculates the priority for a task based on the priority level
|
||||
* Reconnects to the server
|
||||
*
|
||||
* @param int $priority
|
||||
* @return int
|
||||
* @return void
|
||||
* @throws ConnectionException
|
||||
*/
|
||||
private static function calculatePriority(int $priority): int
|
||||
public function reconnect(): void
|
||||
{
|
||||
if($priority < TaskPriority::Low)
|
||||
return 0;
|
||||
|
||||
if($priority > TaskPriority::High)
|
||||
return 255;
|
||||
|
||||
return (int) round(($priority / TaskPriority::High) * 255);
|
||||
$this->disconnect();
|
||||
$this->connect();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns True if one or more connections are connected, False otherwise
|
||||
* (Note, some connections may be disconnected, and this will still return True)
|
||||
*
|
||||
* @return bool
|
||||
*/
|
||||
public function isConnected(): bool
|
||||
{
|
||||
if(count($this->connections) === 0)
|
||||
return false;
|
||||
|
||||
foreach($this->connections as $connection)
|
||||
{
|
||||
if($connection->isConnected())
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the options array
|
||||
*
|
||||
* @param array $options
|
||||
* @return void
|
||||
*/
|
||||
public function setOptions(array $options): void
|
||||
{
|
||||
$this->options = $options;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the options array
|
||||
*
|
||||
* @return array
|
||||
*/
|
||||
public function getOptions(): array
|
||||
{
|
||||
return $this->options;
|
||||
}
|
||||
|
||||
/**
|
||||
* Clears the options array
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function clearOptions(): void
|
||||
{
|
||||
$this->options = [];
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns True if the client is automatically reconnecting to the server
|
||||
*
|
||||
* @return bool
|
||||
*/
|
||||
public function automaticReconnectionEnabled(): bool
|
||||
{
|
||||
return $this->automatic_reconnect;
|
||||
}
|
||||
|
||||
/**
|
||||
* Enables or disables automatic reconnecting to the server
|
||||
*
|
||||
* @param bool $enable
|
||||
* @return void
|
||||
*/
|
||||
public function enableAutomaticReconnection(bool $enable): void
|
||||
{
|
||||
$this->automatic_reconnect = $enable;
|
||||
}
|
||||
|
||||
/**
|
||||
* Runs a task in the background (Fire and Forget)
|
||||
*
|
||||
* @param Task $task
|
||||
* @return void
|
||||
*/
|
||||
public function do(Task $task): void
|
||||
{
|
||||
$job = new Job($task);
|
||||
if(!$this->isConnected())
|
||||
return;
|
||||
|
||||
$job = new Job($task);
|
||||
$message = new AMQPMessage(msgpack_pack($job->toArray()), [
|
||||
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
|
||||
'priority' => self::calculatePriority($task->getPriority()),
|
||||
'correlation_id' => $task->getId(),
|
||||
'priority' => Functions::calculatePriority($task->getPriority()),
|
||||
]);
|
||||
|
||||
$this->channel->basic_publish($message, '', 'tamer_queue');
|
||||
// Select random connection
|
||||
$connection = $this->connections[array_rand($this->connections)];
|
||||
if($this->automatic_reconnect)
|
||||
$connection->preformAutoreconf();
|
||||
$connection->getChannel()->basic_publish($message, '', 'tamer_queue');
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -192,10 +300,10 @@
|
|||
* Adds a closure task to the list of tasks to run
|
||||
*
|
||||
* @param Closure $closure
|
||||
* @param $callback
|
||||
* @param Closure|null $callback
|
||||
* @return void
|
||||
*/
|
||||
public function queueClosure(Closure $closure, $callback): void
|
||||
public function queueClosure(Closure $closure, ?Closure $callback=null): void
|
||||
{
|
||||
$closure_task = new Task('tamer_closure', $closure, $callback);
|
||||
$closure_task->setClosure(true);
|
||||
|
@ -212,27 +320,33 @@
|
|||
if(count($this->tasks) === 0)
|
||||
return false;
|
||||
|
||||
if(!$this->isConnected())
|
||||
return false;
|
||||
|
||||
$this->preformAutoreconf();
|
||||
$correlationIds = [];
|
||||
$connection = $this->connections[array_rand($this->connections)];
|
||||
if($this->automatic_reconnect)
|
||||
$connection->preformAutoreconf();
|
||||
|
||||
/** @var Task $task */
|
||||
foreach($this->tasks as $task)
|
||||
{
|
||||
$correlationIds[] = $task->getId();
|
||||
|
||||
$job = new Job($task);
|
||||
|
||||
$message = new AMQPMessage(msgpack_pack($job->toArray()), [
|
||||
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
|
||||
'correlation_id' => $task->getId(),
|
||||
'reply_to' => 'tamer_queue',
|
||||
'priority' => self::calculatePriority($task->getPriority()),
|
||||
'priority' => Functions::calculatePriority($task->getPriority()),
|
||||
]);
|
||||
|
||||
$this->channel->basic_publish($message, '', 'tamer_queue');
|
||||
$connection->getChannel()->basic_publish($message, '', 'tamer_queue');
|
||||
}
|
||||
|
||||
// Register callback for each task
|
||||
$callback = function($msg) use (&$correlationIds)
|
||||
$callback = function($msg) use (&$correlationIds, $connection)
|
||||
{
|
||||
$job_result = JobResults::fromArray(msgpack_unpack($msg->body));
|
||||
$task = $this->getTaskById($job_result->getId());
|
||||
|
@ -248,31 +362,38 @@
|
|||
|
||||
// Remove the processed correlation_id
|
||||
$index = array_search($msg->get('correlation_id'), $correlationIds);
|
||||
if ($index !== false) {
|
||||
unset($correlationIds[$index]);
|
||||
}
|
||||
|
||||
$this->channel->basic_ack($msg->delivery_info['delivery_tag']);
|
||||
if ($index !== false)
|
||||
{
|
||||
unset($correlationIds[$index]);
|
||||
$connection->getChannel()->basic_ack($msg->delivery_info['delivery_tag']);
|
||||
}
|
||||
else
|
||||
{
|
||||
$connection->getChannel()->basic_nack($msg->delivery_info['delivery_tag'], false, true);
|
||||
}
|
||||
|
||||
// Stop consuming when all tasks are processed
|
||||
if(count($correlationIds) === 0)
|
||||
{
|
||||
$this->channel->basic_cancel($msg->delivery_info['consumer_tag']);
|
||||
$connection->getChannel()->basic_cancel($msg->delivery_info['consumer_tag']);
|
||||
}
|
||||
};
|
||||
|
||||
$this->channel->basic_consume('tamer_queue', '', false, false, false, false, $callback);
|
||||
$connection->getChannel()->basic_consume('tamer_queue', '', false, false, false, false, $callback);
|
||||
|
||||
// Start consuming messages
|
||||
while(count($this->channel->callbacks))
|
||||
while(count($connection->getChannel()->callbacks))
|
||||
{
|
||||
$this->channel->wait();
|
||||
$connection->getChannel()->wait();
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a task by its id
|
||||
*
|
||||
* @param string $id
|
||||
* @return Task|null
|
||||
*/
|
||||
|
@ -290,104 +411,19 @@
|
|||
}
|
||||
|
||||
/**
|
||||
* Returns True if the client is automatically reconnecting to the server
|
||||
*
|
||||
* @return bool
|
||||
*/
|
||||
public function automaticReconnectionEnabled(): bool
|
||||
{
|
||||
return $this->automatic_reconnect;
|
||||
}
|
||||
|
||||
/**
|
||||
* Enables or disables automatic reconnecting to the server
|
||||
*
|
||||
* @param bool $enable
|
||||
* @return void
|
||||
*/
|
||||
public function enableAutomaticReconnection(bool $enable): void
|
||||
{
|
||||
$this->automatic_reconnect = $enable;
|
||||
}
|
||||
|
||||
private function reconnect()
|
||||
{
|
||||
$connections = [];
|
||||
|
||||
if(count($this->server_cache) === 0)
|
||||
return;
|
||||
|
||||
foreach($this->server_cache as $host => $ports)
|
||||
{
|
||||
foreach($ports as $port)
|
||||
{
|
||||
$host = [
|
||||
'host' => $host,
|
||||
'port' => $port
|
||||
];
|
||||
|
||||
if($this->username !== null)
|
||||
$host['username'] = $this->username;
|
||||
|
||||
if($this->password !== null)
|
||||
$host['password'] = $this->password;
|
||||
|
||||
$connections[] = $host;
|
||||
}
|
||||
}
|
||||
|
||||
// Can only connect to one server for now, so we'll just use the first one
|
||||
$selected_connection = $connections[0];
|
||||
$this->disconnect();
|
||||
$this->connection = new AMQPStreamConnection(
|
||||
$selected_connection['host'],
|
||||
$selected_connection['port'],
|
||||
$selected_connection['username'] ?? null,
|
||||
$selected_connection['password'] ?? null
|
||||
);
|
||||
|
||||
$this->channel = $this->connection->channel();
|
||||
$this->channel->queue_declare('tamer_queue', false, true, false, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Disconnects from the server
|
||||
* The automatic reconnect process
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
private function disconnect()
|
||||
private function preformAutoreconf(): void
|
||||
{
|
||||
try
|
||||
if($this->automatic_reconnect)
|
||||
{
|
||||
if(!is_null($this->channel))
|
||||
foreach($this->connections as $connection)
|
||||
{
|
||||
$this->channel->close();
|
||||
$connection->preformAutoreconf();
|
||||
}
|
||||
}
|
||||
catch(Exception $e)
|
||||
{
|
||||
unset($e);
|
||||
}
|
||||
finally
|
||||
{
|
||||
$this->channel = null;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
if(!is_null($this->connection))
|
||||
{
|
||||
$this->connection->close();
|
||||
}
|
||||
}
|
||||
catch(Exception $e)
|
||||
{
|
||||
unset($e);
|
||||
}
|
||||
finally
|
||||
{
|
||||
$this->connection = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -395,7 +431,14 @@
|
|||
*/
|
||||
public function __destruct()
|
||||
{
|
||||
$this->disconnect();
|
||||
try
|
||||
{
|
||||
$this->disconnect();
|
||||
}
|
||||
catch(Exception $e)
|
||||
{
|
||||
unset($e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
216
src/TamerLib/Protocols/RabbitMq/Connection.php
Normal file
216
src/TamerLib/Protocols/RabbitMq/Connection.php
Normal file
|
@ -0,0 +1,216 @@
|
|||
<?php
|
||||
|
||||
/** @noinspection PhpMissingFieldTypeInspection */
|
||||
|
||||
namespace TamerLib\Protocols\RabbitMq;
|
||||
|
||||
use Exception;
|
||||
use LogLib\Log;
|
||||
use PhpAmqpLib\Channel\AMQPChannel;
|
||||
use PhpAmqpLib\Connection\AMQPStreamConnection;
|
||||
use TamerLib\Exceptions\ConnectionException;
|
||||
|
||||
class Connection
|
||||
{
|
||||
/**
|
||||
* The unique ID of the connection
|
||||
*
|
||||
* @var string
|
||||
*/
|
||||
private $id;
|
||||
|
||||
/**
|
||||
* The stream connection
|
||||
*
|
||||
* @var AMQPStreamConnection|null
|
||||
*/
|
||||
private $connection;
|
||||
|
||||
/**
|
||||
* The channel to use for the connection
|
||||
*
|
||||
* @var AMQPChannel|null
|
||||
*/
|
||||
private $channel;
|
||||
|
||||
/**
|
||||
* The host to connect to
|
||||
*
|
||||
* @var string
|
||||
*/
|
||||
private $host;
|
||||
|
||||
/**
|
||||
* The port to connect to
|
||||
*
|
||||
* @var int
|
||||
*/
|
||||
private $port;
|
||||
|
||||
/**
|
||||
* (Optional) The username to use when connecting to the server
|
||||
*
|
||||
* @var string|null
|
||||
*/
|
||||
private $username;
|
||||
|
||||
/**
|
||||
* (Optional) The password to use when connecting to the server
|
||||
*
|
||||
* @var string|null
|
||||
*/
|
||||
private $password;
|
||||
|
||||
/**
|
||||
* The Unix timestamp of when the next reconnect should occur
|
||||
*
|
||||
* @var int
|
||||
*/
|
||||
private $next_reconnect;
|
||||
|
||||
/**
|
||||
* @param string $host
|
||||
* @param int $port
|
||||
* @param string|null $username
|
||||
* @param string|null $password
|
||||
*/
|
||||
public function __construct(string $host, int $port, ?string $username=null, ?string $password=null)
|
||||
{
|
||||
$this->id = uniqid();
|
||||
$this->host = $host;
|
||||
$this->port = $port;
|
||||
$this->username = $username;
|
||||
$this->password = $password;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return string
|
||||
*/
|
||||
public function getId(): string
|
||||
{
|
||||
return $this->id;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return AMQPStreamConnection|null
|
||||
*/
|
||||
public function getConnection(): ?AMQPStreamConnection
|
||||
{
|
||||
return $this->connection;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return AMQPChannel|null
|
||||
*/
|
||||
public function getChannel(): ?AMQPChannel
|
||||
{
|
||||
return $this->channel;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns True if the client is connected to the server
|
||||
*
|
||||
* @return bool
|
||||
*/
|
||||
public function isConnected(): bool
|
||||
{
|
||||
return $this->connection !== null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Establishes a connection to the server
|
||||
*
|
||||
* @return void
|
||||
* @throws ConnectionException
|
||||
*/
|
||||
public function connect(): void
|
||||
{
|
||||
if($this->isConnected())
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
$this->connection = new AMQPStreamConnection($this->host, $this->port, $this->username, $this->password);
|
||||
$this->channel = $this->connection->channel();
|
||||
$this->channel->queue_declare('tamer_queue', false, true, false, false);
|
||||
$this->next_reconnect = time() + 1800;
|
||||
}
|
||||
catch(Exception $e)
|
||||
{
|
||||
throw new ConnectionException(sprintf('Could not connect to RabbitMQ server: %s', $e->getMessage()), $e->getCode(), $e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes the connection to the server
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function disconnect(): void
|
||||
{
|
||||
if(!$this->isConnected())
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
$this->channel?->close();
|
||||
}
|
||||
catch(Exception $e)
|
||||
{
|
||||
unset($e);
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
$this->connection?->close();
|
||||
}
|
||||
catch(Exception $e)
|
||||
{
|
||||
unset($e);
|
||||
}
|
||||
|
||||
$this->channel = null;
|
||||
$this->connection = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reconnects to the server
|
||||
*
|
||||
* @return void
|
||||
* @throws ConnectionException
|
||||
*/
|
||||
public function reconnect(): void
|
||||
{
|
||||
$this->disconnect();
|
||||
$this->connect();
|
||||
}
|
||||
|
||||
/**
|
||||
* The automatic reconnect process
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function preformAutoreconf(): void
|
||||
{
|
||||
if($this->next_reconnect < time())
|
||||
{
|
||||
try
|
||||
{
|
||||
$this->reconnect();
|
||||
}
|
||||
catch (Exception $e)
|
||||
{
|
||||
Log::error('net.nosial.tamerlib', 'Could not reconnect to RabbitMQ server: %s', $e);
|
||||
}
|
||||
finally
|
||||
{
|
||||
$this->next_reconnect = time() + 1800;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
Add table
Reference in a new issue