Refactored \Tamer to \TamerLib

This commit is contained in:
Netkas 2023-02-05 17:27:32 -05:00
parent 1b8d2fb40a
commit 7eea383ce9
25 changed files with 81 additions and 81 deletions

View file

@ -0,0 +1,16 @@
<?php
namespace TamerLib\Abstracts\ExitCodes;
class WorkerExitCodes
{
const GracefulShutdown = 0;
const Exception = 1;
const UnsupervisedWorker = 2;
const ProtocolUnavailable = 3;
const ServerConnectionFailed = 4;
}

View file

@ -0,0 +1,12 @@
<?php
namespace TamerLib\Abstracts;
abstract class JobStatus
{
const Success = 0;
const Failure = 1;
const Exception = 2;
}

View file

@ -0,0 +1,10 @@
<?php
namespace TamerLib\Abstracts;
abstract class Mode
{
const Client = 'client';
const Worker = 'worker';
}

View file

@ -0,0 +1,10 @@
<?php
namespace TamerLib\Abstracts;
abstract class ProtocolType
{
const Gearman = 'gearman';
const RabbitMQ = 'rabbitmq';
}

View file

@ -0,0 +1,12 @@
<?php
namespace TamerLib\Abstracts;
abstract class TaskPriority
{
const Low = 0;
const Normal = 1;
const High = 2;
}

View file

@ -0,0 +1,130 @@
<?php
namespace TamerLib\Classes;
use Exception;
use InvalidArgumentException;
use OptsLib\Parse;
use Symfony\Component\Process\PhpExecutableFinder;
use TamerLib\Abstracts\ProtocolType;
use TamerLib\Interfaces\ClientProtocolInterface;
use TamerLib\Interfaces\WorkerProtocolInterface;
class Functions
{
/**
* A cache of the worker variables
*
* @var array|null
*/
private static $worker_variables;
/**
* A cache of the php binary path
*
* @var string|null
*/
private static $php_bin;
/**
* Attempts to get the worker id from the command line arguments or the environment variable TAMER_WORKER_ID
* If neither are set, returns null.
*
* @return string|null
*/
public static function getWorkerId(): ?string
{
$options = Parse::getArguments();
$worker_id = ($options['worker-id'] ?? null);
if($worker_id !== null)
return $worker_id;
$worker_id = getenv('TAMER_WORKER_ID');
if($worker_id !== false)
return $worker_id;
return null;
}
/**
* Constructs a client protocol object based on the protocol type
*
* @param string $protocol
* @param string|null $username
* @param string|null $password
* @return ClientProtocolInterface
*/
public static function createClient(string $protocol, ?string $username=null, ?string $password=null): ClientProtocolInterface
{
/** @noinspection PhpFullyQualifiedNameUsageInspection */
return match (strtolower($protocol))
{
ProtocolType::Gearman => new \TamerLib\Protocols\Gearman\Client($username, $password),
ProtocolType::RabbitMQ => new \TamerLib\Protocols\RabbitMq\Client($username, $password),
default => throw new InvalidArgumentException('Invalid protocol type'),
};
}
/**
* @param string $protocol
* @param string|null $username
* @param string|null $password
* @return WorkerProtocolInterface
*/
public static function createWorker(string $protocol, ?string $username=null, ?string $password=null): WorkerProtocolInterface
{
/** @noinspection PhpFullyQualifiedNameUsageInspection */
return match (strtolower($protocol))
{
ProtocolType::Gearman => new \TamerLib\Protocols\Gearman\Worker($username, $password),
ProtocolType::RabbitMQ => new \TamerLib\Protocols\RabbitMq\Worker($username, $password),
default => throw new InvalidArgumentException('Invalid protocol type'),
};
}
/**
* Returns the worker variables from the environment variables
*
* @return array
*/
public static function getWorkerVariables(): array
{
if(self::$worker_variables == null)
{
self::$worker_variables = [
'TAMER_ENABLED' => getenv('TAMER_ENABLED') === 'true',
'TAMER_PROTOCOL' => getenv('TAMER_PROTOCOL'),
'TAMER_SERVERS' => getenv('TAMER_SERVERS'),
'TAMER_USERNAME' => getenv('TAMER_USERNAME'),
'TAMER_PASSWORD' => getenv('TAMER_PASSWORD'),
'TAMER_INSTANCE_ID' => getenv('TAMER_INSTANCE_ID'),
];
if(self::$worker_variables['TAMER_SERVERS'] !== false)
self::$worker_variables['TAMER_SERVERS'] = explode(',', self::$worker_variables['TAMER_SERVERS']);
}
return self::$worker_variables;
}
/**
* Returns the path to the php binary
*
* @return string
* @throws Exception
*/
public static function findPhpBin(): string
{
if(self::$php_bin !== null)
return self::$php_bin;
$php_finder = new PhpExecutableFinder();
$php_bin = $php_finder->find();
if($php_bin === false)
throw new Exception('Unable to find the php binary');
self::$php_bin = $php_bin;
return $php_bin;
}
}

View file

@ -0,0 +1,187 @@
<?php
/** @noinspection PhpMissingFieldTypeInspection */
namespace TamerLib\Classes;
use Exception;
use LogLib\Log;
use Symfony\Component\Process\Process;
use TamerLib\Objects\WorkerInstance;
class Supervisor
{
/**
* A list of all the workers that are initialized
*
* @var WorkerInstance[]
*/
private $workers;
/**
* The protocol to pass to the worker instances
*
* @var string
*/
private $protocol;
/**
* The list of servers to pass to the worker instances (eg; host:port)
*
* @var string[]
*/
private $servers;
/**
* (Optional) The username to pass to the worker instances
*
* @var string|null
*/
private $username;
/**
* (Optional) The password to pass to the worker instances
*
* @var string|null
*/
private $password;
/**
*
*/
public function __construct(string $protocol, array $servers, ?string $username = null, ?string $password = null)
{
$this->workers = [];
$this->protocol = $protocol;
$this->servers = $servers;
$this->username = $username;
$this->password = $password;
}
/**
* Adds a worker to the supervisor instance
*
* @param string $target
* @param int $instances
* @return void
* @throws Exception
*/
public function addWorker(string $target, int $instances): void
{
for ($i = 0; $i < $instances; $i++)
{
$this->workers[] = new WorkerInstance($target, $this->protocol, $this->servers, $this->username, $this->password);
}
}
/**
* Starts all the workers
*
* @return void
* @throws Exception
*/
public function start(): void
{
/** @var WorkerInstance $worker */
foreach ($this->workers as $worker)
{
$worker->start();
}
// Ensure that all the workers are running
foreach($this->workers as $worker)
{
if (!$worker->isRunning())
{
throw new Exception("Worker {$worker->getId()} is not running");
}
while(true)
{
switch($worker->getProcess()->getStatus())
{
case Process::STATUS_STARTED:
Log::debug('net.nosial.tamerlib', "worker {$worker->getId()} is running");
break 2;
case Process::STATUS_TERMINATED:
throw new Exception("Worker {$worker->getId()} has terminated");
default:
echo "Worker {$worker->getId()} is {$worker->getProcess()->getStatus()}" . PHP_EOL;
}
}
}
}
/**
* Stops all the workers
*
* @return void
* @throws Exception
*/
public function stop(): void
{
/** @var WorkerInstance $worker */
foreach ($this->workers as $worker)
{
$worker->stop();
}
}
/**
* Restarts all the workers
*
* @return void
* @throws Exception
*/
public function restart(): void
{
/** @var WorkerInstance $worker */
foreach ($this->workers as $worker)
{
$worker->stop();
$worker->start();
}
}
/**
* Monitors all the workers and restarts them if they are not running
*
* @param bool $auto_restart
* @return void
* @throws Exception
*/
public function monitor(bool $auto_restart = true): void
{
while (true)
{
/** @var WorkerInstance $worker */
foreach ($this->workers as $worker)
{
if (!$worker->isRunning())
{
if ($auto_restart)
{
$worker->start();
}
else
{
throw new Exception("Worker {$worker->getId()} is not running");
}
}
}
sleep(1);
}
}
/**
* @throws Exception
*/
public function __destruct()
{
$this->stop();
}
}

