Refactored Worker for RabbitMQ (Needs more work)

This commit is contained in:
Netkas 2023-02-09 15:31:44 -05:00
parent d5d515d63b
commit 9084ad8ca5

View file

@ -5,10 +5,10 @@
namespace TamerLib\Protocols\RabbitMq;
use Exception;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use LogLib\Log;
use PhpAmqpLib\Message\AMQPMessage;
use TamerLib\Abstracts\JobStatus;
use TamerLib\Exceptions\ConnectionException;
use TamerLib\Interfaces\WorkerProtocolInterface;
use TamerLib\Objects\Job;
use TamerLib\Objects\JobResults;
@ -16,9 +16,11 @@
class Worker implements WorkerProtocolInterface
{
/**
* An array of defined servers to use
*
* @var array
*/
private $server_cache;
private $defined_servers;
/**
* @var false
@ -26,107 +28,202 @@
private $automatic_reconnect;
/**
* @var int
*/
private $next_reconnect;
/**
* An array of functions that the worker handles
*
* @var array
*/
private $functions;
/**
* (Optional) The username to use when connecting to the server (if required)
*
* @var string|null
*/
private $username;
/**
* (Optional) The password to use when connecting to the server
*
* @var string|null
*/
private $password;
/**
* @var AMQPStreamConnection|null
* A array of active connections
*
* @var Connection[]
*/
private $connection;
/**
* @var AMQPChannel|null
*/
private $channel;
private $connections;
/**
* @var array
*/
private $options;
/**
* @inheritDoc
* Public Constructor with optional username and password
*
* @param string|null $username
* @param string|null $password
*/
public function __construct(?string $username = null, ?string $password = null)
{
$this->server_cache = [];
$this->defined_servers = [];
$this->functions = [];
$this->automatic_reconnect = false;
$this->next_reconnect = time() + 1800;
$this->automatic_reconnect = true;
$this->username = $username;
$this->password = $password;
try
{
$this->reconnect();
}
catch(Exception $e)
{
unset($e);
}
}
/**
* @inheritDoc
* Adds a server to the list of servers to use
*
* @param string $host
* @param int $port
* @return void
*/
public function addServer(string $host, int $port): bool
public function addServer(string $host, int $port): void
{
if(!isset($this->server_cache[$host]))
if(!isset($this->defined_servers[$host]))
{
$this->server_cache[$host] = [];
$this->defined_servers[$host] = [];
}
if(in_array($port, $this->server_cache[$host]))
if(in_array($port, $this->defined_servers[$host]))
{
return true;
return;
}
$this->server_cache[$host][] = $port;
$this->reconnect();
return true;
$this->defined_servers[$host][] = $port;
}
/**
* @inheritDoc
* Adds an array of servers to the list of servers to use
*
* @param array $servers (eg; [host:port, host:port, ...])
* @return void
*/
public function addServers(array $servers): void
{
foreach($servers as $server)
{
$server = explode(':', $server);
$this->addServer($server[0], $server[1]);
$this->addServer($server[0], (int)$server[1]);
}
}
/**
* @inheritDoc
* Establishes a connection to the server (or servers)
*
* @return void
* @noinspection DuplicatedCode
* @throws ConnectionException
*/
public function setOptions(array $options): bool
public function connect(): void
{
$this->options = $options;
return true;
if($this->isConnected())
return;
if(count($this->defined_servers) === 0)
return;
foreach($this->defined_servers as $host => $ports)
{
foreach($ports as $port)
{
$connection = new Connection($host, $port, $this->username, $this->password);
$connection->connect();
$this->connections[] = $connection;
}
}
}
/**
* @inheritDoc
* Disconnects from the server
*
* @return void
*/
public function disconnect(): void
{
if(!$this->isConnected())
return;
foreach($this->connections as $connection)
{
$connection->disconnect();
}
$this->connections = [];
}
/**
* Reconnects to the server (or servers)
*
* @return void
* @throws ConnectionException
*/
public function reconnect(): void
{
$this->disconnect();
$this->connect();
}
/**
* Returns True if one or more connections are connected, False otherwise
* (Note, some connections may be disconnected, and this will still return True)
*
* @return bool
*/
public function isConnected(): bool
{
if(count($this->connections) === 0)
return false;
foreach($this->connections as $connection)
{
if($connection->isConnected())
return true;
}
return false;
}
/**
* Sets the options to use for this client
*
* @param array $options
* @return void
*/
public function setOptions(array $options): void
{
$this->options = $options;
}
/**
* Returns the current options for this client
*
* @return array
*/
public function getOptions(): array
{
return $this->options;
}
/**
* Clears the current options for this client
*
* @return void
*/
public function clearOptions(): void
{
$this->options = [];
}
/**
* Returns True if automatic reconnection is enabled, False otherwise
*
* @return bool
*/
public function automaticReconnectionEnabled(): bool
{
@ -134,7 +231,10 @@
}
/**
* @inheritDoc
* Enables or disables automatic reconnection
*
* @param bool $enable
* @return void
*/
public function enableAutomaticReconnection(bool $enable): void
{
@ -142,7 +242,12 @@
}
/**
* @inheritDoc
* Registers a new function to the worker to handle
*
* @param string $name
* @param callable $callable
* @param mixed|null $context
* @return void
*/
public function addFunction(string $name, callable $callable, mixed $context = null): void
{
@ -153,7 +258,10 @@
}
/**
* @inheritDoc
* Removes an existing function from the worker
*
* @param string $function_name
* @return void
*/
public function removeFunction(string $function_name): void
{
@ -161,61 +269,90 @@
}
/**
* @inheritDoc
* Processes a job if there's one available
*
* @param bool $blocking
* @param int $timeout
* @param bool $throw_errors
* @return void
*/
public function work(bool $blocking = true, int $timeout = 500, bool $throw_errors = false): void
{
$callback = function($message) use ($throw_errors)
{
var_dump($message->body);
$job = Job::fromArray(msgpack_unpack($message->body));
if(!$this->isConnected())
return;
$job_results = new JobResults($job, JobStatus::Success, 'Hello from worker!');
// Select a random connection
$connection = $this->connections[array_rand($this->connections)];
$callback = function($message) use ($throw_errors, $connection)
{
$received_job = Job::fromArray(msgpack_unpack($message->body));
if($received_job->isClosure())
{
Log::debug('net.nosial.tamerlib', 'received closure: ' . $received_job->getId());
try
{
// TODO: Check back on this, looks weird.
$closure = $received_job->getData();
$result = $closure($received_job);
}
catch(Exception $e)
{
unset($e);
// Do not requeue the job, it's a closure
$connection->getChannel()->basic_nack($message->delivery_info['delivery_tag']);
return;
}
$job_results = new JobResults($received_job, JobStatus::Success, $result);
$connection->getChannel->basic_publish(
new AMQPMessage(msgpack_pack($job_results->toArray()), ['correlation_id' => $received_job->getId()])
);
$connection->getChannel()->basic_ack($message->delivery_info['delivery_tag']);
return;
}
if(!isset($this->functions[$received_job->getName()]))
{
Log::debug('net.nosial.tamerlib', 'received unknown function: ' . $received_job->getId());
$connection->getChannel()->basic_nack($message->delivery_info['delivery_tag'], false, true);
return;
}
Log::debug('net.nosial.tamerlib', 'received function: ' . $received_job->getId());
$function = $this->functions[$received_job->getName()];
$callback = $function['function'];
try
{
// Return $job_results
$this->channel->basic_publish(
new AMQPMessage(
msgpack_pack($job_results->toArray()),
[
'correlation_id' => $job->getId()
]
)
);
$this->channel->basic_ack($message->delivery_info['delivery_tag']);
$result = $callback($received_job->getData(), $function['context']);
}
catch (Exception $e)
catch(Exception $e)
{
if ($throw_errors)
{
throw $e;
}
unset($e);
$job_results = new JobResults($job, JobStatus::Exception, $e->getMessage());
// Return $job_results
$this->channel->basic_publish(
new AMQPMessage(
msgpack_pack($job_results->toArray()),
[
'correlation_id' => $job->getId()
]
)
);
$this->channel->basic_ack($message->delivery_info['delivery_tag']);
// Do not requeue the job, it's a closure
$connection->getChannel()->basic_nack($message->delivery_info['delivery_tag']);
return;
}
$job_results = new JobResults($received_job, JobStatus::Success, $result);
$connection->getChannel->basic_publish(
new AMQPMessage(msgpack_pack($job_results->toArray()), ['correlation_id' => $received_job->getId()])
);
$connection->getChannel()->basic_ack($message->delivery_info['delivery_tag']);
};
$this->channel->basic_consume('tamer_queue', '', false, false, false, false, $callback);
$connection->getChannel()->basic_consume('tamer_queue', '', false, false, false, false, $callback);
if ($blocking)
{
while(true)
{
$this->channel->wait();
$connection->getChannel()->wait();
}
}
else
@ -227,98 +364,25 @@
{
break;
}
$this->channel->wait();
$connection->getChannel()->wait();
}
}
}
private function reconnect()
{
$connections = [];
if(count($this->server_cache) === 0)
return;
foreach($this->server_cache as $host => $ports)
{
foreach($ports as $port)
{
$host = [
'host' => $host,
'port' => $port
];
if($this->username !== null)
$host['username'] = $this->username;
if($this->password !== null)
$host['password'] = $this->password;
$connections[] = $host;
}
}
// Can only connect to one server for now, so we'll just use the first one
$selected_connection = $connections[0];
$this->disconnect();
$this->connection = new AMQPStreamConnection(
$selected_connection['host'],
$selected_connection['port'],
$selected_connection['username'] ?? null,
$selected_connection['password'] ?? null
);
$this->channel = $this->connection->channel();
$this->channel->queue_declare('tamer_queue', false, true, false, false);
}
/**
* Disconnects from the server
*
* @return void
*/
private function disconnect()
{
try
{
if(!is_null($this->channel))
{
$this->channel->close();
}
}
catch(Exception $e)
{
unset($e);
}
finally
{
$this->channel = null;
}
try
{
if(!is_null($this->connection))
{
$this->connection->close();
}
}
catch(Exception $e)
{
unset($e);
}
finally
{
$this->connection = null;
}
}
/**
* Disconnects from the server when the object is destroyed
*/
public function __destruct()
{
$this->disconnect();
try
{
$this->disconnect();
}
catch(Exception $e)
{
unset($e);
// Ignore
}
}
}