Implemented RabbitMQ and Refactored Client & Worker (WIP)
This commit is contained in:
parent
d15da30813
commit
6d8d4a75a4
13 changed files with 966 additions and 170 deletions
|
@ -46,6 +46,12 @@
|
|||
"version": "latest",
|
||||
"source_type": "remote",
|
||||
"source": "opis/closure=latest@composer"
|
||||
},
|
||||
{
|
||||
"name": "com.php_amqplib.php_amqplib",
|
||||
"version": "latest",
|
||||
"source_type": "remote",
|
||||
"source": "php-amqplib/php-amqplib=latest@composer"
|
||||
}
|
||||
],
|
||||
"configurations": [
|
||||
|
|
|
@ -2,17 +2,18 @@
|
|||
|
||||
namespace Tamer\Interfaces;
|
||||
|
||||
use Closure;
|
||||
use Tamer\Objects\Task;
|
||||
|
||||
interface ClientProtocolInterface
|
||||
{
|
||||
/**
|
||||
* Adds options to the client (client specific)
|
||||
* Public Constructor with optional username and password
|
||||
*
|
||||
* @param array $options
|
||||
* @return bool
|
||||
* @param string|null $username (optional) The username to use when connecting to the server (if required)
|
||||
* @param string|null $password (optional) The password to use when connecting to the server
|
||||
*/
|
||||
public function addOptions(array $options): bool;
|
||||
public function __construct(?string $username=null, ?string $password=null);
|
||||
|
||||
/**
|
||||
* Adds a server to the list of servers to use
|
||||
|
@ -32,27 +33,12 @@
|
|||
public function addServers(array $servers): bool;
|
||||
|
||||
/**
|
||||
* Processes a task in the background (does not return a result)
|
||||
*
|
||||
* @param Task $task The task to process
|
||||
* @return void
|
||||
*/
|
||||
public function doBackground(Task $task): void;
|
||||
|
||||
/**
|
||||
* Queues a task to be processed in parallel (returns a result handled by a callback)
|
||||
*
|
||||
* @param Task $task
|
||||
* @return void
|
||||
*/
|
||||
public function addTask(Task $task): void;
|
||||
|
||||
/**
|
||||
* Executes all tasks in the queue and waits for them to complete
|
||||
* Adds options to the client (client specific)
|
||||
*
|
||||
* @param array $options
|
||||
* @return bool
|
||||
*/
|
||||
public function run(): bool;
|
||||
public function addOptions(array $options): bool;
|
||||
|
||||
/**
|
||||
* Returns True if the client is set to automatically reconnect to the server after a period of time
|
||||
|
@ -68,4 +54,44 @@
|
|||
* @return void
|
||||
*/
|
||||
public function setAutomaticReconnect(bool $automatic_reconnect): void;
|
||||
|
||||
/**
|
||||
* Processes a task in the background (does not return a result)
|
||||
*
|
||||
* @param Task $task The task to process
|
||||
* @return void
|
||||
*/
|
||||
public function do(Task $task): void;
|
||||
|
||||
/**
|
||||
* Executes a closure operation in the background (does not return a result)
|
||||
*
|
||||
* @param Closure $closure The closure operation to perform (remote)
|
||||
* @return void
|
||||
*/
|
||||
public function doClosure(Closure $closure): void;
|
||||
|
||||
/**
|
||||
* Queues a task to be processed in parallel (returns a result handled by a callback)
|
||||
*
|
||||
* @param Task $task
|
||||
* @return void
|
||||
*/
|
||||
public function queue(Task $task): void;
|
||||
|
||||
/**
|
||||
* Queues a closure to be processed in parallel (returns a result handled by a callback)
|
||||
*
|
||||
* @param Closure $closure The closure operation to perform (remote)
|
||||
* @param Closure $callback The closure to call when the operation is complete (local)
|
||||
* @return void
|
||||
*/
|
||||
public function queueClosure(Closure $closure, Closure $callback): void;
|
||||
|
||||
/**
|
||||
* Executes all tasks in the queue and waits for them to complete
|
||||
*
|
||||
* @return bool
|
||||
*/
|
||||
public function run(): bool;
|
||||
}
|
|
@ -1,8 +1,82 @@
|
|||
<?php
|
||||
|
||||
namespace Tamer\Interfaces;
|
||||
namespace Tamer\Interfaces;
|
||||
|
||||
interface WorkerProtocolInterface
|
||||
{
|
||||
interface WorkerProtocolInterface
|
||||
{
|
||||
/**
|
||||
* Public Constructor with optional username and password
|
||||
*
|
||||
* @param string|null $username (optional) The username to use when connecting to the server (if required)
|
||||
* @param string|null $password (optional) The password to use when connecting to the server
|
||||
*/
|
||||
public function __construct(?string $username=null, ?string $password=null);
|
||||
|
||||
}
|
||||
/**
|
||||
* Adds a server to the list of servers to use
|
||||
*
|
||||
* @param string $host The host to connect to (eg; 127.0.0.1)
|
||||
* @param int $port The port to connect to (eg; 4730)
|
||||
* @return bool
|
||||
*/
|
||||
public function addServer(string $host, int $port): bool;
|
||||
|
||||
/**
|
||||
* Adds a list of servers to the list of servers to use
|
||||
*
|
||||
* @param array $servers An array of servers to connect to (eg; ['host:port', 'host:port', ...])
|
||||
* @return void
|
||||
*/
|
||||
public function addServers(array $servers): void;
|
||||
|
||||
/**
|
||||
* Adds options to the worker (worker specific)
|
||||
*
|
||||
* @param array $options
|
||||
* @return bool
|
||||
*/
|
||||
public function addOptions(array $options): bool;
|
||||
|
||||
/**
|
||||
* Returns True if the worker is set to automatically reconnect to the server after a period of time
|
||||
*
|
||||
* @return bool
|
||||
*/
|
||||
public function isAutomaticReconnect(): bool;
|
||||
|
||||
/**
|
||||
* Enables or disables automatic reconnecting to the server after a period of time
|
||||
*
|
||||
* @param bool $automatic_reconnect
|
||||
* @return void
|
||||
*/
|
||||
public function setAutomaticReconnect(bool $automatic_reconnect): void;
|
||||
|
||||
/**
|
||||
* Registers a function to the worker
|
||||
*
|
||||
* @param string $function_name The name of the function to add
|
||||
* @param callable $function The function to add
|
||||
* @param mixed $context (optional) The context to pass to the function
|
||||
* @return void
|
||||
*/
|
||||
public function addFunction(string $function_name, callable $function, mixed $context=null): void;
|
||||
|
||||
/**
|
||||
* Removes a function from the worker
|
||||
*
|
||||
* @param string $function_name The name of the function to remove
|
||||
* @return void
|
||||
*/
|
||||
public function removeFunction(string $function_name): void;
|
||||
|
||||
/**
|
||||
* Works a job from the queue (blocking or non-blocking)
|
||||
*
|
||||
* @param bool $blocking (optional) Whether to block until a job is available
|
||||
* @param int $timeout (optional) The timeout to use when blocking
|
||||
* @param bool $throw_errors (optional) Whether to throw errors or not
|
||||
* @return void
|
||||
*/
|
||||
public function work(bool $blocking=true, int $timeout=500, bool $throw_errors=false): void;
|
||||
}
|
|
@ -2,7 +2,7 @@
|
|||
|
||||
/** @noinspection PhpMissingFieldTypeInspection */
|
||||
|
||||
namespace Tamer\Protocols;
|
||||
namespace Tamer\Protocols\Gearman;
|
||||
|
||||
use Closure;
|
||||
use Exception;
|
||||
|
@ -16,7 +16,7 @@
|
|||
use Tamer\Objects\JobResults;
|
||||
use Tamer\Objects\Task;
|
||||
|
||||
class GearmanClient implements ClientProtocolInterface
|
||||
class Client implements ClientProtocolInterface
|
||||
{
|
||||
/**
|
||||
* @var \GearmanClient|null $client
|
||||
|
@ -46,8 +46,11 @@
|
|||
private $next_reconnect;
|
||||
|
||||
/**
|
||||
* @inheritDoc
|
||||
* @param string|null $username
|
||||
* @param string|null $password
|
||||
*/
|
||||
public function __construct()
|
||||
public function __construct(?string $username=null, ?string $password=null)
|
||||
{
|
||||
$this->client = null;
|
||||
$this->tasks = [];
|
||||
|
@ -145,15 +148,15 @@
|
|||
/**
|
||||
* Executes a closure in the background
|
||||
*
|
||||
* @param Closure $function
|
||||
* @param Closure $closure
|
||||
* @return void
|
||||
* @throws ServerException
|
||||
*/
|
||||
public function closure(Closure $function): void
|
||||
public function doClosure(Closure $closure): void
|
||||
{
|
||||
$closure_task = new Task('tamer_closure', $function);
|
||||
$closure_task = new Task('tamer_closure', $closure);
|
||||
$closure_task->setClosure(true);
|
||||
$this->doBackground($closure_task);
|
||||
$this->do($closure_task);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -163,7 +166,7 @@
|
|||
* @return void
|
||||
* @throws ServerException
|
||||
*/
|
||||
public function doBackground(Task $task): void
|
||||
public function do(Task $task): void
|
||||
{
|
||||
if($this->automatic_reconnect && time() > $this->next_reconnect)
|
||||
{
|
||||
|
@ -199,7 +202,7 @@
|
|||
* @return void
|
||||
* @throws ServerException
|
||||
*/
|
||||
public function addTask(Task $task): void
|
||||
public function queue(Task $task): void
|
||||
{
|
||||
if($this->automatic_reconnect && time() > $this->next_reconnect)
|
||||
{
|
||||
|
@ -230,16 +233,16 @@
|
|||
/**
|
||||
* Adds a closure task to the list of tasks to run
|
||||
*
|
||||
* @param Closure $function
|
||||
* @param Closure $closure
|
||||
* @param $callback
|
||||
* @return void
|
||||
* @throws ServerException
|
||||
*/
|
||||
public function addClosureTask(Closure $function, $callback): void
|
||||
public function queueClosure(Closure $closure, $callback): void
|
||||
{
|
||||
$closure_task = new Task('tamer_closure', $function, $callback);
|
||||
$closure_task = new Task('tamer_closure', $closure, $callback);
|
||||
$closure_task->setClosure(true);
|
||||
$this->addTask($closure_task);
|
||||
$this->queue($closure_task);
|
||||
}
|
||||
|
||||
/**
|
|
@ -2,7 +2,7 @@
|
|||
|
||||
/** @noinspection PhpMissingFieldTypeInspection */
|
||||
|
||||
namespace Tamer\Protocols;
|
||||
namespace Tamer\Protocols\Gearman;
|
||||
|
||||
use Exception;
|
||||
use GearmanJob;
|
||||
|
@ -14,7 +14,7 @@
|
|||
use Tamer\Objects\Job;
|
||||
use Tamer\Objects\JobResults;
|
||||
|
||||
class GearmanWorker implements WorkerProtocolInterface
|
||||
class Worker implements WorkerProtocolInterface
|
||||
{
|
||||
/**
|
||||
* @var \GearmanWorker|null
|
||||
|
@ -36,7 +36,7 @@
|
|||
*/
|
||||
private $next_reconnect;
|
||||
|
||||
public function __construct()
|
||||
public function __construct(?string $username=null, ?string $password=null)
|
||||
{
|
||||
$this->worker = null;
|
||||
$this->server_cache = [];
|
||||
|
@ -53,6 +53,25 @@
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param array $options
|
||||
* @return bool
|
||||
* @inheritDoc
|
||||
*/
|
||||
public function addOptions(array $options): bool
|
||||
{
|
||||
if($this->worker == null)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
$options = array_map(function($option)
|
||||
{
|
||||
return constant($option);
|
||||
}, $options);
|
||||
|
||||
return $this->worker->addOptions(array_sum($options));
|
||||
}
|
||||
/**
|
||||
* Adds a server to the list of servers to use
|
||||
*
|
||||
|
@ -91,18 +110,16 @@
|
|||
*
|
||||
* @link http://php.net/manual/en/gearmanworker.addservers.php
|
||||
* @param string[] $servers (host:port, host:port, ...)
|
||||
* @return WorkerProtocolInterface
|
||||
* @return void
|
||||
* @throws ServerException
|
||||
*/
|
||||
public function addServers(array $servers): self
|
||||
public function addServers(array $servers): void
|
||||
{
|
||||
foreach($servers as $server)
|
||||
{
|
||||
$server = explode(':', $server);
|
||||
$this->addServer($server[0], $server[1]);
|
||||
}
|
||||
|
||||
return $this;
|
||||
}
|
||||
|
||||
|
||||
|
@ -113,9 +130,9 @@
|
|||
* @param string $function_name The name of the function to register with the job server
|
||||
* @param callable $function The callback function to call when the job is received
|
||||
* @param mixed|null $context (optional) The context to pass to the callback function
|
||||
* @return WorkerProtocolInterface
|
||||
* @return void
|
||||
*/
|
||||
public function addFunction(string $function_name, callable $function, mixed $context=null): self
|
||||
public function addFunction(string $function_name, callable $function, mixed $context=null): void
|
||||
{
|
||||
$this->worker->addFunction($function_name, function(GearmanJob $job) use ($function, $context)
|
||||
{
|
||||
|
@ -135,19 +152,17 @@
|
|||
$job->sendComplete(msgpack_pack($job_results->toArray()));
|
||||
|
||||
});
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes a function from the list of functions to call
|
||||
*
|
||||
* @param string $function_name The name of the function to unregister
|
||||
* @return WorkerProtocolInterface
|
||||
* @return void
|
||||
*/
|
||||
public function removeFunction(string $function_name): self
|
||||
public function removeFunction(string $function_name): void
|
||||
{
|
||||
$this->worker->unregister($function_name);
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -160,12 +175,11 @@
|
|||
|
||||
/**
|
||||
* @param bool $automatic_reconnect
|
||||
* @return WorkerProtocolInterface
|
||||
* @return void
|
||||
*/
|
||||
public function setAutomaticReconnect(bool $automatic_reconnect): self
|
||||
public function setAutomaticReconnect(bool $automatic_reconnect): void
|
||||
{
|
||||
$this->automatic_reconnect = $automatic_reconnect;
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -211,7 +225,7 @@
|
|||
*
|
||||
* @link http://php.net/manual/en/gearmanworker.work.php
|
||||
* @param bool $blocking (default: true) Whether to block until a job is received
|
||||
* @param int $timeout (default: 500) The timeout in milliseconds
|
||||
* @param int $timeout (default: 500) The timeout in milliseconds (if $blocking is false)
|
||||
* @param bool $throw_errors (default: false) Whether to throw exceptions on errors
|
||||
* @return void Returns nothing
|
||||
* @throws ServerException If the worker cannot connect to the server
|
401
src/Tamer/Protocols/RabbitMq/Client.php
Normal file
401
src/Tamer/Protocols/RabbitMq/Client.php
Normal file
|
@ -0,0 +1,401 @@
|
|||
<?php
|
||||
|
||||
/** @noinspection PhpMissingFieldTypeInspection */
|
||||
|
||||
namespace Tamer\Protocols\RabbitMq;
|
||||
|
||||
use Closure;
|
||||
use Exception;
|
||||
use PhpAmqpLib\Channel\AMQPChannel;
|
||||
use PhpAmqpLib\Connection\AMQPStreamConnection;
|
||||
use PhpAmqpLib\Message\AMQPMessage;
|
||||
use Tamer\Abstracts\TaskPriority;
|
||||
use Tamer\Exceptions\ServerException;
|
||||
use Tamer\Interfaces\ClientProtocolInterface;
|
||||
use Tamer\Objects\Job;
|
||||
use Tamer\Objects\JobResults;
|
||||
use Tamer\Objects\Task;
|
||||
|
||||
class Client implements ClientProtocolInterface
|
||||
{
|
||||
/**
|
||||
* @var array
|
||||
*/
|
||||
private $server_cache;
|
||||
|
||||
/**
|
||||
* Used for tracking the current execution of tasks and run callbacks on completion
|
||||
*
|
||||
* @var Task[]
|
||||
*/
|
||||
private $tasks;
|
||||
|
||||
/**
|
||||
* @var bool
|
||||
*/
|
||||
private $automatic_reconnect;
|
||||
|
||||
/**
|
||||
* @var int
|
||||
*/
|
||||
private $next_reconnect;
|
||||
|
||||
/**
|
||||
* @var string|null
|
||||
*/
|
||||
private $username;
|
||||
|
||||
/**
|
||||
* @var string|null
|
||||
*/
|
||||
private $password;
|
||||
|
||||
/**
|
||||
* @var array
|
||||
*/
|
||||
private $options;
|
||||
|
||||
/**
|
||||
* @var AMQPStreamConnection|null
|
||||
*/
|
||||
private $connection;
|
||||
|
||||
/**
|
||||
* @var AMQPChannel|null
|
||||
*/
|
||||
private $channel;
|
||||
|
||||
/***
|
||||
* @param string|null $username
|
||||
* @param string|null $password
|
||||
*/
|
||||
public function __construct(?string $username=null, ?string $password=null)
|
||||
{
|
||||
$this->tasks = [];
|
||||
$this->automatic_reconnect = false;
|
||||
$this->next_reconnect = time() + 1800;
|
||||
$this->server_cache = [];
|
||||
$this->options = [];
|
||||
$this->connection = null;
|
||||
$this->username = $username;
|
||||
$this->password = $password;
|
||||
|
||||
try
|
||||
{
|
||||
$this->reconnect();
|
||||
}
|
||||
catch(ServerException $e)
|
||||
{
|
||||
unset($e);
|
||||
}
|
||||
}
|
||||
|
||||
public function addOptions(array $options): bool
|
||||
{
|
||||
$this->options = $options;
|
||||
return true;
|
||||
}
|
||||
|
||||
public function addServer(string $host, int $port): bool
|
||||
{
|
||||
if(!isset($this->server_cache[$host]))
|
||||
{
|
||||
$this->server_cache[$host] = [];
|
||||
}
|
||||
|
||||
if(in_array($port, $this->server_cache[$host]))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
$this->server_cache[$host][] = $port;
|
||||
$this->reconnect();
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a list of servers to the list of servers to use
|
||||
*
|
||||
* @param array $servers (host:port, host:port, ...)
|
||||
* @return bool
|
||||
*/
|
||||
public function addServers(array $servers): bool
|
||||
{
|
||||
foreach($servers as $server)
|
||||
{
|
||||
$server = explode(':', $server);
|
||||
$this->addServer($server[0], $server[1]);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculates the priority for a task based on the priority level
|
||||
*
|
||||
* @param int $priority
|
||||
* @return int
|
||||
*/
|
||||
private static function calculatePriority(int $priority): int
|
||||
{
|
||||
if($priority < TaskPriority::Low)
|
||||
return 0;
|
||||
|
||||
if($priority > TaskPriority::High)
|
||||
return 255;
|
||||
|
||||
return (int) round(($priority / TaskPriority::High) * 255);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param Task $task
|
||||
* @return void
|
||||
*/
|
||||
public function do(Task $task): void
|
||||
{
|
||||
$job = new Job($task);
|
||||
|
||||
$message = new AMQPMessage(msgpack_pack($job->toArray()), [
|
||||
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
|
||||
'priority' => self::calculatePriority($task->getPriority()),
|
||||
]);
|
||||
|
||||
$this->channel->basic_publish($message, '', 'tamer_queue');
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes a closure in the background
|
||||
*
|
||||
* @param Closure $closure
|
||||
* @return void
|
||||
*/
|
||||
public function doClosure(Closure $closure): void
|
||||
{
|
||||
$closure_task = new Task('tamer_closure', $closure);
|
||||
$closure_task->setClosure(true);
|
||||
$this->do($closure_task);
|
||||
}
|
||||
|
||||
/**
|
||||
* Queues a task to be executed
|
||||
*
|
||||
* @param Task $task
|
||||
* @return void
|
||||
*/
|
||||
public function queue(Task $task): void
|
||||
{
|
||||
$this->tasks[] = $task;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a closure task to the list of tasks to run
|
||||
*
|
||||
* @param Closure $closure
|
||||
* @param $callback
|
||||
* @return void
|
||||
*/
|
||||
public function queueClosure(Closure $closure, $callback): void
|
||||
{
|
||||
$closure_task = new Task('tamer_closure', $closure, $callback);
|
||||
$closure_task->setClosure(true);
|
||||
$this->queue($closure_task);
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes all the tasks that has been added
|
||||
*
|
||||
* @return bool
|
||||
*/
|
||||
public function run(): bool
|
||||
{
|
||||
if(count($this->tasks) === 0)
|
||||
return false;
|
||||
|
||||
$correlationIds = [];
|
||||
|
||||
/** @var Task $task */
|
||||
foreach($this->tasks as $task)
|
||||
{
|
||||
$correlationIds[] = $task->getId();
|
||||
|
||||
$job = new Job($task);
|
||||
|
||||
$message = new AMQPMessage(msgpack_pack($job->toArray()), [
|
||||
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
|
||||
'correlation_id' => $task->getId(),
|
||||
'reply_to' => 'tamer_queue',
|
||||
'priority' => self::calculatePriority($task->getPriority()),
|
||||
]);
|
||||
|
||||
$this->channel->basic_publish($message, '', 'tamer_queue');
|
||||
}
|
||||
|
||||
// Register callback for each task
|
||||
$callback = function($msg) use (&$correlationIds)
|
||||
{
|
||||
$job_result = JobResults::fromArray(msgpack_unpack($msg->body));
|
||||
$task = $this->getTaskById($job_result->getId());
|
||||
|
||||
try
|
||||
{
|
||||
$task->runCallback($job_result);
|
||||
}
|
||||
catch(Exception $e)
|
||||
{
|
||||
echo $e->getMessage();
|
||||
}
|
||||
|
||||
// Remove the processed correlation_id
|
||||
$index = array_search($msg->get('correlation_id'), $correlationIds);
|
||||
if ($index !== false) {
|
||||
unset($correlationIds[$index]);
|
||||
}
|
||||
|
||||
$this->channel->basic_ack($msg->delivery_info['delivery_tag']);
|
||||
|
||||
// Stop consuming when all tasks are processed
|
||||
if(count($correlationIds) === 0)
|
||||
{
|
||||
$this->channel->basic_cancel($msg->delivery_info['consumer_tag']);
|
||||
}
|
||||
};
|
||||
|
||||
$this->channel->basic_consume('tamer_queue', '', false, false, false, false, $callback);
|
||||
|
||||
// Start consuming messages
|
||||
while(count($this->channel->callbacks))
|
||||
{
|
||||
$this->channel->wait();
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $id
|
||||
* @return Task|null
|
||||
*/
|
||||
private function getTaskById(string $id): ?Task
|
||||
{
|
||||
foreach($this->tasks as $task)
|
||||
{
|
||||
if($task->getId() === $id)
|
||||
{
|
||||
return $task;
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns True if the client is automatically reconnecting to the server
|
||||
*
|
||||
* @return bool
|
||||
*/
|
||||
public function isAutomaticReconnect(): bool
|
||||
{
|
||||
return $this->automatic_reconnect;
|
||||
}
|
||||
|
||||
/**
|
||||
* Enables or disables automatic reconnecting to the server
|
||||
*
|
||||
* @param bool $automatic_reconnect
|
||||
* @return void
|
||||
*/
|
||||
public function setAutomaticReconnect(bool $automatic_reconnect): void
|
||||
{
|
||||
$this->automatic_reconnect = $automatic_reconnect;
|
||||
}
|
||||
|
||||
private function reconnect()
|
||||
{
|
||||
$connections = [];
|
||||
|
||||
if(count($this->server_cache) === 0)
|
||||
return;
|
||||
|
||||
foreach($this->server_cache as $host => $ports)
|
||||
{
|
||||
foreach($ports as $port)
|
||||
{
|
||||
$host = [
|
||||
'host' => $host,
|
||||
'port' => $port
|
||||
];
|
||||
|
||||
if($this->username !== null)
|
||||
$host['username'] = $this->username;
|
||||
|
||||
if($this->password !== null)
|
||||
$host['password'] = $this->password;
|
||||
|
||||
$connections[] = $host;
|
||||
}
|
||||
}
|
||||
|
||||
// Can only connect to one server for now, so we'll just use the first one
|
||||
$selected_connection = $connections[0];
|
||||
$this->disconnect();
|
||||
$this->connection = new AMQPStreamConnection(
|
||||
$selected_connection['host'],
|
||||
$selected_connection['port'],
|
||||
$selected_connection['username'] ?? null,
|
||||
$selected_connection['password'] ?? null
|
||||
);
|
||||
|
||||
$this->channel = $this->connection->channel();
|
||||
$this->channel->queue_declare('tamer_queue', false, true, false, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Disconnects from the server
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
private function disconnect()
|
||||
{
|
||||
try
|
||||
{
|
||||
if(!is_null($this->channel))
|
||||
{
|
||||
$this->channel->close();
|
||||
}
|
||||
}
|
||||
catch(Exception $e)
|
||||
{
|
||||
unset($e);
|
||||
}
|
||||
finally
|
||||
{
|
||||
$this->channel = null;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
if(!is_null($this->connection))
|
||||
{
|
||||
$this->connection->close();
|
||||
}
|
||||
}
|
||||
catch(Exception $e)
|
||||
{
|
||||
unset($e);
|
||||
}
|
||||
finally
|
||||
{
|
||||
$this->connection = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Disconnects from the server when the object is destroyed
|
||||
*/
|
||||
public function __destruct()
|
||||
{
|
||||
$this->disconnect();
|
||||
}
|
||||
|
||||
}
|
324
src/Tamer/Protocols/RabbitMq/Worker.php
Normal file
324
src/Tamer/Protocols/RabbitMq/Worker.php
Normal file
|
@ -0,0 +1,324 @@
|
|||
<?php
|
||||
|
||||
/** @noinspection PhpMissingFieldTypeInspection */
|
||||
|
||||
namespace Tamer\Protocols\RabbitMq;
|
||||
|
||||
use Exception;
|
||||
use PhpAmqpLib\Channel\AMQPChannel;
|
||||
use PhpAmqpLib\Connection\AMQPStreamConnection;
|
||||
use PhpAmqpLib\Message\AMQPMessage;
|
||||
use Tamer\Abstracts\JobStatus;
|
||||
use Tamer\Interfaces\WorkerProtocolInterface;
|
||||
use Tamer\Objects\Job;
|
||||
use Tamer\Objects\JobResults;
|
||||
|
||||
class Worker implements WorkerProtocolInterface
|
||||
{
|
||||
/**
|
||||
* @var array
|
||||
*/
|
||||
private $server_cache;
|
||||
|
||||
/**
|
||||
* @var false
|
||||
*/
|
||||
private $automatic_reconnect;
|
||||
|
||||
/**
|
||||
* @var int
|
||||
*/
|
||||
private $next_reconnect;
|
||||
|
||||
/**
|
||||
* @var array
|
||||
*/
|
||||
private $functions;
|
||||
|
||||
/**
|
||||
* @var string|null
|
||||
*/
|
||||
private $username;
|
||||
|
||||
/**
|
||||
* @var string|null
|
||||
*/
|
||||
private $password;
|
||||
|
||||
/**
|
||||
* @var AMQPStreamConnection|null
|
||||
*/
|
||||
private $connection;
|
||||
|
||||
/**
|
||||
* @var AMQPChannel|null
|
||||
*/
|
||||
private $channel;
|
||||
|
||||
/**
|
||||
* @var array
|
||||
*/
|
||||
private $options;
|
||||
|
||||
|
||||
/**
|
||||
* @inheritDoc
|
||||
*/
|
||||
public function __construct(?string $username = null, ?string $password = null)
|
||||
{
|
||||
$this->server_cache = [];
|
||||
$this->functions = [];
|
||||
$this->automatic_reconnect = false;
|
||||
$this->next_reconnect = time() + 1800;
|
||||
$this->username = $username;
|
||||
$this->password = $password;
|
||||
|
||||
try
|
||||
{
|
||||
$this->reconnect();
|
||||
}
|
||||
catch(Exception $e)
|
||||
{
|
||||
unset($e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @inheritDoc
|
||||
*/
|
||||
public function addServer(string $host, int $port): bool
|
||||
{
|
||||
if(!isset($this->server_cache[$host]))
|
||||
{
|
||||
$this->server_cache[$host] = [];
|
||||
}
|
||||
|
||||
if(in_array($port, $this->server_cache[$host]))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
$this->server_cache[$host][] = $port;
|
||||
$this->reconnect();
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* @inheritDoc
|
||||
*/
|
||||
public function addServers(array $servers): void
|
||||
{
|
||||
foreach($servers as $server)
|
||||
{
|
||||
$server = explode(':', $server);
|
||||
$this->addServer($server[0], $server[1]);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @inheritDoc
|
||||
*/
|
||||
public function addOptions(array $options): bool
|
||||
{
|
||||
$this->options = $options;
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* @inheritDoc
|
||||
*/
|
||||
public function isAutomaticReconnect(): bool
|
||||
{
|
||||
return $this->automatic_reconnect;
|
||||
}
|
||||
|
||||
/**
|
||||
* @inheritDoc
|
||||
*/
|
||||
public function setAutomaticReconnect(bool $automatic_reconnect): void
|
||||
{
|
||||
$this->automatic_reconnect = $automatic_reconnect;
|
||||
}
|
||||
|
||||
/**
|
||||
* @inheritDoc
|
||||
*/
|
||||
public function addFunction(string $function_name, callable $function, mixed $context = null): void
|
||||
{
|
||||
$this->functions[$function_name] = [
|
||||
'function' => $function,
|
||||
'context' => $context
|
||||
];
|
||||
}
|
||||
|
||||
/**
|
||||
* @inheritDoc
|
||||
*/
|
||||
public function removeFunction(string $function_name): void
|
||||
{
|
||||
unset($this->functions[$function_name]);
|
||||
}
|
||||
|
||||
/**
|
||||
* @inheritDoc
|
||||
*/
|
||||
public function work(bool $blocking = true, int $timeout = 500, bool $throw_errors = false): void
|
||||
{
|
||||
$callback = function($message) use ($throw_errors)
|
||||
{
|
||||
var_dump($message->body);
|
||||
$job = Job::fromArray(msgpack_unpack($message->body));
|
||||
|
||||
$job_results = new JobResults($job, JobStatus::Success, 'Hello from worker!');
|
||||
|
||||
try
|
||||
{
|
||||
// Return $job_results
|
||||
$this->channel->basic_publish(
|
||||
new AMQPMessage(
|
||||
msgpack_pack($job_results->toArray()),
|
||||
[
|
||||
'correlation_id' => $job->getId()
|
||||
]
|
||||
)
|
||||
);
|
||||
|
||||
$this->channel->basic_ack($message->delivery_info['delivery_tag']);
|
||||
}
|
||||
catch (Exception $e)
|
||||
{
|
||||
if ($throw_errors)
|
||||
{
|
||||
throw $e;
|
||||
}
|
||||
|
||||
$job_results = new JobResults($job, JobStatus::Exception, $e->getMessage());
|
||||
|
||||
// Return $job_results
|
||||
$this->channel->basic_publish(
|
||||
new AMQPMessage(
|
||||
msgpack_pack($job_results->toArray()),
|
||||
[
|
||||
'correlation_id' => $job->getId()
|
||||
]
|
||||
)
|
||||
);
|
||||
|
||||
$this->channel->basic_ack($message->delivery_info['delivery_tag']);
|
||||
}
|
||||
};
|
||||
|
||||
$this->channel->basic_consume('tamer_queue', '', false, false, false, false, $callback);
|
||||
|
||||
if ($blocking)
|
||||
{
|
||||
while(true)
|
||||
{
|
||||
$this->channel->wait();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
$start = microtime(true);
|
||||
while (true)
|
||||
{
|
||||
if (microtime(true) - $start >= $timeout / 1000)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
$this->channel->wait();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private function reconnect()
|
||||
{
|
||||
$connections = [];
|
||||
|
||||
if(count($this->server_cache) === 0)
|
||||
return;
|
||||
|
||||
foreach($this->server_cache as $host => $ports)
|
||||
{
|
||||
foreach($ports as $port)
|
||||
{
|
||||
$host = [
|
||||
'host' => $host,
|
||||
'port' => $port
|
||||
];
|
||||
|
||||
if($this->username !== null)
|
||||
$host['username'] = $this->username;
|
||||
|
||||
if($this->password !== null)
|
||||
$host['password'] = $this->password;
|
||||
|
||||
$connections[] = $host;
|
||||
}
|
||||
}
|
||||
|
||||
// Can only connect to one server for now, so we'll just use the first one
|
||||
$selected_connection = $connections[0];
|
||||
$this->disconnect();
|
||||
$this->connection = new AMQPStreamConnection(
|
||||
$selected_connection['host'],
|
||||
$selected_connection['port'],
|
||||
$selected_connection['username'] ?? null,
|
||||
$selected_connection['password'] ?? null
|
||||
);
|
||||
|
||||
$this->channel = $this->connection->channel();
|
||||
$this->channel->queue_declare('tamer_queue', false, true, false, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Disconnects from the server
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
private function disconnect()
|
||||
{
|
||||
try
|
||||
{
|
||||
if(!is_null($this->channel))
|
||||
{
|
||||
$this->channel->close();
|
||||
}
|
||||
}
|
||||
catch(Exception $e)
|
||||
{
|
||||
unset($e);
|
||||
}
|
||||
finally
|
||||
{
|
||||
$this->channel = null;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
if(!is_null($this->connection))
|
||||
{
|
||||
$this->connection->close();
|
||||
}
|
||||
}
|
||||
catch(Exception $e)
|
||||
{
|
||||
unset($e);
|
||||
}
|
||||
finally
|
||||
{
|
||||
$this->connection = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Disconnects from the server when the object is destroyed
|
||||
*/
|
||||
public function __destruct()
|
||||
{
|
||||
$this->disconnect();
|
||||
}
|
||||
|
||||
}
|
|
@ -1,97 +0,0 @@
|
|||
<?php
|
||||
|
||||
namespace Tamer\Protocols;
|
||||
|
||||
use Tamer\Exceptions\ServerException;
|
||||
use Tamer\Interfaces\ClientProtocolInterface;
|
||||
use Tamer\Objects\Task;
|
||||
|
||||
class RabbitMqClient implements ClientProtocolInterface
|
||||
{
|
||||
/**
|
||||
* @var \R|null $client
|
||||
*/
|
||||
private $client;
|
||||
|
||||
/**
|
||||
* @var array
|
||||
*/
|
||||
private $server_cache;
|
||||
|
||||
/**
|
||||
* Used for tracking the current execution of tasks and run callbacks on completion
|
||||
*
|
||||
* @var Task[]
|
||||
*/
|
||||
private $tasks;
|
||||
|
||||
/**
|
||||
* @var bool
|
||||
*/
|
||||
private $automatic_reconnect;
|
||||
|
||||
/**
|
||||
* @var int
|
||||
*/
|
||||
private $next_reconnect;
|
||||
|
||||
/**
|
||||
*/
|
||||
public function __construct()
|
||||
{
|
||||
$this->client = null;
|
||||
$this->tasks = [];
|
||||
$this->automatic_reconnect = false;
|
||||
$this->next_reconnect = time() + 1800;
|
||||
$this->server_cache = [];
|
||||
|
||||
try
|
||||
{
|
||||
$this->reconnect();
|
||||
}
|
||||
catch(ServerException $e)
|
||||
{
|
||||
unset($e);
|
||||
}
|
||||
}
|
||||
|
||||
public function addOptions(array $options): bool
|
||||
{
|
||||
// TODO: Implement addOptions() method.
|
||||
}
|
||||
|
||||
public function addServer(string $host, int $port): bool
|
||||
{
|
||||
// TODO: Implement addServer() method.
|
||||
}
|
||||
|
||||
public function addServers(array $servers): bool
|
||||
{
|
||||
// TODO: Implement addServers() method.
|
||||
}
|
||||
|
||||
public function doBackground(Task $task): void
|
||||
{
|
||||
// TODO: Implement doBackground() method.
|
||||
}
|
||||
|
||||
public function addTask(Task $task): void
|
||||
{
|
||||
// TODO: Implement addTask() method.
|
||||
}
|
||||
|
||||
public function run(): bool
|
||||
{
|
||||
// TODO: Implement run() method.
|
||||
}
|
||||
|
||||
public function isAutomaticReconnect(): bool
|
||||
{
|
||||
// TODO: Implement isAutomaticReconnect() method.
|
||||
}
|
||||
|
||||
public function setAutomaticReconnect(bool $automatic_reconnect): void
|
||||
{
|
||||
// TODO: Implement setAutomaticReconnect() method.
|
||||
}
|
||||
}
|
|
@ -2,23 +2,23 @@
|
|||
|
||||
require 'ncc';
|
||||
|
||||
use Tamer\Objects\JobResults;
|
||||
use Tamer\Objects\Task;
|
||||
use Tamer\Objects\JobResults;
|
||||
use Tamer\Objects\Task;
|
||||
|
||||
import('net.nosial.tamerlib', 'latest');
|
||||
import('net.nosial.tamerlib', 'latest');
|
||||
|
||||
$client = new \Tamer\Protocols\GearmanClient();
|
||||
$client = new \Tamer\Protocols\Gearman\Client();
|
||||
$client->addServer();
|
||||
|
||||
$client->doBackground(new Task('sleep', '5'));
|
||||
$client->do(new Task('sleep', '5'));
|
||||
|
||||
|
||||
$client->addTask(new Task('sleep', '5', function(JobResults $job) {
|
||||
$client->queue(new Task('sleep', '5', function(JobResults $job) {
|
||||
echo "Task {$job->getId()} completed with data: {$job->getData()} \n";
|
||||
}));
|
||||
|
||||
|
||||
$client->addTask(new Task('sleep', '5', function(JobResults $job) {
|
||||
$client->queue(new Task('sleep', '5', function(JobResults $job) {
|
||||
echo "Task {$job->getId()} completed with data: {$job->getData()} \n";
|
||||
}));
|
||||
|
||||
|
|
|
@ -2,14 +2,14 @@
|
|||
|
||||
require 'ncc';
|
||||
|
||||
use Tamer\Objects\JobResults;
|
||||
use Tamer\Objects\Task;
|
||||
import('net.nosial.tamerlib', 'latest');
|
||||
|
||||
import('net.nosial.tamerlib', 'latest');
|
||||
|
||||
$client = new \Tamer\Protocols\GearmanClient();
|
||||
$client = new \Tamer\Protocols\Gearman\Client();
|
||||
$client->addServer();
|
||||
|
||||
$client->closure(function () {
|
||||
echo "This function was sent from a client, it should be executed on the worker";
|
||||
$client->doClosure(function () {
|
||||
require 'ncc';
|
||||
import('net.nosial.loglib', 'latest');
|
||||
|
||||
\LogLib\Log::info('gearman_closure.php', 'closure');
|
||||
});
|
|
@ -2,10 +2,10 @@
|
|||
|
||||
require 'ncc';
|
||||
|
||||
use Tamer\Objects\Job;
|
||||
use Tamer\Objects\Job;
|
||||
|
||||
import('net.nosial.tamerlib', 'latest');
|
||||
$worker = new \Tamer\Protocols\GearmanWorker();
|
||||
import('net.nosial.tamerlib', 'latest');
|
||||
$worker = new \Tamer\Protocols\Gearman\Worker();
|
||||
$worker->addServer();
|
||||
|
||||
$worker->addFunction('sleep', function($job) {
|
||||
|
|
18
tests/rabbitmq_client.php
Normal file
18
tests/rabbitmq_client.php
Normal file
|
@ -0,0 +1,18 @@
|
|||
<?php
|
||||
|
||||
|
||||
use Tamer\Objects\Task;
|
||||
|
||||
require 'ncc';
|
||||
|
||||
import('net.nosial.tamerlib', 'latest');
|
||||
|
||||
$client = new \Tamer\Protocols\RabbitMq\Client('guest', 'guest');
|
||||
$client->addServer('127.0.0.1', 5672);
|
||||
|
||||
// Loop through 10 tasks
|
||||
|
||||
for($i = 0; $i < 500; $i++)
|
||||
{
|
||||
$client->do(new Task('sleep', '5'));
|
||||
}
|
27
tests/rabbitmq_worker.php
Normal file
27
tests/rabbitmq_worker.php
Normal file
|
@ -0,0 +1,27 @@
|
|||
<?php
|
||||
|
||||
require 'ncc';
|
||||
|
||||
use Tamer\Objects\Job;
|
||||
|
||||
import('net.nosial.tamerlib', 'latest');
|
||||
$worker = new \Tamer\Protocols\RabbitMq\Worker('guest', 'guest');
|
||||
$worker->addServer('127.0.0.1', 5672);
|
||||
|
||||
$worker->addFunction('sleep', function($job) {
|
||||
/** @var Job $job */
|
||||
var_dump(get_class($job));
|
||||
echo "Task {$job->getId()} started with data: {$job->getData()} \n";
|
||||
sleep($job->getData());
|
||||
echo "Task {$job->getId()} completed with data: {$job->getData()} \n";
|
||||
|
||||
return $job->getData();
|
||||
});
|
||||
|
||||
|
||||
|
||||
while(true)
|
||||
{
|
||||
echo "Waiting for job... \n";
|
||||
$worker->work();
|
||||
}
|
Loading…
Add table
Reference in a new issue