View file

@ -0,0 +1,53 @@
<?php
namespace TamerLib\Classes;
use TamerLib\Abstracts\Mode;
use TamerLib\Abstracts\ProtocolType;
use TamerLib\Abstracts\TaskPriority;
class Validate
{
/**
* Returns true if the input is a valid protocol type.
*
* @param string $input
* @return bool
*/
public static function protocolType(string $input): bool
{
return match (strtolower($input))
{
ProtocolType::Gearman, ProtocolType::RabbitMQ => true,
default => false,
};
}
/**
* @param string $input
* @return bool
*/
public static function mode(string $input): bool
{
return match (strtolower($input))
{
Mode::Client, Mode::Worker => true,
default => false,
};
}
/**
* Returns true if the input is a valid task priority.
*
* @param int $input
* @return bool
*/
public static function taskPriority(int $input): bool
{
return match ($input)
{
TaskPriority::Low, TaskPriority::Normal, TaskPriority::High => true,
default => false,
};
}
}

View file

@ -0,0 +1,19 @@
<?php
namespace TamerLib\Exceptions;
use Exception;
use Throwable;
class ConnectionException extends Exception
{
/**
* @param string $message
* @param int $code
* @param Throwable|null $previous
*/
public function __construct(string $message="", int $code=0, Throwable $previous=null)
{
parent::__construct($message, $code, $previous);
}
}

View file

@ -0,0 +1,19 @@
<?php
namespace TamerLib\Exceptions;
use Exception;
use Throwable;
class UnsupervisedWorkerException extends Exception
{
/**
* @param string $message
* @param int $code
* @param Throwable|null $previous
*/
public function __construct(string $message = "", int $code = 0, ?Throwable $previous = null)
{
parent::__construct($message, $code, $previous);
}
}

View file

@ -0,0 +1,142 @@
<?php
namespace TamerLib\Interfaces;
use Closure;
use TamerLib\Exceptions\ConnectionException;
use TamerLib\Objects\Task;
interface ClientProtocolInterface
{
/**
* Public Constructor with optional username and password
*
* @param string|null $username (optional) The username to use when connecting to the server (if required)
* @param string|null $password (optional) The password to use when connecting to the server
*/
public function __construct(?string $username=null, ?string $password=null);
/**
* Adds a server to the list of servers to use
*
* @param string $host The host to connect to (eg; 127.0.0.1)
* @param int $port The port to connect to (eg; 4730)
* @return void
*/
public function addServer(string $host, int $port): void;
/**
* Adds a list of servers to the list of servers to use
*
* @param array $servers An array of servers to connect to (eg; ['host:port', 'host:port', ...])
* @return void
*/
public function addServers(array $servers): void;
/**
* Connects to all the configured servers
*
* @throws ConnectionException
* @return void
*/
public function connect(): void;
/**
* Disconnects from all the configured servers
*
* @return void
*/
public function disconnect(): void;
/**
* Reconnects to all the configured servers
*
* @throws ConnectionException
* @return void
*/
public function reconnect(): void;
/**
* Returns True if the client is connected to the server (or servers)
*
* @return bool
*/
public function isConnected(): bool;
/**
* Sets options to the client (client specific)
*
* @param array $options
* @return void
*/
public function setOptions(array $options): void;
/**
* Returns the options set on the client
*
* @return array
*/
public function getOptions(): array;
/**
* Clears all options from the client
*
* @return void
*/
public function clearOptions(): void;
/**
* Returns True if the client is set to automatically reconnect to the server after a period of time
*
* @return bool
*/
public function automaticReconnectionEnabled(): bool;
/**
* Enables or disables automatic reconnecting to the server after a period of time
*
* @param bool $enable
* @return void
*/
public function enableAutomaticReconnection(bool $enable): void;
/**
* Processes a task in the background (does not return a result)
*
* @param Task $task The task to process
* @return void
*/
public function do(Task $task): void;
/**
* Executes a closure operation in the background (does not return a result)
*
* @param Closure $closure The closure operation to perform (remote)
* @return void
*/
public function doClosure(Closure $closure): void;
/**
* Queues a task to be processed in parallel (returns a result handled by a callback)
*
* @param Task $task
* @return void
*/
public function queue(Task $task): void;
/**
* Queues a closure to be processed in parallel (returns a result handled by a callback)
*
* @param Closure $closure The closure operation to perform (remote)
* @param Closure|null $callback The closure to call when the operation is complete (local)
* @return void
*/
public function queueClosure(Closure $closure, ?Closure $callback=null): void;
/**
* Executes all tasks in the queue and waits for them to complete
*
* @return bool
*/
public function run(): bool;
}

View file

@ -0,0 +1,127 @@
<?php
namespace TamerLib\Interfaces;
use TamerLib\Exceptions\ConnectionException;
interface WorkerProtocolInterface
{
/**
* Public Constructor with optional username and password
*
* @param string|null $username (optional) The username to use when connecting to the server (if required)
* @param string|null $password (optional) The password to use when connecting to the server
*/
public function __construct(?string $username=null, ?string $password=null);
/**
* Adds a server to the list of servers to use
*
* @param string $host The host to connect to (eg; 127.0.0.1)
* @param int $port The port to connect to (eg; 4730)
* @return void
*/
public function addServer(string $host, int $port): void;
/**
* Adds a list of servers to the list of servers to use
*
* @param array $servers An array of servers to connect to (eg; ['host:port', 'host:port', ...])
* @return void
*/
public function addServers(array $servers): void;
/**
* Connects to all the configured servers
*
* @throws ConnectionException
* @return void
*/
public function connect(): void;
/**
* Disconnects from all the configured servers
*
* @return void
*/
public function disconnect(): void;
/**
* Reconnects to all the configured servers
*
* @throws ConnectionException
* @return void
*/
public function reconnect(): void;
/**
* Returns True if the client is connected to the server (or servers)
*
* @return bool
*/
public function isConnected(): bool;
/**
* Sets options to the worker (worker specific)
*
* @param array $options
* @return void
*/
public function setOptions(array $options): void;
/**
* Returns the options set on the worker
*
* @return array
*/
public function getOptions(): array;
/**
* Clears all options from the worker
*
* @return void
*/
public function clearOptions(): void;
/**
* Returns True if the worker is set to automatically reconnect to the server after a period of time
*
* @return bool
*/
public function automaticReconnectionEnabled(): bool;
/**
* Enables or disables automatic reconnecting to the server after a period of time
*
* @param bool $enable
* @return void
*/
public function enableAutomaticReconnection(bool $enable): void;
/**
* Registers a function to the worker
*
* @param string $name The name of the function to add
* @param callable $callable The function to add
* @return void
*/
public function addFunction(string $name, callable $callable): void;
/**
* Removes a function from the worker
*
* @param string $function_name The name of the function to remove
* @return void
*/
public function removeFunction(string $function_name): void;
/**
* Works a job from the queue (blocking or non-blocking)
*
* @param bool $blocking (optional) Whether to block until a job is available
* @param int $timeout (optional) The timeout to use when blocking
* @param bool $throw_errors (optional) Whether to throw errors or not
* @return void
*/
public function work(bool $blocking=true, int $timeout=500, bool $throw_errors=false): void;
}

View file

@ -0,0 +1,126 @@
<?php
/** @noinspection PhpMissingFieldTypeInspection */
namespace TamerLib\Objects;
use Closure;
use Opis\Closure\SerializableClosure;
use function unserialize;
class Job
{
/**
* The ID of the job
*
* @var string
*/
private $id;
/**
* The name of the function
*
* @var string
*/
private $name;
/**
* The data to be passed to the function
*
* @var string|Closure|null
*/
private $data;
/**
* Indicates if the data is a closure
*
* @var bool
*/
private $closure;
public function __construct(Task $task)
{
$this->id = $task->getId();
$this->name = $task->getFunctionName();
$this->data = $task->getData();
$this->closure = $task->isClosure();
}
/**
* Returns the ID of the Job
*
* @return string
*/
public function getId(): string
{
return $this->id;
}
/**
* Returns the function name of the Job
*
* @return string
*/
public function getName(): string
{
return $this->name;
}
/**
* Returns the data of the Job
*
* @return string|Closure|null
*/
public function getData(): Closure|string|null
{
return $this->data;
}
/**
* @return bool
*/
public function isClosure(): bool
{
return $this->closure;
}
/**
* Returns an array representation of the Job
*
* @return array
*/
public function toArray(): array
{
return [
'id' => $this->id,
'name' => $this->name,
'data' => ($this->closure ? serialize(new SerializableClosure($this->data)) : $this->data),
'closure' => $this->closure
];
}
/**
* Constructs a Job from an array
*
* @param array $data
* @return Job
*/
public static function fromArray(array $data): Job
{
$job_data = $data['data'];
if($data['closure'] === true)
{
/** @var SerializableClosure $job_data */
$job_data = unserialize($data['data']);
$job_data = $job_data->getClosure();
}
$job = new Job(new Task($data['name'], $job_data));
$job->id = $data['id'];
$job->closure = $data['closure'];
return $job;
}
}

View file

@ -0,0 +1,129 @@
<?php
/** @noinspection PhpMissingFieldTypeInspection */
namespace TamerLib\Objects;
use TamerLib\Abstracts\JobStatus;
class JobResults
{
/**
* The ID of the job
*
* @var string
*/
private $id;
/**
* The data to be passed to the function
*
* @var string
*/
private $data;
/**
* The status of the job
*
* @var int
* @see JobStatus
*/
private $status;
public function __construct(?Job $job=null, ?int $status=null, $results=null)
{
if($job !== null)
{
$this->id = $job->getId();
$this->data = $results;
$this->status = $status;
}
}
/**
* Returns the ID of the Job
*
* @return string
*/
public function getId(): string
{
return $this->id;
}
/**
* Returns the data of the Job
*
* @return string
*/
public function getData(): string
{
return $this->data;
}
/**
* @return int
* @noinspection PhpUnused
*/
public function getStatus(): int
{
return $this->status;
}
/**
* Returns an array representation of the Job
*
* @return array
*/
public function toArray(): array
{
return [
'id' => $this->id,
'data' => $this->data,
'status' => $this->status
];
}
/**
* Constructs a Job from an array
*
* @param array $data
* @return JobResults
*/
public static function fromArray(array $data): JobResults
{
$job = new JobResults();
$job->setId($data['id']);
$job->setData($data['data']);
$job->setStatus($data['status']);
return $job;
}
/**
* @param string $id
*/
protected function setId(string $id): void
{
$this->id = $id;
}
/**
* @param string $data
*/
protected function setData(string $data): void
{
$this->data = $data;
}
/**
* @param int|null $status
*/
protected function setStatus(?int $status): void
{
$this->status = $status;
}
}

View file

@ -0,0 +1,192 @@
<?php
/** @noinspection PhpMissingFieldTypeInspection */
namespace TamerLib\Objects;
use Closure;
use InvalidArgumentException;
use TamerLib\Abstracts\TaskPriority;
use TamerLib\Classes\Validate;
class Task
{
/**
* @var string
*/
private $id;
/**
* @var string
*/
private $function_name;
/**
* @var string|Closure|null
*/
private $data;
/**
* @var int
*/
private $priority;
/**
* @var Closure|null
*/
private $callback;
/**
* @var bool
*/
private $closure;
/**
* Public Constructor
*
* @param string $function_name
* @param string|Closure|null $data
* @param Closure|null $callback
*/
public function __construct(string $function_name, string|Closure|null $data, Closure $callback=null)
{
$this->function_name = $function_name;
$this->data = $data;
$this->id = uniqid();
$this->priority = TaskPriority::Normal;
$this->callback = $callback;
$this->closure = false;
}
/**
* Static Constructor
*
* @param string $function_name
* @param string|Closure|null $data
* @param callable|null $callback
* @return static
*/
public static function create(string $function_name, string|Closure|null $data, callable $callback=null): self
{
return new self($function_name, $data, $callback);
}
/**
* Returns the function name for the task
*
* @return string
*/
public function getFunctionName(): string
{
return $this->function_name;
}
/**
* Sets the function name for the task
*
* @param string $function_name
* @return Task
*/
public function setFunctionName(string $function_name): self
{
$this->function_name = $function_name;
return $this;
}
/**
* Returns the arguments for the task
*
* @return string|Closure|null
*/
public function getData(): string|null|Closure
{
return $this->data;
}
/**
* Sets the arguments for the task
*
* @param string $data
* @return Task
*/
public function setData(string $data): self
{
$this->data = $data;
return $this;
}
/**
* Returns the Unique ID of the task
*
* @return string
*/
public function getId(): string
{
return $this->id;
}
/**
* @return int
*/
public function getPriority(): int
{
return $this->priority;
}
/**
* @param int $priority
* @return Task
*/
public function setPriority(int $priority): self
{
if(!Validate::taskPriority($priority))
{
throw new InvalidArgumentException("Invalid priority value");
}
$this->priority = $priority;
return $this;
}
/**
* @param Closure|null $callback
* @return Task
*/
public function setCallback(?Closure $callback): self
{
$this->callback = $callback;
return $this;
}
/**
* Executes the callback function
*
* @param string|JobResults|null $result
* @return void
*/
public function runCallback(string|JobResults|null $result): void
{
if($this->callback !== null)
{
call_user_func($this->callback, $result);
}
}
/**
* @return bool
*/
public function isClosure(): bool
{
return $this->closure;
}
/**
* @param bool $closure
* @return Task
*/
public function setClosure(bool $closure): self
{
$this->closure = $closure;
return $this;
}
}

View file

@ -0,0 +1,186 @@
<?php
/** @noinspection PhpMissingFieldTypeInspection */
namespace TamerLib\Objects;
use Exception;
use LogLib\Log;
use Symfony\Component\Process\Process;
use TamerLib\Classes\Functions;
class WorkerInstance
{
/**
* The worker's instance id
*
* @var string
*/
private $id;
/**
* The protocol to use when connecting to the server
*
* @var string
*/
private $protocol;
/**
* The servers to connect to
*
* @var array
*/
private $servers;
/**
* The username to use when connecting to the server (if applicable)
*
* @var string|null
*/
private $username;
/**
* The password to use when connecting to the server (if applicable)
*
* @var string|null
*/
private $password;
/**
* The process that is running the worker instance
*
* @var Process|null
*/
private $process;
/**
* The target to run the worker instance on (e.g. a file path)
*
* @var string
*/
private $target;
/**
* Public Constructor
*
* @param string $target
* @param string $protocol
* @param array $servers
* @param string|null $username
* @param string|null $password
* @throws Exception
*/
public function __construct(string $target, string $protocol, array $servers, ?string $username = null, ?string $password = null)
{
$this->id = uniqid();
$this->target = $target;
$this->protocol = $protocol;
$this->servers = $servers;
$this->username = $username;
$this->password = $password;
$this->process = null;
if($target !== 'closure' && file_exists($target) === false)
{
throw new Exception('The target file does not exist');
}
}
/**
* Returns the worker instance id
*
* @return string
*/
public function getId(): string
{
return $this->id;
}
/**
* Executes the worker instance in a separate process
*
* @return void
* @throws Exception
*/
public function start(): void
{
$target = $this->target;
if($target == 'closure')
{
$target = __DIR__ . DIRECTORY_SEPARATOR . 'closure';
}
$argv = $_SERVER['argv'];
array_shift($argv);
$this->process = new Process(array_merge([Functions::findPhpBin(), $target], $argv));
$this->process->setEnv([
'TAMER_ENABLED' => 'true',
'TAMER_PROTOCOL' => $this->protocol,
'TAMER_SERVERS' => implode(',', $this->servers),
'TAMER_USERNAME' => $this->username,
'TAMER_PASSWORD' => $this->password,
'TAMER_INSTANCE_ID' => $this->id
]);
Log::debug('net.nosial.tamerlib', sprintf('starting worker %s', $this->id));
// Callback for process output
$this->process->start(function ($type, $buffer)
{
// Add newline if it's missing
if(substr($buffer, -1) !== PHP_EOL)
{
$buffer .= PHP_EOL;
}
print($buffer);
});
}
/**
* Stops the worker instance
*
* @return void
*/
public function stop(): void
{
if($this->process !== null)
{
Log::debug('net.nosial.tamerlib', sprintf('Stopping worker %s', $this->id));
$this->process->stop();
}
}
/**
* Returns whether the worker instance is running
*
* @return bool
*/
public function isRunning(): bool
{
if($this->process !== null)
{
return $this->process->isRunning();
}
return false;
}
/**
* @return Process|null
*/
public function getProcess(): ?Process
{
return $this->process;
}
/**
* Destructor
*/
public function __destruct()
{
$this->stop();
}
}

View file

@ -0,0 +1,16 @@
<?php
require 'ncc';
import('net.nosial.tamerlib', 'latest');
\TamerLib\Tamer::initWorker();
try
{
\TamerLib\Tamer::work();
}
catch(\Exception $e)
{
\LogLib\Log::error('net.nosial.tamerlib', $e->getMessage(), $e);
exit(1);
}

View file

@ -0,0 +1,464 @@
<?php
/** @noinspection PhpMissingFieldTypeInspection */
namespace TamerLib\Protocols\Gearman;
use Closure;
use Exception;
use GearmanClient;
use GearmanTask;
use LogLib\Log;
use TamerLib\Abstracts\TaskPriority;
use TamerLib\Exceptions\ConnectionException;
use TamerLib\Interfaces\ClientProtocolInterface;
use TamerLib\Objects\Job;
use TamerLib\Objects\JobResults;
use TamerLib\Objects\Task;
class Client implements ClientProtocolInterface
{
/**
* The Gearman Client object
*
* @var GearmanClient|null $client
*/
private $client;
/**
* An array of servers that have been defined
*
* @var array
*/
private $defined_servers;
/**
* Used for tracking the current execution of tasks and run callbacks on completion
*
* @var Task[]
*/
private $tasks;
/**
* Indicates if the client should automatically reconnect to the server if the connection is lost
* (default: true)
*
* @var bool
*/
private $automatic_reconnect;
/**
* The Unix timestamp of the next time the client should attempt to reconnect to the server
*
* @var int
*/
private $next_reconnect;
/**
* The options to use when connecting to the server
*
* @var array
*/
private $options;
/**
* @inheritDoc
* @param string|null $username
* @param string|null $password
*/
public function __construct(?string $username=null, ?string $password=null)
{
$this->client = null;
$this->tasks = [];
$this->automatic_reconnect = false;
$this->next_reconnect = time() + 1800;
$this->defined_servers = [];
$this->options = [];
}
/**
* Adds a server to the list of servers to use
*
* @link http://php.net/manual/en/gearmanclient.addserver.php
* @param string $host (127.0.0.1)
* @param int $port (default: 4730)
* @return void
*/
public function addServer(string $host, int $port): void
{
if(!isset($this->defined_servers[$host]))
{
$this->defined_servers[$host] = [];
}
if(in_array($port, $this->defined_servers[$host]))
{
return;
}
$this->defined_servers[$host][] = $port;
}
/**
* Adds a list of servers to the list of servers to use
*
* @link http://php.net/manual/en/gearmanclient.addservers.php
* @param array $servers (host:port, host:port, ...)
* @return void
*/
public function addServers(array $servers): void
{
foreach($servers as $server)
{
$server = explode(':', $server);
$this->addServer($server[0], (int)$server[1]);
}
}
/**
* Connects to the server(s)
*
* @return void
* @throws ConnectionException
*/
public function connect(): void
{
if($this->isConnected())
return;
$this->client = new GearmanClient();
// Parse $options combination via bitwise OR operator
$options = array_reduce($this->options, function($carry, $item)
{
return $carry | $item;
});
$this->client->addOptions($options);
foreach($this->defined_servers as $host => $ports)
{
foreach($ports as $port)
{
try
{
$this->client->addServer($host, $port);
Log::debug('net.nosial.tamerlib', 'connected to gearman server: ' . $host . ':' . $port);
}
catch(Exception $e)
{
throw new ConnectionException('Failed to connect to Gearman server: ' . $host . ':' . $port, 0, $e);
}
}
}
$this->client->setCompleteCallback([$this, 'callbackHandler']);
$this->client->setFailCallback([$this, 'callbackHandler']);
$this->client->setDataCallback([$this, 'callbackHandler']);
$this->client->setStatusCallback([$this, 'callbackHandler']);
}
/**
* Disconnects from the server(s)
*
* @return void
*/
public function disconnect(): void
{
if(!$this->isConnected())
return;
Log::debug('net.nosial.tamerlib', 'disconnecting from gearman server(s)');
$this->client->clearCallbacks();
unset($this->client);
$this->client = null;
}
/**
* Reconnects to the server(s)
*
* @return void
* @throws ConnectionException
*/
public function reconnect(): void
{
Log::debug('net.nosial.tamerlib', 'reconnecting to gearman server(s)');
$this->disconnect();
$this->connect();
}
/**
* Returns the current status of the client
*
* @inheritDoc
* @return bool
*/
public function isConnected(): bool
{
if($this->client === null)
{
return false;
}
return true;
}
/**
* The automatic reconnect process
*
* @return void
*/
private function preformAutoreconf(): void
{
if($this->automatic_reconnect && $this->next_reconnect < time())
{
try
{
$this->reconnect();
}
catch (Exception $e)
{
Log::error('net.nosial.tamerlib', 'Failed to reconnect to Gearman server: ' . $e->getMessage());
}
finally
{
$this->next_reconnect = time() + 1800;
}
}
}
/**
* Adds client options
*
* @link http://php.net/manual/en/gearmanclient.addoptions.php
* @param int[] $options (GEARMAN_CLIENT_NON_BLOCKING, GEARMAN_CLIENT_UNBUFFERED_RESULT, GEARMAN_CLIENT_FREE_TASKS)
* @return void
*/
public function setOptions(array $options): void
{
$this->options = $options;
}
/**
* Returns the current client options
*
* @return array
*/
public function getOptions(): array
{
return $this->options;
}
/**
* Clears the current client options
*
* @return void
*/
public function clearOptions(): void
{
$this->options = [];
}
/**
* Executes a closure in the background
*
* @param Closure $closure
* @return void
*/
public function doClosure(Closure $closure): void
{
$closure_task = new Task('tamer_closure', $closure);
$closure_task->setClosure(true);
$this->do($closure_task);
}
/**
* Processes a task in the background
*
* @param Task $task
* @return void
*/
public function do(Task $task): void
{
$this->preformAutoreconf();
$this->tasks[] = $task;
$job = new Job($task);
switch($task->getPriority())
{
case TaskPriority::High:
$this->client->doHighBackground($task->getFunctionName(), msgpack_pack($job->toArray()));
break;
case TaskPriority::Low:
$this->client->doLowBackground($task->getFunctionName(), msgpack_pack($job->toArray()));
break;
default:
case TaskPriority::Normal:
$this->client->doBackground($task->getFunctionName(), msgpack_pack($job->toArray()));
break;
}
}
/**
* Adds a task to the list of tasks to run
*
* @param Task $task
* @return void
*/
public function queue(Task $task): void
{
$this->preformAutoreconf();
$this->tasks[] = $task;
$job = new Job($task);
switch($task->getPriority())
{
case TaskPriority::High:
$this->client->addTaskHigh($task->getFunctionName(), msgpack_pack($job->toArray()));
break;
case TaskPriority::Low:
$this->client->addTaskLow($task->getFunctionName(), msgpack_pack($job->toArray()));
break;
default:
case TaskPriority::Normal:
$this->client->addTask($task->getFunctionName(), msgpack_pack($job->toArray()));
break;
}
}
/**
* Adds a closure task to the list of tasks to run
*
* @param Closure $closure
* @param Closure|null $callback
* @return void
*/
public function queueClosure(Closure $closure, ?Closure $callback=null): void
{
$closure_task = new Task('tamer_closure', $closure, $callback);
$closure_task->setClosure(true);
$this->queue($closure_task);
}
/**
* @return bool
*/
public function run(): bool
{
if(!$this->isConnected())
return false;
$this->preformAutoreconf();
if(!$this->client->runTasks())
return false;
return true;
}
/**
* Processes a task callback in the foreground
*
* @param GearmanTask $task
* @return void
*/
public function callbackHandler(GearmanTask $task): void
{
$job_result = JobResults::fromArray(msgpack_unpack($task->data()));
$internal_task = $this->getTaskById($job_result->getId());
Log::debug('net.nosial.tamerlib', 'callback for task ' . $internal_task->getId() . ' with status ' . $job_result->getStatus() . ' and data size ' . strlen($task->data()) . ' bytes');
try
{
if($internal_task->isClosure())
{
// If the task is a closure, we need to run the callback with the closure's return value
// instead of the job result object
$internal_task->runCallback($job_result->getData());
return;
}
$internal_task->runCallback($job_result);
}
catch(Exception $e)
{
Log::error('net.nosial.tamerlib', 'Failed to run callback for task ' . $internal_task->getId() . ': ' . $e->getMessage(), $e);
}
finally
{
$this->removeTask($internal_task);
}
}
/**
* @param string $id
* @return Task|null
*/
private function getTaskById(string $id): ?Task
{
foreach($this->tasks as $task)
{
if($task->getId() === $id)
{
return $task;
}
}
return null;
}
/**
* Removes a task from the list of tasks
*
* @param Task $task
* @return void
*/
private function removeTask(Task $task): void
{
$this->tasks = array_filter($this->tasks, function($item) use ($task)
{
return $item->getId() !== $task->getId();
});
}
/**
* @return bool
*/
public function automaticReconnectionEnabled(): bool
{
return $this->automatic_reconnect;
}
/**
* @param bool $enable
*/
public function enableAutomaticReconnection(bool $enable): void
{
$this->automatic_reconnect = $enable;
}
/**
* Executes all remaining tasks and closes the connection
*/
public function __destruct()
{
try
{
$this->disconnect();
}
catch(Exception $e)
{
unset($e);
}
}
}

View file

@ -0,0 +1,371 @@
<?php
/** @noinspection PhpMissingFieldTypeInspection */
namespace TamerLib\Protocols\Gearman;
use Exception;
use GearmanJob;
use GearmanWorker;
use LogLib\Log;
use Opis\Closure\SerializableClosure;
use TamerLib\Abstracts\JobStatus;
use TamerLib\Exceptions\ConnectionException;
use TamerLib\Interfaces\WorkerProtocolInterface;
use TamerLib\Objects\Job;
use TamerLib\Objects\JobResults;
class Worker implements WorkerProtocolInterface
{
/**
* The Gearman Worker Instance (if connected)
*
* @var GearmanWorker|null
*/
private $worker;
/**
* The list of servers that have been added
*
* @var array
*/
private $defined_servers;
/**
* Indicates if the worker should automatically reconnect to the server
*
* @var bool
*/
private $automatic_reconnect;
/**
* The Unix Timestamp of when the next reconnect should occur (if automatic_reconnect is true)
*
* @var int
*/
private $next_reconnect;
/**
* The options to use when connecting to the server
*
* @var array
*/
private $options;
/**
* Public Constructor with optional username and password
*
* @param string|null $username
* @param string|null $password
*/
public function __construct(?string $username=null, ?string $password=null)
{
$this->worker = null;
$this->defined_servers = [];
$this->automatic_reconnect = false;
$this->next_reconnect = time() + 1800;
$this->options = [];
}
/**
* Adds a server to the list of servers to use
*
* @link http://php.net/manual/en/gearmanworker.addserver.php
* @param string $host (
* @param int $port (default: 4730)
* @return void
*/
public function addServer(string $host, int $port): void
{
if(!isset($this->defined_servers[$host]))
{
$this->defined_servers[$host] = [];
}
if(in_array($port, $this->defined_servers[$host]))
{
return;
}
$this->defined_servers[$host][] = $port;
}
/**
* Adds a list of servers to the list of servers to use
*
* @link http://php.net/manual/en/gearmanworker.addservers.php
* @param string[] $servers (host:port, host:port, ...)
* @return void
*/
public function addServers(array $servers): void
{
foreach($servers as $server)
{
$server = explode(':', $server);
$this->addServer($server[0], (int)$server[1]);
}
}
/**
* Connects to the server
*
* @return void
* @throws ConnectionException
*/
public function connect(): void
{
if($this->isConnected())
return;
$this->worker = new GearmanWorker();
$this->worker->addOptions(GEARMAN_WORKER_GRAB_UNIQ);
foreach($this->defined_servers as $host => $ports)
{
foreach($ports as $port)
{
try
{
$this->worker->addServer($host, $port);
Log::debug('net.nosial.tamerlib', 'connected to gearman server: ' . $host . ':' . $port);
}
catch(Exception $e)
{
throw new ConnectionException('Failed to connect to Gearman server: ' . $host . ':' . $port, 0, $e);
}
}
}
$this->worker->addFunction('tamer_closure', function(GearmanJob $job)
{
$received_job = Job::fromArray(msgpack_unpack($job->workload()));
Log::debug('net.nosial.tamerlib', 'received closure: ' . $received_job->getId());
try
{
/** @var SerializableClosure $closure */
$closure = $received_job->getData();
$result = $closure($received_job);
}
catch(Exception $e)
{
$job->sendFail();
unset($e);
return;
}
$job_results = new JobResults($received_job, JobStatus::Success, $result);
$job->sendComplete(msgpack_pack($job_results->toArray()));
Log::debug('net.nosial.tamerlib', 'completed closure: ' . $received_job->getId());
});
}
/**
* Disconnects from the server
*
* @return void
*/
public function disconnect(): void
{
if(!$this->isConnected())
return;
$this->worker->unregisterAll();
unset($this->worker);
$this->worker = null;
}
/**
* Reconnects to the server if the connection has been lost
*
* @return void
* @throws ConnectionException
*/
public function reconnect(): void
{
$this->disconnect();
$this->connect();
}
/**
* Returns true if the worker is connected to the server
*
* @return bool
*/
public function isConnected(): bool
{
return $this->worker !== null;
}
/**
* The automatic reconnect process
*
* @return void
*/
private function preformAutoreconf(): void
{
if($this->automatic_reconnect && $this->next_reconnect < time())
{
try
{
$this->reconnect();
}
catch (Exception $e)
{
Log::error('net.nosial.tamerlib', 'Failed to reconnect to Gearman server: ' . $e->getMessage());
}
finally
{
$this->next_reconnect = time() + 1800;
}
}
}
/**
* Sets the options to use when connecting to the server
*
* @param array $options
* @return bool
* @inheritDoc
*/
public function setOptions(array $options): void
{
$this->options = $options;
}
/**
* Returns the options to use when connecting to the server
*
* @return array
*/
public function getOptions(): array
{
return $this->options;
}
/**
* Clears the options to use when connecting to the server
*
* @return void
*/
public function clearOptions(): void
{
$this->options = [];
}
/**
* @return bool
*/
public function automaticReconnectionEnabled(): bool
{
return $this->automatic_reconnect;
}
/**
* @param bool $enable
* @return void
*/
public function enableAutomaticReconnection(bool $enable): void
{
$this->automatic_reconnect = $enable;
}
/**
* Adds a function to the list of functions to call
*
* @link http://php.net/manual/en/gearmanworker.addfunction.php
* @param string $name The name of the function to register with the job server
* @param callable $callable The callback function to call when the job is received
* @return void
*/
public function addFunction(string $name, callable $callable): void
{
$this->worker->addFunction($name, function(GearmanJob $job) use ($callable)
{
$received_job = Job::fromArray(msgpack_unpack($job->workload()));
Log::debug('net.nosial.tamerlib', 'received job: ' . $received_job->getId());
try
{
$result = $callable($received_job);
}
catch(Exception $e)
{
$job->sendFail();
unset($e);
return;
}
$job_results = new JobResults($received_job, JobStatus::Success, $result);
$job->sendComplete(msgpack_pack($job_results->toArray()));
Log::debug('net.nosial.tamerlib', 'completed job: ' . $received_job->getId());
});
}
/**
* Removes a function from the list of functions to call
*
* @param string $function_name The name of the function to unregister
* @return void
*/
public function removeFunction(string $function_name): void
{
$this->worker->unregister($function_name);
}
/**
* Waits for a job and calls the appropriate callback function
*
* @link http://php.net/manual/en/gearmanworker.work.php
* @param bool $blocking (default: true) Whether to block until a job is received
* @param int $timeout (default: 500) The timeout in milliseconds (if $blocking is false)
* @param bool $throw_errors (default: false) Whether to throw exceptions on errors
* @return void Returns nothing
* @throws ConnectionException
*/
public function work(bool $blocking=true, int $timeout=500, bool $throw_errors=false): void
{
$this->worker->setTimeout($timeout);
while(true)
{
@$this->preformAutoreconf();
@$this->worker->work();
if($this->worker->returnCode() == GEARMAN_COULD_NOT_CONNECT)
{
throw new ConnectionException('Could not connect to Gearman server');
}
if($this->worker->returnCode() == GEARMAN_TIMEOUT && !$blocking)
{
break;
}
if($this->worker->returnCode() != GEARMAN_SUCCESS && $throw_errors)
{
Log::error('net.nosial.tamerlib', 'Gearman worker error: ' . $this->worker->error());
}
if($blocking)
{
usleep($timeout);
}
}
}
/**
* Executes all remaining tasks and closes the connection
*/
public function __destruct()
{
try
{
$this->disconnect();
}
catch(Exception $e)
{
unset($e);
}
}
}

View file

@ -0,0 +1,401 @@
<?php
/** @noinspection PhpMissingFieldTypeInspection */
namespace TamerLib\Protocols\RabbitMq;
use Closure;
use Exception;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use TamerLib\Abstracts\TaskPriority;
use TamerLib\Exceptions\ServerException;
use TamerLib\Interfaces\ClientProtocolInterface;
use TamerLib\Objects\Job;
use TamerLib\Objects\JobResults;
use TamerLib\Objects\Task;
class Client implements ClientProtocolInterface
{
/**
* @var array
*/
private $server_cache;
/**
* Used for tracking the current execution of tasks and run callbacks on completion
*
* @var Task[]
*/
private $tasks;
/**
* @var bool
*/
private $automatic_reconnect;
/**
* @var int
*/
private $next_reconnect;
/**
* @var string|null
*/
private $username;
/**
* @var string|null
*/
private $password;
/**
* @var array
*/
private $options;
/**
* @var AMQPStreamConnection|null
*/
private $connection;
/**
* @var AMQPChannel|null
*/
private $channel;
/***
* @param string|null $username
* @param string|null $password
*/
public function __construct(?string $username=null, ?string $password=null)
{
$this->tasks = [];
$this->automatic_reconnect = false;
$this->next_reconnect = time() + 1800;
$this->server_cache = [];
$this->options = [];
$this->connection = null;
$this->username = $username;
$this->password = $password;
try
{
$this->reconnect();
}
catch(ServerException $e)
{
unset($e);
}
}
public function setOptions(array $options): bool
{
$this->options = $options;
return true;
}
public function addServer(string $host, int $port): bool
{
if(!isset($this->server_cache[$host]))
{
$this->server_cache[$host] = [];
}
if(in_array($port, $this->server_cache[$host]))
{
return true;
}
$this->server_cache[$host][] = $port;
$this->reconnect();
return true;
}
/**
* Adds a list of servers to the list of servers to use
*
* @param array $servers (host:port, host:port, ...)
* @return bool
*/
public function addServers(array $servers): bool
{
foreach($servers as $server)
{
$server = explode(':', $server);
$this->addServer($server[0], $server[1]);
}
return true;
}
/**
* Calculates the priority for a task based on the priority level
*
* @param int $priority
* @return int
*/
private static function calculatePriority(int $priority): int
{
if($priority < TaskPriority::Low)
return 0;
if($priority > TaskPriority::High)
return 255;
return (int) round(($priority / TaskPriority::High) * 255);
}
/**
* @param Task $task
* @return void
*/
public function do(Task $task): void
{
$job = new Job($task);
$message = new AMQPMessage(msgpack_pack($job->toArray()), [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'priority' => self::calculatePriority($task->getPriority()),
]);
$this->channel->basic_publish($message, '', 'tamer_queue');
}
/**
* Executes a closure in the background
*
* @param Closure $closure
* @return void
*/
public function doClosure(Closure $closure): void
{
$closure_task = new Task('tamer_closure', $closure);
$closure_task->setClosure(true);
$this->do($closure_task);
}
/**
* Queues a task to be executed
*
* @param Task $task
* @return void
*/
public function queue(Task $task): void
{
$this->tasks[] = $task;
}
/**
* Adds a closure task to the list of tasks to run
*
* @param Closure $closure
* @param $callback
* @return void
*/
public function queueClosure(Closure $closure, $callback): void
{
$closure_task = new Task('tamer_closure', $closure, $callback);
$closure_task->setClosure(true);
$this->queue($closure_task);
}
/**
* Executes all the tasks that has been added
*
* @return bool
*/
public function run(): bool
{
if(count($this->tasks) === 0)
return false;
$correlationIds = [];
/** @var Task $task */
foreach($this->tasks as $task)
{
$correlationIds[] = $task->getId();
$job = new Job($task);
$message = new AMQPMessage(msgpack_pack($job->toArray()), [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'correlation_id' => $task->getId(),
'reply_to' => 'tamer_queue',
'priority' => self::calculatePriority($task->getPriority()),
]);
$this->channel->basic_publish($message, '', 'tamer_queue');
}
// Register callback for each task
$callback = function($msg) use (&$correlationIds)
{
$job_result = JobResults::fromArray(msgpack_unpack($msg->body));
$task = $this->getTaskById($job_result->getId());
try
{
$task->runCallback($job_result);
}
catch(Exception $e)
{
echo $e->getMessage();
}
// Remove the processed correlation_id
$index = array_search($msg->get('correlation_id'), $correlationIds);
if ($index !== false) {
unset($correlationIds[$index]);
}
$this->channel->basic_ack($msg->delivery_info['delivery_tag']);
// Stop consuming when all tasks are processed
if(count($correlationIds) === 0)
{
$this->channel->basic_cancel($msg->delivery_info['consumer_tag']);
}
};
$this->channel->basic_consume('tamer_queue', '', false, false, false, false, $callback);
// Start consuming messages
while(count($this->channel->callbacks))
{
$this->channel->wait();
}
return true;
}
/**
* @param string $id
* @return Task|null
*/
private function getTaskById(string $id): ?Task
{
foreach($this->tasks as $task)
{
if($task->getId() === $id)
{
return $task;
}
}
return null;
}
/**
* Returns True if the client is automatically reconnecting to the server
*
* @return bool
*/
public function automaticReconnectionEnabled(): bool
{
return $this->automatic_reconnect;
}
/**
* Enables or disables automatic reconnecting to the server
*
* @param bool $enable
* @return void
*/
public function enableAutomaticReconnection(bool $enable): void
{
$this->automatic_reconnect = $enable;
}
private function reconnect()
{
$connections = [];
if(count($this->server_cache) === 0)
return;
foreach($this->server_cache as $host => $ports)
{
foreach($ports as $port)
{
$host = [
'host' => $host,
'port' => $port
];
if($this->username !== null)
$host['username'] = $this->username;
if($this->password !== null)
$host['password'] = $this->password;
$connections[] = $host;
}
}
// Can only connect to one server for now, so we'll just use the first one
$selected_connection = $connections[0];
$this->disconnect();
$this->connection = new AMQPStreamConnection(
$selected_connection['host'],
$selected_connection['port'],
$selected_connection['username'] ?? null,
$selected_connection['password'] ?? null
);
$this->channel = $this->connection->channel();
$this->channel->queue_declare('tamer_queue', false, true, false, false);
}
/**
* Disconnects from the server
*
* @return void
*/
private function disconnect()
{
try
{
if(!is_null($this->channel))
{
$this->channel->close();
}
}
catch(Exception $e)
{
unset($e);
}
finally
{
$this->channel = null;
}
try
{
if(!is_null($this->connection))
{
$this->connection->close();
}
}
catch(Exception $e)
{
unset($e);
}
finally
{
$this->connection = null;
}
}
/**
* Disconnects from the server when the object is destroyed
*/
public function __destruct()
{
$this->disconnect();
}
}

View file

@ -0,0 +1,324 @@
<?php
/** @noinspection PhpMissingFieldTypeInspection */
namespace TamerLib\Protocols\RabbitMq;
use Exception;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use TamerLib\Abstracts\JobStatus;
use TamerLib\Interfaces\WorkerProtocolInterface;
use TamerLib\Objects\Job;
use TamerLib\Objects\JobResults;
class Worker implements WorkerProtocolInterface
{
/**
* @var array
*/
private $server_cache;
/**
* @var false
*/
private $automatic_reconnect;
/**
* @var int
*/
private $next_reconnect;
/**
* @var array
*/
private $functions;
/**
* @var string|null
*/
private $username;
/**
* @var string|null
*/
private $password;
/**
* @var AMQPStreamConnection|null
*/
private $connection;
/**
* @var AMQPChannel|null
*/
private $channel;
/**
* @var array
*/
private $options;
/**
* @inheritDoc
*/
public function __construct(?string $username = null, ?string $password = null)
{
$this->server_cache = [];
$this->functions = [];
$this->automatic_reconnect = false;
$this->next_reconnect = time() + 1800;
$this->username = $username;
$this->password = $password;
try
{
$this->reconnect();
}
catch(Exception $e)
{
unset($e);
}
}
/**
* @inheritDoc
*/
public function addServer(string $host, int $port): bool
{
if(!isset($this->server_cache[$host]))
{
$this->server_cache[$host] = [];
}
if(in_array($port, $this->server_cache[$host]))
{
return true;
}
$this->server_cache[$host][] = $port;
$this->reconnect();
return true;
}
/**
* @inheritDoc
*/
public function addServers(array $servers): void
{
foreach($servers as $server)
{
$server = explode(':', $server);
$this->addServer($server[0], $server[1]);
}
}
/**
* @inheritDoc
*/
public function setOptions(array $options): bool
{
$this->options = $options;
return true;
}
/**
* @inheritDoc
*/
public function automaticReconnectionEnabled(): bool
{
return $this->automatic_reconnect;
}
/**
* @inheritDoc
*/
public function enableAutomaticReconnection(bool $enable): void
{
$this->automatic_reconnect = $enable;
}
/**
* @inheritDoc
*/
public function addFunction(string $name, callable $callable, mixed $context = null): void
{
$this->functions[$name] = [
'function' => $callable,
'context' => $context
];
}
/**
* @inheritDoc
*/
public function removeFunction(string $function_name): void
{
unset($this->functions[$function_name]);
}
/**
* @inheritDoc
*/
public function work(bool $blocking = true, int $timeout = 500, bool $throw_errors = false): void
{
$callback = function($message) use ($throw_errors)
{
var_dump($message->body);
$job = Job::fromArray(msgpack_unpack($message->body));
$job_results = new JobResults($job, JobStatus::Success, 'Hello from worker!');
try
{
// Return $job_results
$this->channel->basic_publish(
new AMQPMessage(
msgpack_pack($job_results->toArray()),
[
'correlation_id' => $job->getId()
]
)
);
$this->channel->basic_ack($message->delivery_info['delivery_tag']);
}
catch (Exception $e)
{
if ($throw_errors)
{
throw $e;
}
$job_results = new JobResults($job, JobStatus::Exception, $e->getMessage());
// Return $job_results
$this->channel->basic_publish(
new AMQPMessage(
msgpack_pack($job_results->toArray()),
[
'correlation_id' => $job->getId()
]
)
);
$this->channel->basic_ack($message->delivery_info['delivery_tag']);
}
};
$this->channel->basic_consume('tamer_queue', '', false, false, false, false, $callback);
if ($blocking)
{
while(true)
{
$this->channel->wait();
}
}
else
{
$start = microtime(true);
while (true)
{
if (microtime(true) - $start >= $timeout / 1000)
{
break;
}
$this->channel->wait();
}
}
}
private function reconnect()
{
$connections = [];
if(count($this->server_cache) === 0)
return;
foreach($this->server_cache as $host => $ports)
{
foreach($ports as $port)
{
$host = [
'host' => $host,
'port' => $port
];
if($this->username !== null)
$host['username'] = $this->username;
if($this->password !== null)
$host['password'] = $this->password;
$connections[] = $host;
}
}
// Can only connect to one server for now, so we'll just use the first one
$selected_connection = $connections[0];
$this->disconnect();
$this->connection = new AMQPStreamConnection(
$selected_connection['host'],
$selected_connection['port'],
$selected_connection['username'] ?? null,
$selected_connection['password'] ?? null
);
$this->channel = $this->connection->channel();
$this->channel->queue_declare('tamer_queue', false, true, false, false);
}
/**
* Disconnects from the server
*
* @return void
*/
private function disconnect()
{
try
{
if(!is_null($this->channel))
{
$this->channel->close();
}
}
catch(Exception $e)
{
unset($e);
}
finally
{
$this->channel = null;
}
try
{
if(!is_null($this->connection))
{
$this->connection->close();
}
}
catch(Exception $e)
{
unset($e);
}
finally
{
$this->connection = null;
}
}
/**
* Disconnects from the server when the object is destroyed
*/
public function __destruct()
{
$this->disconnect();
}
}

427
src/TamerLib/Tamer.php Normal file
View file

@ -0,0 +1,427 @@
<?php
/** @noinspection PhpMissingFieldTypeInspection */
namespace TamerLib;
use Closure;
use Exception;
use InvalidArgumentException;
use TamerLib\Abstracts\Mode;
use TamerLib\Classes\Functions;
use TamerLib\Classes\Supervisor;
use TamerLib\Classes\Validate;
use TamerLib\Exceptions\ConnectionException;
use TamerLib\Exceptions\UnsupervisedWorkerException;
use TamerLib\Interfaces\ClientProtocolInterface;
use TamerLib\Interfaces\WorkerProtocolInterface;
use TamerLib\Objects\Task;
class Tamer
{
/**
* The protocol to use when connecting to the server
*
* @var string
*/
private static $protocol;
/**
* The protocol to use when connecting to the server as a client
*
* @var ClientProtocolInterface|null
*/
private static $client;
/**
* The protocol to use when connecting to the server as a worker
*
* @var WorkerProtocolInterface|null
*/
private static $worker;
/**
* Indicates if Tamer is running as a client or worker
*
* @var string
* @see Mode
*/
private static $mode;
/**
* Indicates if Tamer is connected to the server
*
* @var bool
*/
private static $connected;
/**
* The supervisor that is supervising the workers
*
* @var Supervisor
*/
private static $supervisor;
/**
* Initializes Tamer as a client and connects to the server
*
* @param string $protocol
* @param array $servers
* @param string|null $username
* @param string|null $password
* @return void
* @throws ConnectionException
*/
public static function init(string $protocol, array $servers, ?string $username=null, ?string $password=null): void
{
if(self::$connected)
{
throw new ConnectionException('Tamer is already connected to the server');
}
if (!Validate::protocolType($protocol))
{
throw new InvalidArgumentException(sprintf('Invalid protocol type: %s', $protocol));
}
self::$protocol = $protocol;
self::$mode = Mode::Client;
self::$client = Functions::createClient($protocol, $username, $password);
self::$client->addServers($servers);
self::$client->connect();
self::$supervisor = new Supervisor($protocol, $servers, $username, $password);
self::$connected = true;
}
/**
* Initializes Tamer as a worker client and connects to the server
*
* @return void
* @throws ConnectionException
* @throws UnsupervisedWorkerException
*/
public static function initWorker(): void
{
if(self::$connected)
{
throw new ConnectionException('Tamer is already connected to the server');
}
if(!Functions::getWorkerVariables()['TAMER_ENABLED'])
{
throw new UnsupervisedWorkerException('Tamer is not enabled for this worker');
}
self::$protocol = Functions::getWorkerVariables()['TAMER_PROTOCOL'];
self::$mode = Mode::Worker;
self::$worker = Functions::createWorker(self::$protocol);
self::$worker->addServers(Functions::getWorkerVariables()['TAMER_SERVERS']);
self::$worker->connect();
self::$connected = true;
}
/**
* Disconnects from the server
*
* @return void
* @throws ConnectionException
*/
public static function disconnect(): void
{
if (!self::$connected)
{
throw new ConnectionException('Tamer is not connected to the server');
}
if (self::$mode === Mode::Client)
{
self::$client->disconnect();
}
else
{
self::$worker->disconnect();
}
self::$connected = false;
}
/**
* Reconnects to the server
*
* @return void
* @throws ConnectionException
*/
public static function reconnect(): void
{
if (self::$mode === Mode::Client)
{
self::$client->reconnect();
}
else
{
self::$worker->reconnect();
}
}
/**
* Adds a task to the queue to be executed by the worker
*
* @param Task $task
* @return void
*/
public static function do(Task $task): void
{
if (self::$mode === Mode::Client)
{
self::$client->do($task);
}
else
{
throw new InvalidArgumentException('Tamer is not running in client mode');
}
}
/**
* Executes a closure operation in the background (does not return a result)
*
* @param Closure $closure The closure operation to perform (remote)
* @return void
*/
public static function doClosure(Closure $closure): void
{
if (self::$mode === Mode::Client)
{
self::$client->doClosure($closure);
}
else
{
throw new InvalidArgumentException('Tamer is not running in client mode');
}
}
/**
* Queues a task to be processed in parallel (returns a result handled by a callback)
*
* @param Task $task
* @return void
*/
public static function queue(Task $task): void
{
if (self::$mode === Mode::Client)
{
self::$client->queue($task);
}
else
{
throw new InvalidArgumentException('Tamer is not running in client mode');
}
}
/**
* Queues a closure to be processed in parallel (returns a result handled by a callback)
*
* @param Closure $closure The closure operation to perform (remote)
* @param Closure|null $callback The closure to call when the operation is complete (local)
* @return void
*/
public static function queueClosure(Closure $closure, ?Closure $callback=null): void
{
if (self::$mode === Mode::Client)
{
self::$client->queueClosure($closure, $callback);
}
else
{
throw new InvalidArgumentException('Tamer is not running in client mode');
}
}
/**
* Executes all tasks in the queue and waits for them to complete
*
* @return bool
*/
public static function run(): bool
{
if (self::$mode === Mode::Client)
{
return self::$client->run();
}
else
{
throw new InvalidArgumentException('Tamer is not running in client mode');
}
}
/**
* Registers a function to the worker
*
* @param string $name The name of the function to add
* @param callable $callable The function to add
* @return void
*/
public static function addFunction(string $name, callable $callable): void
{
if (self::$mode === Mode::Worker)
{
self::$worker->addFunction($name, $callable);
}
else
{
throw new InvalidArgumentException('Tamer is not running in worker mode');
}
}
/**
* Removes a function from the worker
*
* @param string $function_name The name of the function to remove
* @return void
*/
public static function removeFunction(string $function_name): void
{
if (self::$mode === Mode::Worker)
{
self::$worker->removeFunction($function_name);
}
else
{
throw new InvalidArgumentException('Tamer is not running in worker mode');
}
}
/**
* Works a job from the queue (blocking or non-blocking)
*
* @param bool $blocking (optional) Whether to block until a job is available
* @param int $timeout (optional) The timeout to use when blocking
* @param bool $throw_errors (optional) Whether to throw errors or not
* @return void
*/
public static function work(bool $blocking=true, int $timeout=500, bool $throw_errors=false): void
{
if (self::$mode === Mode::Worker)
{
self::$worker->work($blocking, $timeout, $throw_errors);
}
else
{
throw new InvalidArgumentException('Tamer is not running in worker mode');
}
}
/**
* Adds a worker to the supervisor
*
* @param string $target
* @param int $instances
* @return void
* @throws Exception
*/
public static function addWorker(string $target, int $instances): void
{
if (self::$mode === Mode::Client)
{
self::$supervisor->addWorker($target, $instances);
}
else
{
throw new InvalidArgumentException('Tamer is not running in client mode');
}
}
/**
* Starts all workers
*
* @return void
* @throws Exception
*/
public static function startWorkers(): void
{
if (self::$mode === Mode::Client)
{
self::$supervisor->start();
}
else
{
throw new InvalidArgumentException('Tamer is not running in client mode');
}
}
/**
* Stops all workers
*
* @return void
* @throws Exception
*/
public static function stopWorkers(): void
{
if (self::$mode === Mode::Client)
{
self::$supervisor->stop();
}
else
{
throw new InvalidArgumentException('Tamer is not running in client mode');
}
}
/**
* Restarts all workers
*
* @return void
* @throws Exception
*/
public static function restartWorkers(): void
{
if (self::$mode === Mode::Client)
{
self::$supervisor->restart();
}
else
{
throw new InvalidArgumentException('Tamer is not running in client mode');
}
}
/**
* @return string
*/
public static function getProtocol(): string
{
return self::$protocol;
}
/**
* @return ClientProtocolInterface|null
*/
public static function getClient(): ?ClientProtocolInterface
{
return self::$client;
}
/**
* @return WorkerProtocolInterface|null
*/
public static function getWorker(): ?WorkerProtocolInterface
{
return self::$worker;
}
/**
* @return string
*/
public static function getMode(): string
{
return self::$mode;
}
/**
* @return bool
*/
public static function isConnected(): bool
{
return self::$connected;
}
}