Implemented Redis Server controller
This commit is contained in:
parent
40bb5e29d0
commit
0c23fdfac2
34 changed files with 226 additions and 3955 deletions
5
.idea/php.xml
generated
5
.idea/php.xml
generated
|
@ -15,11 +15,8 @@
|
|||
<path value="/etc/ncc" />
|
||||
<path value="/var/ncc/packages/net.nosial.optslib=1.0.0" />
|
||||
<path value="/var/ncc/packages/net.nosial.loglib=1.0.1" />
|
||||
<path value="/var/ncc/packages/com.phpseclib.phpseclib=3.0.18" />
|
||||
<path value="/var/ncc/packages/com.php_amqplib.php_amqplib=3.5.1" />
|
||||
<path value="/var/ncc/packages/com.paragonie.random_compat=9.99.100" />
|
||||
<path value="/var/ncc/packages/com.paragonie.constant_time_encoding=2.6.3" />
|
||||
<path value="/var/ncc/packages/com.opis.closure=3.6.3" />
|
||||
<path value="/var/ncc/packages/com.symfony.process=6.2.10" />
|
||||
</include_path>
|
||||
</component>
|
||||
<component name="PhpProjectSharedConfiguration" php_language_level="8.1" />
|
||||
|
|
|
@ -47,12 +47,6 @@
|
|||
"source_type": "remote",
|
||||
"source": "opis/closure=latest@composer"
|
||||
},
|
||||
{
|
||||
"name": "com.php_amqplib.php_amqplib",
|
||||
"version": "latest",
|
||||
"source_type": "remote",
|
||||
"source": "php-amqplib/php-amqplib=latest@composer"
|
||||
},
|
||||
{
|
||||
"name": "com.symfony.process",
|
||||
"version": "latest",
|
||||
|
|
|
@ -1,16 +0,0 @@
|
|||
<?php
|
||||
|
||||
namespace TamerLib\Abstracts\ExitCodes;
|
||||
|
||||
abstract class WorkerExitCodes
|
||||
{
|
||||
const GracefulShutdown = 0;
|
||||
|
||||
const Exception = 1;
|
||||
|
||||
const UnsupervisedWorker = 2;
|
||||
|
||||
const ProtocolUnavailable = 3;
|
||||
|
||||
const ServerConnectionFailed = 4;
|
||||
}
|
|
@ -1,12 +0,0 @@
|
|||
<?php
|
||||
|
||||
namespace TamerLib\Abstracts;
|
||||
|
||||
abstract class JobStatus
|
||||
{
|
||||
const Success = 0;
|
||||
|
||||
const Failure = 1;
|
||||
|
||||
const Exception = 2;
|
||||
}
|
|
@ -1,10 +0,0 @@
|
|||
<?php
|
||||
|
||||
namespace TamerLib\Abstracts;
|
||||
|
||||
abstract class Mode
|
||||
{
|
||||
const Client = 'client';
|
||||
|
||||
const Worker = 'worker';
|
||||
}
|
|
@ -1,12 +0,0 @@
|
|||
<?php
|
||||
|
||||
namespace TamerLib\Abstracts;
|
||||
|
||||
abstract class ObjectType
|
||||
{
|
||||
const Job = 'tamer_job';
|
||||
|
||||
const JobResults = 'tamer_job_results';
|
||||
|
||||
const Unknown = 'unknown';
|
||||
}
|
|
@ -1,10 +0,0 @@
|
|||
<?php
|
||||
|
||||
namespace TamerLib\Abstracts;
|
||||
|
||||
abstract class ProtocolType
|
||||
{
|
||||
const Gearman = 'gearman';
|
||||
|
||||
const RabbitMQ = 'rabbitmq';
|
||||
}
|
|
@ -1,12 +0,0 @@
|
|||
<?php
|
||||
|
||||
namespace TamerLib\Abstracts;
|
||||
|
||||
abstract class TaskPriority
|
||||
{
|
||||
const Low = 0;
|
||||
|
||||
const Normal = 1;
|
||||
|
||||
const High = 2;
|
||||
}
|
|
@ -1,149 +0,0 @@
|
|||
<?php
|
||||
|
||||
/** @noinspection PhpMissingFieldTypeInspection */
|
||||
|
||||
namespace TamerLib\Classes;
|
||||
|
||||
use Exception;
|
||||
use InvalidArgumentException;
|
||||
use OptsLib\Parse;
|
||||
use Symfony\Component\Process\PhpExecutableFinder;
|
||||
use TamerLib\Abstracts\ProtocolType;
|
||||
use TamerLib\Abstracts\TaskPriority;
|
||||
use TamerLib\Interfaces\ClientProtocolInterface;
|
||||
use TamerLib\Interfaces\WorkerProtocolInterface;
|
||||
class Functions
|
||||
{
|
||||
/**
|
||||
* A cache of the worker variables
|
||||
*
|
||||
* @var array|null
|
||||
*/
|
||||
private static $worker_variables;
|
||||
|
||||
/**
|
||||
* A cache of the php binary path
|
||||
*
|
||||
* @var string|null
|
||||
*/
|
||||
private static $php_bin;
|
||||
|
||||
/**
|
||||
* Attempts to get the worker id from the command line arguments or the environment variable TAMER_WORKER_ID
|
||||
* If neither are set, returns null.
|
||||
*
|
||||
* @return string|null
|
||||
*/
|
||||
public static function getWorkerId(): ?string
|
||||
{
|
||||
$options = Parse::getArguments();
|
||||
|
||||
$worker_id = ($options['worker-id'] ?? null);
|
||||
if($worker_id !== null)
|
||||
return $worker_id;
|
||||
|
||||
$worker_id = getenv('TAMER_WORKER_ID');
|
||||
if($worker_id !== false)
|
||||
return $worker_id;
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs a client protocol object based on the protocol type
|
||||
*
|
||||
* @param string $protocol
|
||||
* @param string|null $username
|
||||
* @param string|null $password
|
||||
* @return ClientProtocolInterface
|
||||
*/
|
||||
public static function createClient(string $protocol, ?string $username=null, ?string $password=null): ClientProtocolInterface
|
||||
{
|
||||
/** @noinspection PhpFullyQualifiedNameUsageInspection */
|
||||
return match (strtolower($protocol))
|
||||
{
|
||||
ProtocolType::Gearman => new \TamerLib\Protocols\Gearman\Client($username, $password),
|
||||
ProtocolType::RabbitMQ => throw new InvalidArgumentException('RabbitMQ is not fully implemented yet'),
|
||||
default => throw new InvalidArgumentException('Invalid protocol type'),
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $protocol
|
||||
* @param string|null $username
|
||||
* @param string|null $password
|
||||
* @return WorkerProtocolInterface
|
||||
*/
|
||||
public static function createWorker(string $protocol, ?string $username=null, ?string $password=null): WorkerProtocolInterface
|
||||
{
|
||||
/** @noinspection PhpFullyQualifiedNameUsageInspection */
|
||||
return match (strtolower($protocol))
|
||||
{
|
||||
ProtocolType::Gearman => new \TamerLib\Protocols\Gearman\Worker($username, $password),
|
||||
ProtocolType::RabbitMQ => throw new InvalidArgumentException('RabbitMQ is not fully implemented yet'),
|
||||
default => throw new InvalidArgumentException('Invalid protocol type'),
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the worker variables from the environment variables
|
||||
*
|
||||
* @return array
|
||||
*/
|
||||
public static function getWorkerVariables(): array
|
||||
{
|
||||
if(self::$worker_variables == null)
|
||||
{
|
||||
self::$worker_variables = [
|
||||
'TAMER_ENABLED' => getenv('TAMER_ENABLED') === 'true',
|
||||
'TAMER_PROTOCOL' => getenv('TAMER_PROTOCOL'),
|
||||
'TAMER_SERVERS' => getenv('TAMER_SERVERS'),
|
||||
'TAMER_USERNAME' => getenv('TAMER_USERNAME'),
|
||||
'TAMER_PASSWORD' => getenv('TAMER_PASSWORD'),
|
||||
'TAMER_INSTANCE_ID' => getenv('TAMER_INSTANCE_ID'),
|
||||
];
|
||||
|
||||
if(self::$worker_variables['TAMER_SERVERS'] !== false)
|
||||
self::$worker_variables['TAMER_SERVERS'] = explode(',', self::$worker_variables['TAMER_SERVERS']);
|
||||
}
|
||||
|
||||
return self::$worker_variables;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the path to the php binary
|
||||
*
|
||||
* @return string
|
||||
* @throws Exception
|
||||
*/
|
||||
public static function findPhpBin(): string
|
||||
{
|
||||
if(self::$php_bin !== null)
|
||||
return self::$php_bin;
|
||||
|
||||
$php_finder = new PhpExecutableFinder();
|
||||
$php_bin = $php_finder->find();
|
||||
if($php_bin === false)
|
||||
throw new Exception('Unable to find the php binary');
|
||||
|
||||
self::$php_bin = $php_bin;
|
||||
return $php_bin;
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculates the priority for a task based on the priority level
|
||||
*
|
||||
* @param int $priority
|
||||
* @return int
|
||||
*/
|
||||
public static function calculatePriority(int $priority): int
|
||||
{
|
||||
if($priority < TaskPriority::Low)
|
||||
return 0;
|
||||
|
||||
if($priority > TaskPriority::High)
|
||||
return 255;
|
||||
|
||||
return (int) round(($priority / TaskPriority::High) * 255);
|
||||
}
|
||||
}
|
155
src/TamerLib/Classes/RedisServer.php
Normal file
155
src/TamerLib/Classes/RedisServer.php
Normal file
|
@ -0,0 +1,155 @@
|
|||
<?php
|
||||
|
||||
/** @noinspection PhpMissingFieldTypeInspection */
|
||||
|
||||
namespace TamerLib\Classes;
|
||||
|
||||
use LogLib\Log;
|
||||
use Redis;
|
||||
use RedisException;
|
||||
use Symfony\Component\Process\Process;
|
||||
use TamerLib\Exceptions\NoAvailablePortException;
|
||||
use TamerLib\Exceptions\RedisServerException;
|
||||
|
||||
class RedisServer
|
||||
{
|
||||
/**
|
||||
* @var string
|
||||
*/
|
||||
private $cmd;
|
||||
|
||||
/**
|
||||
* @var string
|
||||
*/
|
||||
private $host;
|
||||
|
||||
/**
|
||||
* @var int|null
|
||||
*/
|
||||
private $port;
|
||||
|
||||
/**
|
||||
* @var Process|null
|
||||
*/
|
||||
private $server_process;
|
||||
|
||||
/**
|
||||
* RedisServer constructor.
|
||||
*
|
||||
* @param string $cmd
|
||||
* @param string $host
|
||||
* @param int|null $port
|
||||
* @throws NoAvailablePortException
|
||||
*/
|
||||
public function __construct(string $cmd='redis-server', string $host='127.0.0.1', ?int $port=null)
|
||||
{
|
||||
if (is_null($port))
|
||||
{
|
||||
$port = Utilities::getAvailablePort();
|
||||
Log::debug('net.nosial.tamerlib', 'Selected port ' . $port . '.');
|
||||
}
|
||||
|
||||
$this->cmd = $cmd;
|
||||
$this->host = $host;
|
||||
$this->port = $port;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the port that the Redis server is listening on.
|
||||
*
|
||||
* @return int|null
|
||||
*/
|
||||
public function getPort(): ?int
|
||||
{
|
||||
return $this->port;
|
||||
}
|
||||
|
||||
/**
|
||||
* Determines if the Redis server is running.
|
||||
*
|
||||
* @return bool
|
||||
*/
|
||||
public function isRunning(): bool
|
||||
{
|
||||
if(is_null($this->server_process))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
return $this->server_process->isRunning();
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts the Redis server.
|
||||
*
|
||||
* @param int $timeout
|
||||
* @return bool
|
||||
* @throws RedisServerException
|
||||
* @throws RedisException
|
||||
*/
|
||||
public function start(int $timeout=60): bool
|
||||
{
|
||||
if($this->isRunning())
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
Log::verbose('net.nosial.tamerlib', 'Starting Redis server on port ' . $this->port . '.');
|
||||
$this->server_process = new Process([$this->cmd, '--port', $this->port]);
|
||||
$this->server_process->start();
|
||||
|
||||
// Use a redis client and ping the server until it responds.
|
||||
$redis_client = new Redis();
|
||||
$timeout_counter = 0;
|
||||
|
||||
while(!$redis_client->isConnected())
|
||||
{
|
||||
if($timeout_counter >= $timeout)
|
||||
{
|
||||
throw new RedisServerException('Redis server failed to start within ' . $timeout . ' seconds.');
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
$redis_client->connect($this->host, $this->port);
|
||||
}
|
||||
catch (RedisException $e)
|
||||
{
|
||||
// Do nothing.
|
||||
}
|
||||
finally
|
||||
{
|
||||
sleep(1);
|
||||
$timeout_counter++;
|
||||
}
|
||||
}
|
||||
|
||||
Log::verbose('net.nosial.tamerlib', 'Redis server started.');
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Stops the Redis server.
|
||||
*
|
||||
* @return bool
|
||||
*/
|
||||
public function stop(): bool
|
||||
{
|
||||
if(!$this->isRunning())
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
$this->server_process->stop();
|
||||
Log::verbose('net.nosial.tamerlib', 'Redis server stopped.');
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Terminates the Redis server.
|
||||
*/
|
||||
public function __destruct()
|
||||
{
|
||||
$this->stop();
|
||||
}
|
||||
}
|
|
@ -1,194 +0,0 @@
|
|||
<?php
|
||||
|
||||
/** @noinspection PhpMissingFieldTypeInspection */
|
||||
|
||||
namespace TamerLib\Classes;
|
||||
|
||||
use Exception;
|
||||
use LogLib\Log;
|
||||
use Symfony\Component\Process\Process;
|
||||
use TamerLib\Objects\WorkerInstance;
|
||||
|
||||
class Supervisor
|
||||
{
|
||||
/**
|
||||
* A list of all the workers that are initialized
|
||||
*
|
||||
* @var WorkerInstance[]
|
||||
*/
|
||||
private $workers;
|
||||
|
||||
/**
|
||||
* The protocol to pass to the worker instances
|
||||
*
|
||||
* @var string
|
||||
*/
|
||||
private $protocol;
|
||||
|
||||
/**
|
||||
* The list of servers to pass to the worker instances (eg; host:port)
|
||||
*
|
||||
* @var string[]
|
||||
*/
|
||||
private $servers;
|
||||
|
||||
/**
|
||||
* (Optional) The username to pass to the worker instances
|
||||
*
|
||||
* @var string|null
|
||||
*/
|
||||
private $username;
|
||||
|
||||
/**
|
||||
* (Optional) The password to pass to the worker instances
|
||||
*
|
||||
* @var string|null
|
||||
*/
|
||||
private $password;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public function __construct(string $protocol, array $servers, ?string $username = null, ?string $password = null)
|
||||
{
|
||||
$this->workers = [];
|
||||
$this->protocol = $protocol;
|
||||
$this->servers = $servers;
|
||||
$this->username = $username;
|
||||
$this->password = $password;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a worker to the supervisor instance
|
||||
*
|
||||
* @param string $target
|
||||
* @param int $instances
|
||||
* @return void
|
||||
* @throws Exception
|
||||
*/
|
||||
public function addWorker(string $target, int $instances): void
|
||||
{
|
||||
for ($i = 0; $i < $instances; $i++)
|
||||
{
|
||||
$this->workers[] = new WorkerInstance($target, $this->protocol, $this->servers, $this->username, $this->password);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts all the workers
|
||||
*
|
||||
* @return void
|
||||
* @throws Exception
|
||||
*/
|
||||
public function start(): void
|
||||
{
|
||||
/** @var WorkerInstance $worker */
|
||||
foreach ($this->workers as $worker)
|
||||
{
|
||||
$worker->start();
|
||||
}
|
||||
|
||||
// Ensure that all the workers are running
|
||||
foreach($this->workers as $worker)
|
||||
{
|
||||
if (!$worker->isRunning())
|
||||
{
|
||||
throw new Exception("Worker {$worker->getId()} is not running");
|
||||
}
|
||||
|
||||
while(true)
|
||||
{
|
||||
switch($worker->getProcess()->getStatus())
|
||||
{
|
||||
case Process::STATUS_STARTED:
|
||||
Log::debug('net.nosial.tamerlib', "worker {$worker->getId()} is running");
|
||||
break 2;
|
||||
|
||||
case Process::STATUS_TERMINATED:
|
||||
throw new Exception("Worker {$worker->getId()} has terminated");
|
||||
|
||||
default:
|
||||
echo "Worker {$worker->getId()} is {$worker->getProcess()->getStatus()}" . PHP_EOL;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Stops all the workers
|
||||
*
|
||||
* @return void
|
||||
* @throws Exception
|
||||
*/
|
||||
public function stop(): void
|
||||
{
|
||||
/** @var WorkerInstance $worker */
|
||||
foreach ($this->workers as $worker)
|
||||
{
|
||||
$worker->stop();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Restarts all the workers
|
||||
*
|
||||
* @return void
|
||||
* @throws Exception
|
||||
*/
|
||||
public function restart(): void
|
||||
{
|
||||
/** @var WorkerInstance $worker */
|
||||
foreach ($this->workers as $worker)
|
||||
{
|
||||
$worker->stop();
|
||||
$worker->start();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Monitors all the workers and restarts them if they are not running
|
||||
*
|
||||
* @param bool $blocking
|
||||
* @param bool $auto_restart
|
||||
* @return void
|
||||
* @throws Exception
|
||||
*/
|
||||
public function monitor(bool $blocking=false, bool $auto_restart=true): void
|
||||
{
|
||||
while(true)
|
||||
{
|
||||
/** @var WorkerInstance $worker */
|
||||
foreach ($this->workers as $worker)
|
||||
{
|
||||
if (!$worker->isRunning())
|
||||
{
|
||||
if ($auto_restart)
|
||||
{
|
||||
Log::warning('net.nosial.tamerlib', "worker {$worker->getId()} is not running, restarting");
|
||||
$worker->start();
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new Exception("Worker {$worker->getId()} is not running");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!$blocking)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
sleep(1);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws Exception
|
||||
*/
|
||||
public function __destruct()
|
||||
{
|
||||
$this->stop();
|
||||
}
|
||||
|
||||
}
|
38
src/TamerLib/Classes/Utilities.php
Normal file
38
src/TamerLib/Classes/Utilities.php
Normal file
|
@ -0,0 +1,38 @@
|
|||
<?php
|
||||
|
||||
namespace TamerLib\Classes;
|
||||
|
||||
use TamerLib\Exceptions\NoAvailablePortException;
|
||||
|
||||
class Utilities
|
||||
{
|
||||
/**
|
||||
* Attempts to find an available port in the given range. If no port is available,
|
||||
* a NoAvailablePortException is thrown.
|
||||
*
|
||||
* @param string $host
|
||||
* @param int $start
|
||||
* @param int $end
|
||||
* @return int
|
||||
* @throws NoAvailablePortException
|
||||
*/
|
||||
public static function getAvailablePort(string $host='127.0.0.1', int $start=1024, int $end=65535): int
|
||||
{
|
||||
$range = range($start, $end);
|
||||
shuffle($range);
|
||||
foreach ($range as $port)
|
||||
{
|
||||
$connection = @stream_socket_client('tcp://' . $host . ':' . $port);
|
||||
if (is_resource($connection))
|
||||
{
|
||||
fclose($connection);
|
||||
}
|
||||
else
|
||||
{
|
||||
return $port;
|
||||
}
|
||||
}
|
||||
|
||||
throw new NoAvailablePortException('No available port found in range ' . $start . ' to ' . $end . '.');
|
||||
}
|
||||
}
|
|
@ -1,81 +0,0 @@
|
|||
<?php
|
||||
|
||||
namespace TamerLib\Classes;
|
||||
|
||||
use TamerLib\Abstracts\Mode;
|
||||
use TamerLib\Abstracts\ObjectType;
|
||||
use TamerLib\Abstracts\ProtocolType;
|
||||
use TamerLib\Abstracts\TaskPriority;
|
||||
|
||||
class Validate
|
||||
{
|
||||
/**
|
||||
* Returns true if the input is a valid protocol type.
|
||||
*
|
||||
* @param string $input
|
||||
* @return bool
|
||||
*/
|
||||
public static function protocolType(string $input): bool
|
||||
{
|
||||
return match (strtolower($input))
|
||||
{
|
||||
ProtocolType::Gearman, ProtocolType::RabbitMQ => true,
|
||||
default => false,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $input
|
||||
* @return bool
|
||||
*/
|
||||
public static function mode(string $input): bool
|
||||
{
|
||||
return match (strtolower($input))
|
||||
{
|
||||
Mode::Client, Mode::Worker => true,
|
||||
default => false,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if the input is a valid task priority.
|
||||
*
|
||||
* @param int $input
|
||||
* @return bool
|
||||
*/
|
||||
public static function taskPriority(int $input): bool
|
||||
{
|
||||
return match ($input)
|
||||
{
|
||||
TaskPriority::Low, TaskPriority::Normal, TaskPriority::High => true,
|
||||
default => false,
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Determines the object type
|
||||
*
|
||||
* @param $input
|
||||
* @return string
|
||||
*/
|
||||
public static function getObjectType($input): string
|
||||
{
|
||||
if(!is_array($input))
|
||||
{
|
||||
return ObjectType::Unknown;
|
||||
}
|
||||
|
||||
if(!array_key_exists('type', $input))
|
||||
{
|
||||
return ObjectType::Unknown;
|
||||
}
|
||||
|
||||
return match ($input['type'])
|
||||
{
|
||||
ObjectType::Job => ObjectType::Job,
|
||||
ObjectType::JobResults => ObjectType::JobResults,
|
||||
default => ObjectType::Unknown,
|
||||
};
|
||||
}
|
||||
}
|
|
@ -1,19 +0,0 @@
|
|||
<?php
|
||||
|
||||
namespace TamerLib\Exceptions;
|
||||
|
||||
use Exception;
|
||||
use Throwable;
|
||||
|
||||
class ConnectionException extends Exception
|
||||
{
|
||||
/**
|
||||
* @param string $message
|
||||
* @param int $code
|
||||
* @param Throwable|null $previous
|
||||
*/
|
||||
public function __construct(string $message="", int $code=0, Throwable $previous=null)
|
||||
{
|
||||
parent::__construct($message, $code, $previous);
|
||||
}
|
||||
}
|
|
@ -5,7 +5,7 @@
|
|||
use Exception;
|
||||
use Throwable;
|
||||
|
||||
class UnsupervisedWorkerException extends Exception
|
||||
class NoAvailablePortException extends Exception
|
||||
{
|
||||
/**
|
||||
* @param string $message
|
13
src/TamerLib/Exceptions/RedisServerException.php
Normal file
13
src/TamerLib/Exceptions/RedisServerException.php
Normal file
|
@ -0,0 +1,13 @@
|
|||
<?php
|
||||
|
||||
namespace TamerLib\Exceptions;
|
||||
|
||||
use Throwable;
|
||||
|
||||
class RedisServerException extends \Exception
|
||||
{
|
||||
public function __construct(string $message = "", int $code = 0, ?Throwable $previous = null)
|
||||
{
|
||||
parent::__construct($message, $code, $previous);
|
||||
}
|
||||
}
|
|
@ -1,142 +0,0 @@
|
|||
<?php
|
||||
|
||||
namespace TamerLib\Interfaces;
|
||||
|
||||
use Closure;
|
||||
use TamerLib\Exceptions\ConnectionException;
|
||||
use TamerLib\Objects\Task;
|
||||
|
||||
interface ClientProtocolInterface
|
||||
{
|
||||
/**
|
||||
* Public Constructor with optional username and password
|
||||
*
|
||||
* @param string|null $username (optional) The username to use when connecting to the server (if required)
|
||||
* @param string|null $password (optional) The password to use when connecting to the server
|
||||
*/
|
||||
public function __construct(?string $username=null, ?string $password=null);
|
||||
|
||||
/**
|
||||
* Adds a server to the list of servers to use
|
||||
*
|
||||
* @param string $host The host to connect to (eg; 127.0.0.1)
|
||||
* @param int $port The port to connect to (eg; 4730)
|
||||
* @return void
|
||||
*/
|
||||
public function addServer(string $host, int $port): void;
|
||||
|
||||
/**
|
||||
* Adds a list of servers to the list of servers to use
|
||||
*
|
||||
* @param array $servers An array of servers to connect to (eg; ['host:port', 'host:port', ...])
|
||||
* @return void
|
||||
*/
|
||||
public function addServers(array $servers): void;
|
||||
|
||||
/**
|
||||
* Connects to all the configured servers
|
||||
*
|
||||
* @throws ConnectionException
|
||||
* @return void
|
||||
*/
|
||||
public function connect(): void;
|
||||
|
||||
/**
|
||||
* Disconnects from all the configured servers
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function disconnect(): void;
|
||||
|
||||
/**
|
||||
* Reconnects to all the configured servers
|
||||
*
|
||||
* @throws ConnectionException
|
||||
* @return void
|
||||
*/
|
||||
public function reconnect(): void;
|
||||
|
||||
/**
|
||||
* Returns True if the client is connected to the server (or servers)
|
||||
*
|
||||
* @return bool
|
||||
*/
|
||||
public function isConnected(): bool;
|
||||
|
||||
/**
|
||||
* Sets options to the client (client specific)
|
||||
*
|
||||
* @param array $options
|
||||
* @return void
|
||||
*/
|
||||
public function setOptions(array $options): void;
|
||||
|
||||
/**
|
||||
* Returns the options set on the client
|
||||
*
|
||||
* @return array
|
||||
*/
|
||||
public function getOptions(): array;
|
||||
|
||||
/**
|
||||
* Clears all options from the client
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function clearOptions(): void;
|
||||
|
||||
/**
|
||||
* Returns True if the client is set to automatically reconnect to the server after a period of time
|
||||
*
|
||||
* @return bool
|
||||
*/
|
||||
public function automaticReconnectionEnabled(): bool;
|
||||
|
||||
/**
|
||||
* Enables or disables automatic reconnecting to the server after a period of time
|
||||
*
|
||||
* @param bool $enable
|
||||
* @return void
|
||||
*/
|
||||
public function enableAutomaticReconnection(bool $enable): void;
|
||||
|
||||
/**
|
||||
* Processes a task in the background (does not return a result)
|
||||
*
|
||||
* @param Task $task The task to process
|
||||
* @return void
|
||||
*/
|
||||
public function do(Task $task): void;
|
||||
|
||||
/**
|
||||
* Executes a closure operation in the background (does not return a result)
|
||||
*
|
||||
* @param Closure $closure The closure operation to perform (remote)
|
||||
* @return void
|
||||
*/
|
||||
public function doClosure(Closure $closure): void;
|
||||
|
||||
/**
|
||||
* Queues a task to be processed in parallel (returns a result handled by a callback)
|
||||
*
|
||||
* @param Task $task
|
||||
* @return void
|
||||
*/
|
||||
public function queue(Task $task): void;
|
||||
|
||||
/**
|
||||
* Queues a closure to be processed in parallel (returns a result handled by a callback)
|
||||
*
|
||||
* @param Closure $closure The closure operation to perform (remote)
|
||||
* @param Closure|null $callback The closure to call when the operation is complete (local)
|
||||
* @return void
|
||||
*/
|
||||
public function queueClosure(Closure $closure, ?Closure $callback=null): void;
|
||||
|
||||
/**
|
||||
* Executes all tasks in the queue and waits for them to complete
|
||||
*
|
||||
* @return bool
|
||||
*/
|
||||
public function run(): bool;
|
||||
}
|
|
@ -1,127 +0,0 @@
|
|||
<?php
|
||||
|
||||
namespace TamerLib\Interfaces;
|
||||
|
||||
use TamerLib\Exceptions\ConnectionException;
|
||||
|
||||
interface WorkerProtocolInterface
|
||||
{
|
||||
/**
|
||||
* Public Constructor with optional username and password
|
||||
*
|
||||
* @param string|null $username (optional) The username to use when connecting to the server (if required)
|
||||
* @param string|null $password (optional) The password to use when connecting to the server
|
||||
*/
|
||||
public function __construct(?string $username=null, ?string $password=null);
|
||||
|
||||
/**
|
||||
* Adds a server to the list of servers to use
|
||||
*
|
||||
* @param string $host The host to connect to (eg; 127.0.0.1)
|
||||
* @param int $port The port to connect to (eg; 4730)
|
||||
* @return void
|
||||
*/
|
||||
public function addServer(string $host, int $port): void;
|
||||
|
||||
/**
|
||||
* Adds a list of servers to the list of servers to use
|
||||
*
|
||||
* @param array $servers An array of servers to connect to (eg; ['host:port', 'host:port', ...])
|
||||
* @return void
|
||||
*/
|
||||
public function addServers(array $servers): void;
|
||||
|
||||
/**
|
||||
* Connects to all the configured servers
|
||||
*
|
||||
* @throws ConnectionException
|
||||
* @return void
|
||||
*/
|
||||
public function connect(): void;
|
||||
|
||||
/**
|
||||
* Disconnects from all the configured servers
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function disconnect(): void;
|
||||
|
||||
/**
|
||||
* Reconnects to all the configured servers
|
||||
*
|
||||
* @throws ConnectionException
|
||||
* @return void
|
||||
*/
|
||||
public function reconnect(): void;
|
||||
|
||||
/**
|
||||
* Returns True if the client is connected to the server (or servers)
|
||||
*
|
||||
* @return bool
|
||||
*/
|
||||
public function isConnected(): bool;
|
||||
|
||||
/**
|
||||
* Sets options to the worker (worker specific)
|
||||
*
|
||||
* @param array $options
|
||||
* @return void
|
||||
*/
|
||||
public function setOptions(array $options): void;
|
||||
|
||||
/**
|
||||
* Returns the options set on the worker
|
||||
*
|
||||
* @return array
|
||||
*/
|
||||
public function getOptions(): array;
|
||||
|
||||
/**
|
||||
* Clears all options from the worker
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function clearOptions(): void;
|
||||
|
||||
/**
|
||||
* Returns True if the worker is set to automatically reconnect to the server after a period of time
|
||||
*
|
||||
* @return bool
|
||||
*/
|
||||
public function automaticReconnectionEnabled(): bool;
|
||||
|
||||
/**
|
||||
* Enables or disables automatic reconnecting to the server after a period of time
|
||||
*
|
||||
* @param bool $enable
|
||||
* @return void
|
||||
*/
|
||||
public function enableAutomaticReconnection(bool $enable): void;
|
||||
|
||||
/**
|
||||
* Registers a function to the worker
|
||||
*
|
||||
* @param string $name The name of the function to add
|
||||
* @param callable $callable The function to add
|
||||
* @return void
|
||||
*/
|
||||
public function addFunction(string $name, callable $callable): void;
|
||||
|
||||
/**
|
||||
* Removes a function from the worker
|
||||
*
|
||||
* @param string $function_name The name of the function to remove
|
||||
* @return void
|
||||
*/
|
||||
public function removeFunction(string $function_name): void;
|
||||
|
||||
/**
|
||||
* Works a job from the queue (blocking or non-blocking)
|
||||
*
|
||||
* @param bool $blocking (optional) Whether to block until a job is available
|
||||
* @param int $timeout (optional) The timeout to use when blocking
|
||||
* @param bool $throw_errors (optional) Whether to throw errors or not
|
||||
* @return void
|
||||
*/
|
||||
public function work(bool $blocking=true, int $timeout=500, bool $throw_errors=false): void;
|
||||
}
|
|
@ -1,127 +0,0 @@
|
|||
<?php
|
||||
|
||||
/** @noinspection PhpMissingFieldTypeInspection */
|
||||
|
||||
namespace TamerLib\Objects;
|
||||
|
||||
use Closure;
|
||||
use Opis\Closure\SerializableClosure;
|
||||
use function unserialize;
|
||||
|
||||
class Job
|
||||
{
|
||||
/**
|
||||
* The ID of the job
|
||||
*
|
||||
* @var string
|
||||
*/
|
||||
private $id;
|
||||
|
||||
/**
|
||||
* The name of the function
|
||||
*
|
||||
* @var string
|
||||
*/
|
||||
private $name;
|
||||
|
||||
/**
|
||||
* The data to be passed to the function
|
||||
*
|
||||
* @var string|Closure|null
|
||||
*/
|
||||
private $data;
|
||||
|
||||
/**
|
||||
* Indicates if the data is a closure
|
||||
*
|
||||
* @var bool
|
||||
*/
|
||||
private $closure;
|
||||
|
||||
public function __construct(Task $task)
|
||||
{
|
||||
$this->id = $task->getId();
|
||||
$this->name = $task->getFunctionName();
|
||||
$this->data = $task->getData();
|
||||
$this->closure = $task->isClosure();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the ID of the Job
|
||||
*
|
||||
* @return string
|
||||
*/
|
||||
public function getId(): string
|
||||
{
|
||||
return $this->id;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the function name of the Job
|
||||
*
|
||||
* @return string
|
||||
*/
|
||||
public function getName(): string
|
||||
{
|
||||
return $this->name;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the data of the Job
|
||||
*
|
||||
* @return string|Closure|null
|
||||
*/
|
||||
public function getData(): Closure|string|null
|
||||
{
|
||||
return $this->data;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return bool
|
||||
*/
|
||||
public function isClosure(): bool
|
||||
{
|
||||
return $this->closure;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an array representation of the Job
|
||||
*
|
||||
* @return array
|
||||
*/
|
||||
public function toArray(): array
|
||||
{
|
||||
return [
|
||||
'type' => 'tamer_job',
|
||||
'id' => $this->id,
|
||||
'name' => $this->name,
|
||||
'data' => ($this->closure ? serialize(new SerializableClosure($this->data)) : $this->data),
|
||||
'closure' => $this->closure
|
||||
];
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs a Job from an array
|
||||
*
|
||||
* @param array $data
|
||||
* @return Job
|
||||
*/
|
||||
public static function fromArray(array $data): Job
|
||||
{
|
||||
$job_data = $data['data'];
|
||||
|
||||
if($data['closure'] === true)
|
||||
{
|
||||
/** @var SerializableClosure $job_data */
|
||||
$job_data = unserialize($data['data']);
|
||||
$job_data = $job_data->getClosure();
|
||||
}
|
||||
|
||||
$job = new Job(new Task($data['name'], $job_data));
|
||||
$job->id = $data['id'];
|
||||
$job->closure = $data['closure'];
|
||||
|
||||
return $job;
|
||||
}
|
||||
|
||||
}
|
|
@ -1,130 +0,0 @@
|
|||
<?php
|
||||
|
||||
/** @noinspection PhpMissingFieldTypeInspection */
|
||||
|
||||
namespace TamerLib\Objects;
|
||||
|
||||
use TamerLib\Abstracts\JobStatus;
|
||||
|
||||
class JobResults
|
||||
{
|
||||
/**
|
||||
* The ID of the job
|
||||
*
|
||||
* @var string
|
||||
*/
|
||||
private $id;
|
||||
|
||||
/**
|
||||
* The data to be passed to the function
|
||||
*
|
||||
* @var string
|
||||
*/
|
||||
private $data;
|
||||
|
||||
/**
|
||||
* The status of the job
|
||||
*
|
||||
* @var int
|
||||
* @see JobStatus
|
||||
*/
|
||||
private $status;
|
||||
|
||||
public function __construct(?Job $job=null, ?int $status=null, $results=null)
|
||||
{
|
||||
if($job !== null)
|
||||
{
|
||||
$this->id = $job->getId();
|
||||
$this->data = $results;
|
||||
$this->status = $status;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the ID of the Job
|
||||
*
|
||||
* @return string
|
||||
*/
|
||||
public function getId(): string
|
||||
{
|
||||
return $this->id;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Returns the data of the Job
|
||||
*
|
||||
* @return string
|
||||
*/
|
||||
public function getData(): string
|
||||
{
|
||||
return $this->data;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return int
|
||||
* @noinspection PhpUnused
|
||||
*/
|
||||
public function getStatus(): int
|
||||
{
|
||||
return $this->status;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an array representation of the Job
|
||||
*
|
||||
* @return array
|
||||
*/
|
||||
public function toArray(): array
|
||||
{
|
||||
return [
|
||||
'type' => 'tamer_job_results',
|
||||
'id' => $this->id,
|
||||
'data' => $this->data,
|
||||
'status' => $this->status
|
||||
];
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs a Job from an array
|
||||
*
|
||||
* @param array $data
|
||||
* @return JobResults
|
||||
*/
|
||||
public static function fromArray(array $data): JobResults
|
||||
{
|
||||
$job = new JobResults();
|
||||
|
||||
$job->setId($data['id']);
|
||||
$job->setData($data['data']);
|
||||
$job->setStatus($data['status']);
|
||||
|
||||
return $job;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $id
|
||||
*/
|
||||
protected function setId(string $id): void
|
||||
{
|
||||
$this->id = $id;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $data
|
||||
*/
|
||||
protected function setData(string $data): void
|
||||
{
|
||||
$this->data = $data;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param int|null $status
|
||||
*/
|
||||
protected function setStatus(?int $status): void
|
||||
{
|
||||
$this->status = $status;
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -1,192 +0,0 @@
|
|||
<?php
|
||||
|
||||
/** @noinspection PhpMissingFieldTypeInspection */
|
||||
|
||||
namespace TamerLib\Objects;
|
||||
|
||||
use Closure;
|
||||
use InvalidArgumentException;
|
||||
use TamerLib\Abstracts\TaskPriority;
|
||||
use TamerLib\Classes\Validate;
|
||||
|
||||
class Task
|
||||
{
|
||||
/**
|
||||
* @var string
|
||||
*/
|
||||
private $id;
|
||||
|
||||
/**
|
||||
* @var string
|
||||
*/
|
||||
private $function_name;
|
||||
|
||||
/**
|
||||
* @var string|Closure|null
|
||||
*/
|
||||
private $data;
|
||||
|
||||
/**
|
||||
* @var int
|
||||
*/
|
||||
private $priority;
|
||||
|
||||
/**
|
||||
* @var Closure|null
|
||||
*/
|
||||
private $callback;
|
||||
|
||||
/**
|
||||
* @var bool
|
||||
*/
|
||||
private $closure;
|
||||
|
||||
/**
|
||||
* Public Constructor
|
||||
*
|
||||
* @param string $function_name
|
||||
* @param string|Closure|null $data
|
||||
* @param Closure|null $callback
|
||||
*/
|
||||
public function __construct(string $function_name, string|Closure|null $data, Closure $callback=null)
|
||||
{
|
||||
$this->function_name = $function_name;
|
||||
$this->data = $data;
|
||||
$this->id = uniqid();
|
||||
$this->priority = TaskPriority::Normal;
|
||||
$this->callback = $callback;
|
||||
$this->closure = false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Static Constructor
|
||||
*
|
||||
* @param string $function_name
|
||||
* @param string|Closure|null $data
|
||||
* @param callable|null $callback
|
||||
* @return static
|
||||
*/
|
||||
public static function create(string $function_name, string|Closure|null $data, callable $callback=null): self
|
||||
{
|
||||
return new self($function_name, $data, $callback);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the function name for the task
|
||||
*
|
||||
* @return string
|
||||
*/
|
||||
public function getFunctionName(): string
|
||||
{
|
||||
return $this->function_name;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the function name for the task
|
||||
*
|
||||
* @param string $function_name
|
||||
* @return Task
|
||||
*/
|
||||
public function setFunctionName(string $function_name): self
|
||||
{
|
||||
$this->function_name = $function_name;
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the arguments for the task
|
||||
*
|
||||
* @return string|Closure|null
|
||||
*/
|
||||
public function getData(): string|null|Closure
|
||||
{
|
||||
return $this->data;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the arguments for the task
|
||||
*
|
||||
* @param string $data
|
||||
* @return Task
|
||||
*/
|
||||
public function setData(string $data): self
|
||||
{
|
||||
$this->data = $data;
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the Unique ID of the task
|
||||
*
|
||||
* @return string
|
||||
*/
|
||||
public function getId(): string
|
||||
{
|
||||
return $this->id;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return int
|
||||
*/
|
||||
public function getPriority(): int
|
||||
{
|
||||
return $this->priority;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param int $priority
|
||||
* @return Task
|
||||
*/
|
||||
public function setPriority(int $priority): self
|
||||
{
|
||||
if(!Validate::taskPriority($priority))
|
||||
{
|
||||
throw new InvalidArgumentException("Invalid priority value");
|
||||
}
|
||||
|
||||
$this->priority = $priority;
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param Closure|null $callback
|
||||
* @return Task
|
||||
*/
|
||||
public function setCallback(?Closure $callback): self
|
||||
{
|
||||
$this->callback = $callback;
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes the callback function
|
||||
*
|
||||
* @param string|JobResults|null $result
|
||||
* @return void
|
||||
*/
|
||||
public function runCallback(string|JobResults|null $result): void
|
||||
{
|
||||
if($this->callback !== null)
|
||||
{
|
||||
call_user_func($this->callback, $result);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return bool
|
||||
*/
|
||||
public function isClosure(): bool
|
||||
{
|
||||
return $this->closure;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param bool $closure
|
||||
* @return Task
|
||||
*/
|
||||
public function setClosure(bool $closure): self
|
||||
{
|
||||
$this->closure = $closure;
|
||||
return $this;
|
||||
}
|
||||
}
|
|
@ -1,186 +0,0 @@
|
|||
<?php
|
||||
|
||||
/** @noinspection PhpMissingFieldTypeInspection */
|
||||
|
||||
namespace TamerLib\Objects;
|
||||
|
||||
use Exception;
|
||||
use LogLib\Log;
|
||||
use Symfony\Component\Process\Process;
|
||||
use TamerLib\Classes\Functions;
|
||||
|
||||
class WorkerInstance
|
||||
{
|
||||
/**
|
||||
* The worker's instance id
|
||||
*
|
||||
* @var string
|
||||
*/
|
||||
private $id;
|
||||
|
||||
/**
|
||||
* The protocol to use when connecting to the server
|
||||
*
|
||||
* @var string
|
||||
*/
|
||||
private $protocol;
|
||||
|
||||
/**
|
||||
* The servers to connect to
|
||||
*
|
||||
* @var array
|
||||
*/
|
||||
private $servers;
|
||||
|
||||
/**
|
||||
* The username to use when connecting to the server (if applicable)
|
||||
*
|
||||
* @var string|null
|
||||
*/
|
||||
private $username;
|
||||
|
||||
/**
|
||||
* The password to use when connecting to the server (if applicable)
|
||||
*
|
||||
* @var string|null
|
||||
*/
|
||||
private $password;
|
||||
|
||||
/**
|
||||
* The process that is running the worker instance
|
||||
*
|
||||
* @var Process|null
|
||||
*/
|
||||
private $process;
|
||||
|
||||
/**
|
||||
* The target to run the worker instance on (e.g. a file path)
|
||||
*
|
||||
* @var string
|
||||
*/
|
||||
private $target;
|
||||
|
||||
/**
|
||||
* Public Constructor
|
||||
*
|
||||
* @param string $target
|
||||
* @param string $protocol
|
||||
* @param array $servers
|
||||
* @param string|null $username
|
||||
* @param string|null $password
|
||||
* @throws Exception
|
||||
*/
|
||||
public function __construct(string $target, string $protocol, array $servers, ?string $username = null, ?string $password = null)
|
||||
{
|
||||
$this->id = uniqid();
|
||||
$this->target = $target;
|
||||
$this->protocol = $protocol;
|
||||
$this->servers = $servers;
|
||||
$this->username = $username;
|
||||
$this->password = $password;
|
||||
$this->process = null;
|
||||
|
||||
if($target !== 'closure' && file_exists($target) === false)
|
||||
{
|
||||
throw new Exception('The target file does not exist');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the worker instance id
|
||||
*
|
||||
* @return string
|
||||
*/
|
||||
public function getId(): string
|
||||
{
|
||||
return $this->id;
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes the worker instance in a separate process
|
||||
*
|
||||
* @return void
|
||||
* @throws Exception
|
||||
*/
|
||||
public function start(): void
|
||||
{
|
||||
$target = $this->target;
|
||||
if($target == 'closure')
|
||||
{
|
||||
$target = __DIR__ . DIRECTORY_SEPARATOR . 'closure';
|
||||
}
|
||||
|
||||
$argv = $_SERVER['argv'];
|
||||
array_shift($argv);
|
||||
|
||||
$this->process = new Process(array_merge([Functions::findPhpBin(), $target], $argv));
|
||||
$this->process->setEnv([
|
||||
'TAMER_ENABLED' => 'true',
|
||||
'TAMER_PROTOCOL' => $this->protocol,
|
||||
'TAMER_SERVERS' => implode(',', $this->servers),
|
||||
'TAMER_USERNAME' => $this->username,
|
||||
'TAMER_PASSWORD' => $this->password,
|
||||
'TAMER_INSTANCE_ID' => $this->id
|
||||
]);
|
||||
|
||||
|
||||
Log::debug('net.nosial.tamerlib', sprintf('starting worker %s', $this->id));
|
||||
|
||||
// Callback for process output
|
||||
$this->process->start(function ($type, $buffer)
|
||||
{
|
||||
// Add newline if it's missing
|
||||
if(substr($buffer, -1) !== PHP_EOL)
|
||||
{
|
||||
$buffer .= PHP_EOL;
|
||||
}
|
||||
|
||||
print($buffer);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Stops the worker instance
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function stop(): void
|
||||
{
|
||||
if($this->process !== null)
|
||||
{
|
||||
Log::debug('net.nosial.tamerlib', sprintf('Stopping worker %s', $this->id));
|
||||
$this->process->stop();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns whether the worker instance is running
|
||||
*
|
||||
* @return bool
|
||||
*/
|
||||
public function isRunning(): bool
|
||||
{
|
||||
if($this->process !== null)
|
||||
{
|
||||
return $this->process->isRunning();
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Process|null
|
||||
*/
|
||||
public function getProcess(): ?Process
|
||||
{
|
||||
return $this->process;
|
||||
}
|
||||
|
||||
/**
|
||||
* Destructor
|
||||
*/
|
||||
public function __destruct()
|
||||
{
|
||||
$this->stop();
|
||||
}
|
||||
}
|
|
@ -1,16 +0,0 @@
|
|||
<?php
|
||||
|
||||
require 'ncc';
|
||||
import('net.nosial.tamerlib', 'latest');
|
||||
|
||||
\TamerLib\Tamer::initWorker();
|
||||
|
||||
try
|
||||
{
|
||||
\TamerLib\Tamer::work();
|
||||
}
|
||||
catch(\Exception $e)
|
||||
{
|
||||
\LogLib\Log::error('net.nosial.tamerlib', $e->getMessage(), $e);
|
||||
exit(1);
|
||||
}
|
|
@ -1,468 +0,0 @@
|
|||
<?php
|
||||
|
||||
/** @noinspection PhpMissingFieldTypeInspection */
|
||||
|
||||
namespace TamerLib\Protocols\Gearman;
|
||||
|
||||
use Closure;
|
||||
use Exception;
|
||||
use GearmanClient;
|
||||
use GearmanTask;
|
||||
use LogLib\Log;
|
||||
use TamerLib\Abstracts\TaskPriority;
|
||||
use TamerLib\Exceptions\ConnectionException;
|
||||
use TamerLib\Interfaces\ClientProtocolInterface;
|
||||
use TamerLib\Objects\Job;
|
||||
use TamerLib\Objects\JobResults;
|
||||
use TamerLib\Objects\Task;
|
||||
|
||||
class Client implements ClientProtocolInterface
|
||||
{
|
||||
/**
|
||||
* The Gearman Client object
|
||||
*
|
||||
* @var GearmanClient|null $client
|
||||
*/
|
||||
private $client;
|
||||
|
||||
/**
|
||||
* An array of servers that have been defined
|
||||
*
|
||||
* @var array
|
||||
*/
|
||||
private $defined_servers;
|
||||
|
||||
/**
|
||||
* Used for tracking the current execution of tasks and run callbacks on completion
|
||||
*
|
||||
* @var Task[]
|
||||
*/
|
||||
private $tasks;
|
||||
|
||||
/**
|
||||
* Indicates if the client should automatically reconnect to the server if the connection is lost
|
||||
* (default: true)
|
||||
*
|
||||
* @var bool
|
||||
*/
|
||||
private $automatic_reconnect;
|
||||
|
||||
/**
|
||||
* The Unix timestamp of the next time the client should attempt to reconnect to the server
|
||||
*
|
||||
* @var int
|
||||
*/
|
||||
private $next_reconnect;
|
||||
|
||||
/**
|
||||
* The options to use when connecting to the server
|
||||
*
|
||||
* @var array
|
||||
*/
|
||||
private $options;
|
||||
|
||||
/**
|
||||
* @inheritDoc
|
||||
* @param string|null $username
|
||||
* @param string|null $password
|
||||
*/
|
||||
public function __construct(?string $username=null, ?string $password=null)
|
||||
{
|
||||
$this->client = null;
|
||||
$this->tasks = [];
|
||||
$this->automatic_reconnect = false;
|
||||
$this->next_reconnect = time() + 1800;
|
||||
$this->defined_servers = [];
|
||||
$this->options = [];
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Adds a server to the list of servers to use
|
||||
*
|
||||
* @link http://php.net/manual/en/gearmanclient.addserver.php
|
||||
* @param string $host (127.0.0.1)
|
||||
* @param int $port (default: 4730)
|
||||
* @return void
|
||||
*/
|
||||
public function addServer(string $host, int $port): void
|
||||
{
|
||||
if(!isset($this->defined_servers[$host]))
|
||||
{
|
||||
$this->defined_servers[$host] = [];
|
||||
}
|
||||
|
||||
if(in_array($port, $this->defined_servers[$host]))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
$this->defined_servers[$host][] = $port;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a list of servers to the list of servers to use
|
||||
*
|
||||
* @link http://php.net/manual/en/gearmanclient.addservers.php
|
||||
* @param array $servers (host:port, host:port, ...)
|
||||
* @return void
|
||||
*/
|
||||
public function addServers(array $servers): void
|
||||
{
|
||||
foreach($servers as $server)
|
||||
{
|
||||
$server = explode(':', $server);
|
||||
$this->addServer($server[0], (int)$server[1]);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Connects to the server(s)
|
||||
*
|
||||
* @return void
|
||||
* @throws ConnectionException
|
||||
*/
|
||||
public function connect(): void
|
||||
{
|
||||
if($this->isConnected())
|
||||
return;
|
||||
|
||||
$this->client = new GearmanClient();
|
||||
|
||||
// Parse $options combination via bitwise OR operator
|
||||
$options = array_reduce($this->options, function($carry, $item)
|
||||
{
|
||||
return $carry | $item;
|
||||
});
|
||||
|
||||
$this->client->addOptions($options);
|
||||
|
||||
foreach($this->defined_servers as $host => $ports)
|
||||
{
|
||||
foreach($ports as $port)
|
||||
{
|
||||
try
|
||||
{
|
||||
$this->client->addServer($host, $port);
|
||||
Log::debug('net.nosial.tamerlib', 'connected to gearman server: ' . $host . ':' . $port);
|
||||
}
|
||||
catch(Exception $e)
|
||||
{
|
||||
throw new ConnectionException('Failed to connect to Gearman server: ' . $host . ':' . $port, 0, $e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
$this->client->setCompleteCallback([$this, 'callbackHandler']);
|
||||
$this->client->setFailCallback([$this, 'callbackHandler']);
|
||||
$this->client->setDataCallback([$this, 'callbackHandler']);
|
||||
$this->client->setStatusCallback([$this, 'callbackHandler']);
|
||||
}
|
||||
|
||||
/**
|
||||
* Disconnects from the server(s)
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function disconnect(): void
|
||||
{
|
||||
if(!$this->isConnected())
|
||||
return;
|
||||
|
||||
Log::debug('net.nosial.tamerlib', 'disconnecting from gearman server(s)');
|
||||
$this->client->clearCallbacks();
|
||||
unset($this->client);
|
||||
$this->client = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reconnects to the server(s)
|
||||
*
|
||||
* @return void
|
||||
* @throws ConnectionException
|
||||
*/
|
||||
public function reconnect(): void
|
||||
{
|
||||
Log::debug('net.nosial.tamerlib', 'reconnecting to gearman server(s)');
|
||||
|
||||
$this->disconnect();
|
||||
$this->connect();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current status of the client
|
||||
*
|
||||
* @inheritDoc
|
||||
* @return bool
|
||||
*/
|
||||
public function isConnected(): bool
|
||||
{
|
||||
if($this->client === null)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* The automatic reconnect process
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
private function preformAutoreconf(): void
|
||||
{
|
||||
if($this->automatic_reconnect && $this->next_reconnect < time())
|
||||
{
|
||||
try
|
||||
{
|
||||
$this->reconnect();
|
||||
}
|
||||
catch (Exception $e)
|
||||
{
|
||||
Log::error('net.nosial.tamerlib', 'Failed to reconnect to Gearman server: ' . $e->getMessage());
|
||||
}
|
||||
finally
|
||||
{
|
||||
$this->next_reconnect = time() + 1800;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds client options
|
||||
*
|
||||
* @link http://php.net/manual/en/gearmanclient.addoptions.php
|
||||
* @param int[] $options (GEARMAN_CLIENT_NON_BLOCKING, GEARMAN_CLIENT_UNBUFFERED_RESULT, GEARMAN_CLIENT_FREE_TASKS)
|
||||
* @return void
|
||||
*/
|
||||
public function setOptions(array $options): void
|
||||
{
|
||||
$this->options = $options;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current client options
|
||||
*
|
||||
* @return array
|
||||
*/
|
||||
public function getOptions(): array
|
||||
{
|
||||
return $this->options;
|
||||
}
|
||||
|
||||
/**
|
||||
* Clears the current client options
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function clearOptions(): void
|
||||
{
|
||||
$this->options = [];
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes a closure in the background
|
||||
*
|
||||
* @param Closure $closure
|
||||
* @return void
|
||||
*/
|
||||
public function doClosure(Closure $closure): void
|
||||
{
|
||||
$closure_task = new Task('tamer_closure', $closure);
|
||||
$closure_task->setClosure(true);
|
||||
$this->do($closure_task);
|
||||
}
|
||||
|
||||
/**
|
||||
* Processes a task in the background
|
||||
*
|
||||
* @param Task $task
|
||||
* @return void
|
||||
*/
|
||||
public function do(Task $task): void
|
||||
{
|
||||
$this->preformAutoreconf();
|
||||
|
||||
$this->tasks[] = $task;
|
||||
$job = new Job($task);
|
||||
$job_data = msgpack_pack($job->toArray());
|
||||
|
||||
Log::debug('net.nosial.tamerlib', 'sending closure to gearman server: ' . strlen($job_data) . ' bytes');
|
||||
switch($task->getPriority())
|
||||
{
|
||||
case TaskPriority::High:
|
||||
$this->client->doHighBackground($task->getFunctionName(), $job_data);
|
||||
break;
|
||||
|
||||
case TaskPriority::Low:
|
||||
$this->client->doLowBackground($task->getFunctionName(), $job_data);
|
||||
break;
|
||||
|
||||
default:
|
||||
case TaskPriority::Normal:
|
||||
$this->client->doBackground($task->getFunctionName(), $job_data);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a task to the list of tasks to run
|
||||
*
|
||||
* @param Task $task
|
||||
* @return void
|
||||
*/
|
||||
public function queue(Task $task): void
|
||||
{
|
||||
$this->preformAutoreconf();
|
||||
|
||||
$this->tasks[] = $task;
|
||||
$job = new Job($task);
|
||||
$job_data = msgpack_pack($job->toArray());
|
||||
|
||||
Log::debug('net.nosial.tamerlib', 'sending closure to gearman server: ' . strlen($job_data) . ' bytes');
|
||||
switch($task->getPriority())
|
||||
{
|
||||
case TaskPriority::High:
|
||||
$this->client->addTaskHigh($task->getFunctionName(), $job_data);
|
||||
break;
|
||||
|
||||
case TaskPriority::Low:
|
||||
$this->client->addTaskLow($task->getFunctionName(), $job_data);
|
||||
break;
|
||||
|
||||
default:
|
||||
case TaskPriority::Normal:
|
||||
$this->client->addTask($task->getFunctionName(), $job_data);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a closure task to the list of tasks to run
|
||||
*
|
||||
* @param Closure $closure
|
||||
* @param Closure|null $callback
|
||||
* @return void
|
||||
*/
|
||||
public function queueClosure(Closure $closure, ?Closure $callback=null): void
|
||||
{
|
||||
$closure_task = new Task('tamer_closure', $closure, $callback);
|
||||
$closure_task->setClosure(true);
|
||||
$this->queue($closure_task);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return bool
|
||||
*/
|
||||
public function run(): bool
|
||||
{
|
||||
if(!$this->isConnected())
|
||||
return false;
|
||||
|
||||
$this->preformAutoreconf();
|
||||
|
||||
if(!$this->client->runTasks())
|
||||
return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Processes a task callback in the foreground
|
||||
*
|
||||
* @param GearmanTask $task
|
||||
* @return void
|
||||
*/
|
||||
public function callbackHandler(GearmanTask $task): void
|
||||
{
|
||||
$job_result = JobResults::fromArray(msgpack_unpack($task->data()));
|
||||
$internal_task = $this->getTaskById($job_result->getId());
|
||||
|
||||
Log::debug('net.nosial.tamerlib', 'callback for task ' . $internal_task->getId() . ' with status ' . $job_result->getStatus() . ' and data size ' . strlen($task->data()) . ' bytes');
|
||||
|
||||
try
|
||||
{
|
||||
if($internal_task->isClosure())
|
||||
{
|
||||
// If the task is a closure, we need to run the callback with the closure's return value
|
||||
// instead of the job result object
|
||||
$internal_task->runCallback($job_result->getData());
|
||||
return;
|
||||
}
|
||||
|
||||
$internal_task->runCallback($job_result);
|
||||
}
|
||||
catch(Exception $e)
|
||||
{
|
||||
Log::error('net.nosial.tamerlib', 'Failed to run callback for task ' . $internal_task->getId() . ': ' . $e->getMessage(), $e);
|
||||
}
|
||||
finally
|
||||
{
|
||||
$this->removeTask($internal_task);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $id
|
||||
* @return Task|null
|
||||
*/
|
||||
private function getTaskById(string $id): ?Task
|
||||
{
|
||||
foreach($this->tasks as $task)
|
||||
{
|
||||
if($task->getId() === $id)
|
||||
{
|
||||
return $task;
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes a task from the list of tasks
|
||||
*
|
||||
* @param Task $task
|
||||
* @return void
|
||||
*/
|
||||
private function removeTask(Task $task): void
|
||||
{
|
||||
$this->tasks = array_filter($this->tasks, function($item) use ($task)
|
||||
{
|
||||
return $item->getId() !== $task->getId();
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* @return bool
|
||||
*/
|
||||
public function automaticReconnectionEnabled(): bool
|
||||
{
|
||||
return $this->automatic_reconnect;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param bool $enable
|
||||
*/
|
||||
public function enableAutomaticReconnection(bool $enable): void
|
||||
{
|
||||
$this->automatic_reconnect = $enable;
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes all remaining tasks and closes the connection
|
||||
*/
|
||||
public function __destruct()
|
||||
{
|
||||
try
|
||||
{
|
||||
$this->disconnect();
|
||||
}
|
||||
catch(Exception $e)
|
||||
{
|
||||
unset($e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,371 +0,0 @@
|
|||
<?php
|
||||
|
||||
/** @noinspection PhpMissingFieldTypeInspection */
|
||||
|
||||
namespace TamerLib\Protocols\Gearman;
|
||||
|
||||
use Exception;
|
||||
use GearmanJob;
|
||||
use GearmanWorker;
|
||||
use LogLib\Log;
|
||||
use Opis\Closure\SerializableClosure;
|
||||
use TamerLib\Abstracts\JobStatus;
|
||||
use TamerLib\Exceptions\ConnectionException;
|
||||
use TamerLib\Interfaces\WorkerProtocolInterface;
|
||||
use TamerLib\Objects\Job;
|
||||
use TamerLib\Objects\JobResults;
|
||||
|
||||
class Worker implements WorkerProtocolInterface
|
||||
{
|
||||
/**
|
||||
* The Gearman Worker Instance (if connected)
|
||||
*
|
||||
* @var GearmanWorker|null
|
||||
*/
|
||||
private $worker;
|
||||
|
||||
/**
|
||||
* The list of servers that have been added
|
||||
*
|
||||
* @var array
|
||||
*/
|
||||
private $defined_servers;
|
||||
|
||||
/**
|
||||
* Indicates if the worker should automatically reconnect to the server
|
||||
*
|
||||
* @var bool
|
||||
*/
|
||||
private $automatic_reconnect;
|
||||
|
||||
/**
|
||||
* The Unix Timestamp of when the next reconnect should occur (if automatic_reconnect is true)
|
||||
*
|
||||
* @var int
|
||||
*/
|
||||
private $next_reconnect;
|
||||
|
||||
/**
|
||||
* The options to use when connecting to the server
|
||||
*
|
||||
* @var array
|
||||
*/
|
||||
private $options;
|
||||
|
||||
/**
|
||||
* Public Constructor with optional username and password
|
||||
*
|
||||
* @param string|null $username
|
||||
* @param string|null $password
|
||||
*/
|
||||
public function __construct(?string $username=null, ?string $password=null)
|
||||
{
|
||||
$this->worker = null;
|
||||
$this->defined_servers = [];
|
||||
$this->automatic_reconnect = false;
|
||||
$this->next_reconnect = time() + 1800;
|
||||
$this->options = [];
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a server to the list of servers to use
|
||||
*
|
||||
* @link http://php.net/manual/en/gearmanworker.addserver.php
|
||||
* @param string $host (
|
||||
* @param int $port (default: 4730)
|
||||
* @return void
|
||||
*/
|
||||
public function addServer(string $host, int $port): void
|
||||
{
|
||||
if(!isset($this->defined_servers[$host]))
|
||||
{
|
||||
$this->defined_servers[$host] = [];
|
||||
}
|
||||
|
||||
if(in_array($port, $this->defined_servers[$host]))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
$this->defined_servers[$host][] = $port;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a list of servers to the list of servers to use
|
||||
*
|
||||
* @link http://php.net/manual/en/gearmanworker.addservers.php
|
||||
* @param string[] $servers (host:port, host:port, ...)
|
||||
* @return void
|
||||
*/
|
||||
public function addServers(array $servers): void
|
||||
{
|
||||
foreach($servers as $server)
|
||||
{
|
||||
$server = explode(':', $server);
|
||||
$this->addServer($server[0], (int)$server[1]);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Connects to the server
|
||||
*
|
||||
* @return void
|
||||
* @throws ConnectionException
|
||||
*/
|
||||
public function connect(): void
|
||||
{
|
||||
if($this->isConnected())
|
||||
return;
|
||||
|
||||
$this->worker = new GearmanWorker();
|
||||
$this->worker->addOptions(GEARMAN_WORKER_GRAB_UNIQ);
|
||||
|
||||
foreach($this->defined_servers as $host => $ports)
|
||||
{
|
||||
foreach($ports as $port)
|
||||
{
|
||||
try
|
||||
{
|
||||
$this->worker->addServer($host, $port);
|
||||
Log::debug('net.nosial.tamerlib', 'connected to gearman server: ' . $host . ':' . $port);
|
||||
}
|
||||
catch(Exception $e)
|
||||
{
|
||||
throw new ConnectionException('Failed to connect to Gearman server: ' . $host . ':' . $port, 0, $e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
$this->worker->addFunction('tamer_closure', function(GearmanJob $job)
|
||||
{
|
||||
$received_job = Job::fromArray(msgpack_unpack($job->workload()));
|
||||
Log::debug('net.nosial.tamerlib', 'received closure: ' . $received_job->getId());
|
||||
|
||||
try
|
||||
{
|
||||
/** @var SerializableClosure $closure */
|
||||
$closure = $received_job->getData();
|
||||
$result = $closure($received_job);
|
||||
}
|
||||
catch(Exception $e)
|
||||
{
|
||||
$job->sendFail();
|
||||
unset($e);
|
||||
return;
|
||||
}
|
||||
|
||||
$job_results = new JobResults($received_job, JobStatus::Success, $result);
|
||||
$job->sendComplete(msgpack_pack($job_results->toArray()));
|
||||
Log::debug('net.nosial.tamerlib', 'completed closure: ' . $received_job->getId());
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Disconnects from the server
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function disconnect(): void
|
||||
{
|
||||
if(!$this->isConnected())
|
||||
return;
|
||||
|
||||
$this->worker->unregisterAll();
|
||||
unset($this->worker);
|
||||
$this->worker = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reconnects to the server if the connection has been lost
|
||||
*
|
||||
* @return void
|
||||
* @throws ConnectionException
|
||||
*/
|
||||
public function reconnect(): void
|
||||
{
|
||||
$this->disconnect();
|
||||
$this->connect();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if the worker is connected to the server
|
||||
*
|
||||
* @return bool
|
||||
*/
|
||||
public function isConnected(): bool
|
||||
{
|
||||
return $this->worker !== null;
|
||||
}
|
||||
|
||||
/**
|
||||
* The automatic reconnect process
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
private function preformAutoreconf(): void
|
||||
{
|
||||
if($this->automatic_reconnect && $this->next_reconnect < time())
|
||||
{
|
||||
try
|
||||
{
|
||||
$this->reconnect();
|
||||
}
|
||||
catch (Exception $e)
|
||||
{
|
||||
Log::error('net.nosial.tamerlib', 'Failed to reconnect to Gearman server: ' . $e->getMessage());
|
||||
}
|
||||
finally
|
||||
{
|
||||
$this->next_reconnect = time() + 1800;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the options to use when connecting to the server
|
||||
*
|
||||
* @param array $options
|
||||
* @return bool
|
||||
* @inheritDoc
|
||||
*/
|
||||
public function setOptions(array $options): void
|
||||
{
|
||||
$this->options = $options;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the options to use when connecting to the server
|
||||
*
|
||||
* @return array
|
||||
*/
|
||||
public function getOptions(): array
|
||||
{
|
||||
return $this->options;
|
||||
}
|
||||
|
||||
/**
|
||||
* Clears the options to use when connecting to the server
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function clearOptions(): void
|
||||
{
|
||||
$this->options = [];
|
||||
}
|
||||
|
||||
/**
|
||||
* @return bool
|
||||
*/
|
||||
public function automaticReconnectionEnabled(): bool
|
||||
{
|
||||
return $this->automatic_reconnect;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param bool $enable
|
||||
* @return void
|
||||
*/
|
||||
public function enableAutomaticReconnection(bool $enable): void
|
||||
{
|
||||
$this->automatic_reconnect = $enable;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a function to the list of functions to call
|
||||
*
|
||||
* @link http://php.net/manual/en/gearmanworker.addfunction.php
|
||||
* @param string $name The name of the function to register with the job server
|
||||
* @param callable $callable The callback function to call when the job is received
|
||||
* @return void
|
||||
*/
|
||||
public function addFunction(string $name, callable $callable): void
|
||||
{
|
||||
$this->worker->addFunction($name, function(GearmanJob $job) use ($callable)
|
||||
{
|
||||
$received_job = Job::fromArray(msgpack_unpack($job->workload()));
|
||||
Log::debug('net.nosial.tamerlib', 'received job: ' . $received_job->getId());
|
||||
|
||||
try
|
||||
{
|
||||
$result = $callable($received_job);
|
||||
}
|
||||
catch(Exception $e)
|
||||
{
|
||||
$job->sendFail();
|
||||
unset($e);
|
||||
return;
|
||||
}
|
||||
|
||||
$job_results = new JobResults($received_job, JobStatus::Success, $result);
|
||||
$job->sendComplete(msgpack_pack($job_results->toArray()));
|
||||
Log::debug('net.nosial.tamerlib', 'completed job: ' . $received_job->getId());
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes a function from the list of functions to call
|
||||
*
|
||||
* @param string $function_name The name of the function to unregister
|
||||
* @return void
|
||||
*/
|
||||
public function removeFunction(string $function_name): void
|
||||
{
|
||||
$this->worker->unregister($function_name);
|
||||
}
|
||||
|
||||
/**
|
||||
* Waits for a job and calls the appropriate callback function
|
||||
*
|
||||
* @link http://php.net/manual/en/gearmanworker.work.php
|
||||
* @param bool $blocking (default: true) Whether to block until a job is received
|
||||
* @param int $timeout (default: 500) The timeout in milliseconds (if $blocking is false)
|
||||
* @param bool $throw_errors (default: false) Whether to throw exceptions on errors
|
||||
* @return void Returns nothing
|
||||
* @throws ConnectionException
|
||||
*/
|
||||
public function work(bool $blocking=true, int $timeout=500, bool $throw_errors=false): void
|
||||
{
|
||||
$this->worker->setTimeout($timeout);
|
||||
|
||||
while(true)
|
||||
{
|
||||
@$this->preformAutoreconf();
|
||||
@$this->worker->work();
|
||||
|
||||
if($this->worker->returnCode() == GEARMAN_COULD_NOT_CONNECT)
|
||||
{
|
||||
throw new ConnectionException('Could not connect to Gearman server');
|
||||
}
|
||||
|
||||
if($this->worker->returnCode() == GEARMAN_TIMEOUT && !$blocking)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
if($this->worker->returnCode() != GEARMAN_SUCCESS && $throw_errors)
|
||||
{
|
||||
Log::error('net.nosial.tamerlib', 'Gearman worker error: ' . $this->worker->error());
|
||||
}
|
||||
|
||||
if($blocking)
|
||||
{
|
||||
usleep($timeout);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes all remaining tasks and closes the connection
|
||||
*/
|
||||
public function __destruct()
|
||||
{
|
||||
try
|
||||
{
|
||||
$this->disconnect();
|
||||
}
|
||||
catch(Exception $e)
|
||||
{
|
||||
unset($e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,467 +0,0 @@
|
|||
<?php
|
||||
|
||||
/** @noinspection PhpMissingFieldTypeInspection */
|
||||
|
||||
namespace TamerLib\Protocols\RabbitMq;
|
||||
|
||||
use Closure;
|
||||
use Exception;
|
||||
use PhpAmqpLib\Message\AMQPMessage;
|
||||
use TamerLib\Abstracts\ObjectType;
|
||||
use TamerLib\Classes\Functions;
|
||||
use TamerLib\Classes\Validate;
|
||||
use TamerLib\Exceptions\ConnectionException;
|
||||
use TamerLib\Interfaces\ClientProtocolInterface;
|
||||
use TamerLib\Objects\Job;
|
||||
use TamerLib\Objects\JobResults;
|
||||
use TamerLib\Objects\Task;
|
||||
|
||||
class Client implements ClientProtocolInterface
|
||||
{
|
||||
/**
|
||||
* An array of servers to use
|
||||
*
|
||||
* @var array
|
||||
*/
|
||||
private $defined_servers;
|
||||
|
||||
/**
|
||||
* Used for tracking the current execution of tasks and run callbacks on completion
|
||||
*
|
||||
* @var Task[]
|
||||
*/
|
||||
private $tasks;
|
||||
|
||||
/**
|
||||
* Whether to automatically reconnect to the server if the connection is lost
|
||||
*
|
||||
* @var bool
|
||||
*/
|
||||
private $automatic_reconnect;
|
||||
|
||||
/**
|
||||
* (Optional) The username to use when connecting to the server
|
||||
*
|
||||
* @var string|null
|
||||
*/
|
||||
private $username;
|
||||
|
||||
/**
|
||||
* (Optional) The password to use when connecting to the server
|
||||
*
|
||||
* @var string|null
|
||||
*/
|
||||
private $password;
|
||||
|
||||
/**
|
||||
* (Optional) An array of options to use when connecting to the server
|
||||
*
|
||||
* @var array
|
||||
*/
|
||||
private $options;
|
||||
|
||||
/**
|
||||
* An array of connections to use
|
||||
*
|
||||
* @var Connection[]
|
||||
*/
|
||||
private $connections;
|
||||
|
||||
/**
|
||||
* Public Constructor
|
||||
*
|
||||
* @param string|null $username
|
||||
* @param string|null $password
|
||||
*/
|
||||
public function __construct(?string $username=null, ?string $password=null)
|
||||
{
|
||||
$this->tasks = [];
|
||||
$this->automatic_reconnect = false;
|
||||
$this->defined_servers = [];
|
||||
$this->options = [];
|
||||
$this->username = $username;
|
||||
$this->password = $password;
|
||||
$this->connections = [];
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a server to the list of servers to use
|
||||
*
|
||||
* @param string $host
|
||||
* @param int $port
|
||||
* @return void
|
||||
*/
|
||||
public function addServer(string $host, int $port): void
|
||||
{
|
||||
if(!isset($this->defined_servers[$host]))
|
||||
{
|
||||
$this->defined_servers[$host] = [];
|
||||
}
|
||||
|
||||
if(in_array($port, $this->defined_servers[$host]))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
$this->defined_servers[$host][] = $port;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a list of servers to the list of servers to use
|
||||
*
|
||||
* @param array $servers (host:port, host:port, ...)
|
||||
* @return void
|
||||
*/
|
||||
public function addServers(array $servers): void
|
||||
{
|
||||
foreach($servers as $server)
|
||||
{
|
||||
$server = explode(':', $server);
|
||||
$this->addServer($server[0], (int)$server[1]);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Connects to the server(s) defined
|
||||
*
|
||||
* @return void
|
||||
* @throws ConnectionException
|
||||
*/
|
||||
public function connect(): void
|
||||
{
|
||||
if($this->isConnected())
|
||||
return;
|
||||
|
||||
if(count($this->defined_servers) === 0)
|
||||
return;
|
||||
|
||||
foreach($this->defined_servers as $host => $ports)
|
||||
{
|
||||
foreach($ports as $port)
|
||||
{
|
||||
$connection = new Connection($host, $port, $this->username, $this->password);
|
||||
$connection->connect();
|
||||
|
||||
$this->connections[] = $connection;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Disconnects from the server
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function disconnect(): void
|
||||
{
|
||||
if(!$this->isConnected())
|
||||
return;
|
||||
|
||||
foreach($this->connections as $connection)
|
||||
{
|
||||
$connection->disconnect();
|
||||
}
|
||||
|
||||
$this->connections = [];
|
||||
}
|
||||
|
||||
/**
|
||||
* Reconnects to the server
|
||||
*
|
||||
* @return void
|
||||
* @throws ConnectionException
|
||||
*/
|
||||
public function reconnect(): void
|
||||
{
|
||||
$this->disconnect();
|
||||
$this->connect();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns True if one or more connections are connected, False otherwise
|
||||
* (Note, some connections may be disconnected, and this will still return True)
|
||||
*
|
||||
* @return bool
|
||||
*/
|
||||
public function isConnected(): bool
|
||||
{
|
||||
if(count($this->connections) === 0)
|
||||
return false;
|
||||
|
||||
foreach($this->connections as $connection)
|
||||
{
|
||||
if($connection->isConnected())
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the options array
|
||||
*
|
||||
* @param array $options
|
||||
* @return void
|
||||
*/
|
||||
public function setOptions(array $options): void
|
||||
{
|
||||
$this->options = $options;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the options array
|
||||
*
|
||||
* @return array
|
||||
*/
|
||||
public function getOptions(): array
|
||||
{
|
||||
return $this->options;
|
||||
}
|
||||
|
||||
/**
|
||||
* Clears the options array
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function clearOptions(): void
|
||||
{
|
||||
$this->options = [];
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns True if the client is automatically reconnecting to the server
|
||||
*
|
||||
* @return bool
|
||||
*/
|
||||
public function automaticReconnectionEnabled(): bool
|
||||
{
|
||||
return $this->automatic_reconnect;
|
||||
}
|
||||
|
||||
/**
|
||||
* Enables or disables automatic reconnecting to the server
|
||||
*
|
||||
* @param bool $enable
|
||||
* @return void
|
||||
*/
|
||||
public function enableAutomaticReconnection(bool $enable): void
|
||||
{
|
||||
$this->automatic_reconnect = $enable;
|
||||
}
|
||||
|
||||
/**
|
||||
* Runs a task in the background (Fire and Forget)
|
||||
*
|
||||
* @param Task $task
|
||||
* @return void
|
||||
*/
|
||||
public function do(Task $task): void
|
||||
{
|
||||
if(!$this->isConnected())
|
||||
return;
|
||||
|
||||
$job = new Job($task);
|
||||
$message = new AMQPMessage(msgpack_pack($job->toArray()), [
|
||||
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
|
||||
'correlation_id' => $task->getId(),
|
||||
'priority' => Functions::calculatePriority($task->getPriority()),
|
||||
]);
|
||||
|
||||
// Select random connection
|
||||
$connection = $this->connections[array_rand($this->connections)];
|
||||
if($this->automatic_reconnect)
|
||||
$connection->preformAutoreconf();
|
||||
$connection->getChannel()->basic_publish($message, '', 'tamer_queue');
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes a closure in the background
|
||||
*
|
||||
* @param Closure $closure
|
||||
* @return void
|
||||
*/
|
||||
public function doClosure(Closure $closure): void
|
||||
{
|
||||
$closure_task = new Task('tamer_closure', $closure);
|
||||
$closure_task->setClosure(true);
|
||||
$this->do($closure_task);
|
||||
}
|
||||
|
||||
/**
|
||||
* Queues a task to be executed
|
||||
*
|
||||
* @param Task $task
|
||||
* @return void
|
||||
*/
|
||||
public function queue(Task $task): void
|
||||
{
|
||||
$this->tasks[] = $task;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a closure task to the list of tasks to run
|
||||
*
|
||||
* @param Closure $closure
|
||||
* @param Closure|null $callback
|
||||
* @return void
|
||||
*/
|
||||
public function queueClosure(Closure $closure, ?Closure $callback=null): void
|
||||
{
|
||||
$closure_task = new Task('tamer_closure', $closure, $callback);
|
||||
$closure_task->setClosure(true);
|
||||
$this->queue($closure_task);
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes all the tasks that has been added
|
||||
*
|
||||
* @return bool
|
||||
*/
|
||||
public function run(): bool
|
||||
{
|
||||
if(count($this->tasks) === 0)
|
||||
return false;
|
||||
|
||||
if(!$this->isConnected())
|
||||
return false;
|
||||
|
||||
$this->preformAutoreconf();
|
||||
$correlationIds = [];
|
||||
$connection = $this->connections[array_rand($this->connections)];
|
||||
|
||||
if($this->automatic_reconnect)
|
||||
$connection->preformAutoreconf();
|
||||
|
||||
/** @var Task $task */
|
||||
foreach($this->tasks as $task)
|
||||
{
|
||||
$correlationIds[] = $task->getId();
|
||||
$job = new Job($task);
|
||||
|
||||
$message = new AMQPMessage(msgpack_pack($job->toArray()), [
|
||||
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
|
||||
'correlation_id' => $task->getId(),
|
||||
'reply_to' => 'tamer_queue',
|
||||
'priority' => Functions::calculatePriority($task->getPriority()),
|
||||
]);
|
||||
|
||||
$connection->getChannel()->basic_publish($message, '', 'tamer_queue');
|
||||
}
|
||||
|
||||
// Register callback for each task
|
||||
$callback = function($msg) use (&$correlationIds, $connection)
|
||||
{
|
||||
var_dump(Validate::getObjectType(msgpack_unpack($msg->body)));
|
||||
if(Validate::getObjectType(msgpack_unpack($msg->body)) !== ObjectType::JobResults)
|
||||
{
|
||||
$connection->getChannel()->basic_nack($msg->delivery_info['delivery_tag'], false, true);
|
||||
return;
|
||||
}
|
||||
|
||||
$job_result = JobResults::fromArray(msgpack_unpack($msg->body));
|
||||
$task = $this->getTaskById($job_result->getId());
|
||||
|
||||
if($task == null)
|
||||
{
|
||||
$connection->getChannel()->basic_nack($msg->delivery_info['delivery_tag'], false, true);
|
||||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
if($task->isClosure())
|
||||
{
|
||||
$task->runCallback($job_result->getData());
|
||||
}
|
||||
else
|
||||
{
|
||||
$task->runCallback($job_result);
|
||||
}
|
||||
}
|
||||
catch(Exception $e)
|
||||
{
|
||||
echo $e->getMessage();
|
||||
}
|
||||
|
||||
// Remove the processed correlation_id
|
||||
$index = array_search($msg->get('correlation_id'), $correlationIds);
|
||||
|
||||
if ($index !== false)
|
||||
{
|
||||
unset($correlationIds[$index]);
|
||||
$connection->getChannel()->basic_ack($msg->delivery_info['delivery_tag']);
|
||||
}
|
||||
else
|
||||
{
|
||||
$connection->getChannel()->basic_nack($msg->delivery_info['delivery_tag'], false, true);
|
||||
}
|
||||
|
||||
// Stop consuming when all tasks are processed
|
||||
if(count($correlationIds) === 0)
|
||||
{
|
||||
$connection->getChannel()->basic_cancel($msg->delivery_info['consumer_tag']);
|
||||
}
|
||||
};
|
||||
|
||||
$connection->getChannel()->basic_consume('tamer_queue', '', false, false, false, false, $callback);
|
||||
|
||||
// Start consuming messages
|
||||
while(count($connection->getChannel()->callbacks))
|
||||
{
|
||||
$connection->getChannel()->wait();
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a task by its id
|
||||
*
|
||||
* @param string $id
|
||||
* @return Task|null
|
||||
*/
|
||||
private function getTaskById(string $id): ?Task
|
||||
{
|
||||
foreach($this->tasks as $task)
|
||||
{
|
||||
if($task->getId() === $id)
|
||||
{
|
||||
return $task;
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* The automatic reconnect process
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
private function preformAutoreconf(): void
|
||||
{
|
||||
if($this->automatic_reconnect)
|
||||
{
|
||||
foreach($this->connections as $connection)
|
||||
{
|
||||
$connection->preformAutoreconf();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Disconnects from the server when the object is destroyed
|
||||
*/
|
||||
public function __destruct()
|
||||
{
|
||||
try
|
||||
{
|
||||
$this->disconnect();
|
||||
}
|
||||
catch(Exception $e)
|
||||
{
|
||||
unset($e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -1,216 +0,0 @@
|
|||
<?php
|
||||
|
||||
/** @noinspection PhpMissingFieldTypeInspection */
|
||||
|
||||
namespace TamerLib\Protocols\RabbitMq;
|
||||
|
||||
use Exception;
|
||||
use LogLib\Log;
|
||||
use PhpAmqpLib\Channel\AMQPChannel;
|
||||
use PhpAmqpLib\Connection\AMQPStreamConnection;
|
||||
use TamerLib\Exceptions\ConnectionException;
|
||||
|
||||
class Connection
|
||||
{
|
||||
/**
|
||||
* The unique ID of the connection
|
||||
*
|
||||
* @var string
|
||||
*/
|
||||
private $id;
|
||||
|
||||
/**
|
||||
* The stream connection
|
||||
*
|
||||
* @var AMQPStreamConnection|null
|
||||
*/
|
||||
private $connection;
|
||||
|
||||
/**
|
||||
* The channel to use for the connection
|
||||
*
|
||||
* @var AMQPChannel|null
|
||||
*/
|
||||
private $channel;
|
||||
|
||||
/**
|
||||
* The host to connect to
|
||||
*
|
||||
* @var string
|
||||
*/
|
||||
private $host;
|
||||
|
||||
/**
|
||||
* The port to connect to
|
||||
*
|
||||
* @var int
|
||||
*/
|
||||
private $port;
|
||||
|
||||
/**
|
||||
* (Optional) The username to use when connecting to the server
|
||||
*
|
||||
* @var string|null
|
||||
*/
|
||||
private $username;
|
||||
|
||||
/**
|
||||
* (Optional) The password to use when connecting to the server
|
||||
*
|
||||
* @var string|null
|
||||
*/
|
||||
private $password;
|
||||
|
||||
/**
|
||||
* The Unix timestamp of when the next reconnect should occur
|
||||
*
|
||||
* @var int
|
||||
*/
|
||||
private $next_reconnect;
|
||||
|
||||
/**
|
||||
* @param string $host
|
||||
* @param int $port
|
||||
* @param string|null $username
|
||||
* @param string|null $password
|
||||
*/
|
||||
public function __construct(string $host, int $port, ?string $username=null, ?string $password=null)
|
||||
{
|
||||
$this->id = uniqid();
|
||||
$this->host = $host;
|
||||
$this->port = $port;
|
||||
$this->username = $username;
|
||||
$this->password = $password;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return string
|
||||
*/
|
||||
public function getId(): string
|
||||
{
|
||||
return $this->id;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return AMQPStreamConnection|null
|
||||
*/
|
||||
public function getConnection(): ?AMQPStreamConnection
|
||||
{
|
||||
return $this->connection;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return AMQPChannel|null
|
||||
*/
|
||||
public function getChannel(): ?AMQPChannel
|
||||
{
|
||||
return $this->channel;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns True if the client is connected to the server
|
||||
*
|
||||
* @return bool
|
||||
*/
|
||||
public function isConnected(): bool
|
||||
{
|
||||
return $this->connection !== null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Establishes a connection to the server
|
||||
*
|
||||
* @return void
|
||||
* @throws ConnectionException
|
||||
*/
|
||||
public function connect(): void
|
||||
{
|
||||
if($this->isConnected())
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
$this->connection = new AMQPStreamConnection($this->host, $this->port, $this->username, $this->password);
|
||||
$this->channel = $this->connection->channel();
|
||||
$this->channel->queue_declare('tamer_queue', false, true, false, false);
|
||||
$this->next_reconnect = time() + 1800;
|
||||
}
|
||||
catch(Exception $e)
|
||||
{
|
||||
throw new ConnectionException(sprintf('Could not connect to RabbitMQ server: %s', $e->getMessage()), $e->getCode(), $e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes the connection to the server
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function disconnect(): void
|
||||
{
|
||||
if(!$this->isConnected())
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
$this->channel?->close();
|
||||
}
|
||||
catch(Exception $e)
|
||||
{
|
||||
unset($e);
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
$this->connection?->close();
|
||||
}
|
||||
catch(Exception $e)
|
||||
{
|
||||
unset($e);
|
||||
}
|
||||
|
||||
$this->channel = null;
|
||||
$this->connection = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reconnects to the server
|
||||
*
|
||||
* @return void
|
||||
* @throws ConnectionException
|
||||
*/
|
||||
public function reconnect(): void
|
||||
{
|
||||
$this->disconnect();
|
||||
$this->connect();
|
||||
}
|
||||
|
||||
/**
|
||||
* The automatic reconnect process
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function preformAutoreconf(): void
|
||||
{
|
||||
if($this->next_reconnect < time())
|
||||
{
|
||||
try
|
||||
{
|
||||
$this->reconnect();
|
||||
}
|
||||
catch (Exception $e)
|
||||
{
|
||||
Log::error('net.nosial.tamerlib', 'Could not reconnect to RabbitMQ server: %s', $e);
|
||||
}
|
||||
finally
|
||||
{
|
||||
$this->next_reconnect = time() + 1800;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -1,399 +0,0 @@
|
|||
<?php
|
||||
|
||||
/** @noinspection PhpMissingFieldTypeInspection */
|
||||
|
||||
namespace TamerLib\Protocols\RabbitMq;
|
||||
|
||||
use Exception;
|
||||
use LogLib\Log;
|
||||
use PhpAmqpLib\Message\AMQPMessage;
|
||||
use TamerLib\Abstracts\JobStatus;
|
||||
use TamerLib\Abstracts\ObjectType;
|
||||
use TamerLib\Classes\Validate;
|
||||
use TamerLib\Exceptions\ConnectionException;
|
||||
use TamerLib\Interfaces\WorkerProtocolInterface;
|
||||
use TamerLib\Objects\Job;
|
||||
use TamerLib\Objects\JobResults;
|
||||
|
||||
class Worker implements WorkerProtocolInterface
|
||||
{
|
||||
/**
|
||||
* An array of defined servers to use
|
||||
*
|
||||
* @var array
|
||||
*/
|
||||
private $defined_servers;
|
||||
|
||||
/**
|
||||
* @var false
|
||||
*/
|
||||
private $automatic_reconnect;
|
||||
|
||||
/**
|
||||
* An array of functions that the worker handles
|
||||
*
|
||||
* @var array
|
||||
*/
|
||||
private $functions;
|
||||
|
||||
/**
|
||||
* (Optional) The username to use when connecting to the server (if required)
|
||||
*
|
||||
* @var string|null
|
||||
*/
|
||||
private $username;
|
||||
|
||||
/**
|
||||
* (Optional) The password to use when connecting to the server
|
||||
*
|
||||
* @var string|null
|
||||
*/
|
||||
private $password;
|
||||
|
||||
/**
|
||||
* A array of active connections
|
||||
*
|
||||
* @var Connection[]
|
||||
*/
|
||||
private $connections;
|
||||
|
||||
/**
|
||||
* @var array
|
||||
*/
|
||||
private $options;
|
||||
|
||||
/**
|
||||
* Public Constructor with optional username and password
|
||||
*
|
||||
* @param string|null $username
|
||||
* @param string|null $password
|
||||
*/
|
||||
public function __construct(?string $username = null, ?string $password = null)
|
||||
{
|
||||
$this->defined_servers = [];
|
||||
$this->connections = [];
|
||||
$this->functions = [];
|
||||
$this->automatic_reconnect = true;
|
||||
$this->username = $username;
|
||||
$this->password = $password;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a server to the list of servers to use
|
||||
*
|
||||
* @param string $host
|
||||
* @param int $port
|
||||
* @return void
|
||||
*/
|
||||
public function addServer(string $host, int $port): void
|
||||
{
|
||||
if(!isset($this->defined_servers[$host]))
|
||||
{
|
||||
$this->defined_servers[$host] = [];
|
||||
}
|
||||
|
||||
if(in_array($port, $this->defined_servers[$host]))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
$this->defined_servers[$host][] = $port;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds an array of servers to the list of servers to use
|
||||
*
|
||||
* @param array $servers (eg; [host:port, host:port, ...])
|
||||
* @return void
|
||||
*/
|
||||
public function addServers(array $servers): void
|
||||
{
|
||||
foreach($servers as $server)
|
||||
{
|
||||
$server = explode(':', $server);
|
||||
$this->addServer($server[0], (int)$server[1]);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Establishes a connection to the server (or servers)
|
||||
*
|
||||
* @return void
|
||||
* @noinspection DuplicatedCode
|
||||
* @throws ConnectionException
|
||||
*/
|
||||
public function connect(): void
|
||||
{
|
||||
if($this->isConnected())
|
||||
return;
|
||||
|
||||
if(count($this->defined_servers) === 0)
|
||||
return;
|
||||
|
||||
foreach($this->defined_servers as $host => $ports)
|
||||
{
|
||||
foreach($ports as $port)
|
||||
{
|
||||
$connection = new Connection($host, $port, $this->username, $this->password);
|
||||
$connection->connect();
|
||||
|
||||
$this->connections[] = $connection;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Disconnects from the server
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function disconnect(): void
|
||||
{
|
||||
if(!$this->isConnected())
|
||||
return;
|
||||
|
||||
foreach($this->connections as $connection)
|
||||
{
|
||||
$connection->disconnect();
|
||||
}
|
||||
|
||||
$this->connections = [];
|
||||
}
|
||||
|
||||
/**
|
||||
* Reconnects to the server (or servers)
|
||||
*
|
||||
* @return void
|
||||
* @throws ConnectionException
|
||||
*/
|
||||
public function reconnect(): void
|
||||
{
|
||||
$this->disconnect();
|
||||
$this->connect();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns True if one or more connections are connected, False otherwise
|
||||
* (Note, some connections may be disconnected, and this will still return True)
|
||||
*
|
||||
* @return bool
|
||||
*/
|
||||
public function isConnected(): bool
|
||||
{
|
||||
if(count($this->connections) === 0)
|
||||
return false;
|
||||
|
||||
foreach($this->connections as $connection)
|
||||
{
|
||||
if($connection->isConnected())
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the options to use for this client
|
||||
*
|
||||
* @param array $options
|
||||
* @return void
|
||||
*/
|
||||
public function setOptions(array $options): void
|
||||
{
|
||||
$this->options = $options;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current options for this client
|
||||
*
|
||||
* @return array
|
||||
*/
|
||||
public function getOptions(): array
|
||||
{
|
||||
return $this->options;
|
||||
}
|
||||
|
||||
/**
|
||||
* Clears the current options for this client
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function clearOptions(): void
|
||||
{
|
||||
$this->options = [];
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns True if automatic reconnection is enabled, False otherwise
|
||||
*
|
||||
* @return bool
|
||||
*/
|
||||
public function automaticReconnectionEnabled(): bool
|
||||
{
|
||||
return $this->automatic_reconnect;
|
||||
}
|
||||
|
||||
/**
|
||||
* Enables or disables automatic reconnection
|
||||
*
|
||||
* @param bool $enable
|
||||
* @return void
|
||||
*/
|
||||
public function enableAutomaticReconnection(bool $enable): void
|
||||
{
|
||||
$this->automatic_reconnect = $enable;
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers a new function to the worker to handle
|
||||
*
|
||||
* @param string $name
|
||||
* @param callable $callable
|
||||
* @param mixed|null $context
|
||||
* @return void
|
||||
*/
|
||||
public function addFunction(string $name, callable $callable, mixed $context = null): void
|
||||
{
|
||||
$this->functions[$name] = [
|
||||
'function' => $callable,
|
||||
'context' => $context
|
||||
];
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes an existing function from the worker
|
||||
*
|
||||
* @param string $function_name
|
||||
* @return void
|
||||
*/
|
||||
public function removeFunction(string $function_name): void
|
||||
{
|
||||
unset($this->functions[$function_name]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Processes a job if there's one available
|
||||
*
|
||||
* @param bool $blocking
|
||||
* @param int $timeout
|
||||
* @param bool $throw_errors
|
||||
* @return void
|
||||
*/
|
||||
public function work(bool $blocking = true, int $timeout = 500, bool $throw_errors = false): void
|
||||
{
|
||||
if(!$this->isConnected())
|
||||
return;
|
||||
|
||||
// Select a random connection
|
||||
$connection = $this->connections[array_rand($this->connections)];
|
||||
|
||||
$callback = function($message) use ($throw_errors, $connection)
|
||||
{
|
||||
var_dump(Validate::getObjectType(msgpack_unpack($message->body)));
|
||||
if(Validate::getObjectType(msgpack_unpack($message->body)) !== ObjectType::Job)
|
||||
{
|
||||
$connection->getChannel()->basic_nack($message->delivery_info['delivery_tag']);
|
||||
return;
|
||||
}
|
||||
|
||||
$received_job = Job::fromArray(msgpack_unpack($message->body));
|
||||
|
||||
if($received_job->isClosure())
|
||||
{
|
||||
Log::debug('net.nosial.tamerlib', 'received closure: ' . $received_job->getId());
|
||||
|
||||
try
|
||||
{
|
||||
// TODO: Check back on this, looks weird.
|
||||
$closure = $received_job->getData();
|
||||
$result = $closure($received_job);
|
||||
}
|
||||
catch(Exception $e)
|
||||
{
|
||||
unset($e);
|
||||
|
||||
// Do not requeue the job, it's a closure
|
||||
$connection->getChannel()->basic_nack($message->delivery_info['delivery_tag']);
|
||||
return;
|
||||
}
|
||||
|
||||
$job_results = new JobResults($received_job, JobStatus::Success, $result);
|
||||
$connection->getChannel->basic_publish(
|
||||
new AMQPMessage(msgpack_pack($job_results->toArray()), ['correlation_id' => $received_job->getId()])
|
||||
);
|
||||
$connection->getChannel()->basic_ack($message->delivery_info['delivery_tag']);
|
||||
return;
|
||||
}
|
||||
|
||||
if(!isset($this->functions[$received_job->getName()]))
|
||||
{
|
||||
Log::debug('net.nosial.tamerlib', 'received unknown function: ' . $received_job->getId());
|
||||
$connection->getChannel()->basic_nack($message->delivery_info['delivery_tag'], false, true);
|
||||
return;
|
||||
}
|
||||
|
||||
Log::debug('net.nosial.tamerlib', 'received function: ' . $received_job->getId());
|
||||
$function = $this->functions[$received_job->getName()];
|
||||
$callback = $function['function'];
|
||||
|
||||
try
|
||||
{
|
||||
$result = $callback($received_job->getData(), $function['context']);
|
||||
}
|
||||
catch(Exception $e)
|
||||
{
|
||||
unset($e);
|
||||
|
||||
// Do not requeue the job, it's a closure
|
||||
$connection->getChannel()->basic_nack($message->delivery_info['delivery_tag']);
|
||||
return;
|
||||
}
|
||||
|
||||
$job_results = new JobResults($received_job, JobStatus::Success, $result);
|
||||
$connection->getChannel->basic_publish(
|
||||
new AMQPMessage(msgpack_pack($job_results->toArray()), ['correlation_id' => $received_job->getId()])
|
||||
);
|
||||
$connection->getChannel()->basic_ack($message->delivery_info['delivery_tag']);
|
||||
};
|
||||
|
||||
$connection->getChannel()->basic_consume('tamer_queue', '', false, false, false, false, $callback);
|
||||
|
||||
if ($blocking)
|
||||
{
|
||||
while(true)
|
||||
{
|
||||
$connection->getChannel()->wait();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
$start = microtime(true);
|
||||
while (true)
|
||||
{
|
||||
if (microtime(true) - $start >= $timeout / 1000)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
$connection->getChannel()->wait();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Disconnects from the server when the object is destroyed
|
||||
*/
|
||||
public function __destruct()
|
||||
{
|
||||
try
|
||||
{
|
||||
$this->disconnect();
|
||||
}
|
||||
catch(Exception $e)
|
||||
{
|
||||
unset($e);
|
||||
// Ignore
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -4,444 +4,8 @@
|
|||
|
||||
namespace TamerLib;
|
||||
|
||||
use Closure;
|
||||
use Exception;
|
||||
use InvalidArgumentException;
|
||||
use TamerLib\Abstracts\Mode;
|
||||
use TamerLib\Classes\Functions;
|
||||
use TamerLib\Classes\Supervisor;
|
||||
use TamerLib\Classes\Validate;
|
||||
use TamerLib\Exceptions\ConnectionException;
|
||||
use TamerLib\Exceptions\UnsupervisedWorkerException;
|
||||
use TamerLib\Interfaces\ClientProtocolInterface;
|
||||
use TamerLib\Interfaces\WorkerProtocolInterface;
|
||||
use TamerLib\Objects\Task;
|
||||
|
||||
class Tamer
|
||||
{
|
||||
/**
|
||||
* The protocol to use when connecting to the server
|
||||
*
|
||||
* @var string
|
||||
*/
|
||||
private static $protocol;
|
||||
|
||||
/**
|
||||
* The protocol to use when connecting to the server as a client
|
||||
*
|
||||
* @var ClientProtocolInterface|null
|
||||
*/
|
||||
private static $client;
|
||||
|
||||
/**
|
||||
* The protocol to use when connecting to the server as a worker
|
||||
*
|
||||
* @var WorkerProtocolInterface|null
|
||||
*/
|
||||
private static $worker;
|
||||
|
||||
/**
|
||||
* Indicates if Tamer is running as a client or worker
|
||||
*
|
||||
* @var string
|
||||
* @see Mode
|
||||
*/
|
||||
private static $mode;
|
||||
|
||||
/**
|
||||
* Indicates if Tamer is connected to the server
|
||||
*
|
||||
* @var bool
|
||||
*/
|
||||
private static $connected;
|
||||
|
||||
/**
|
||||
* The supervisor that is supervising the workers
|
||||
*
|
||||
* @var Supervisor
|
||||
*/
|
||||
private static $supervisor;
|
||||
|
||||
/**
|
||||
* Initializes Tamer as a client and connects to the server
|
||||
*
|
||||
* @param string $protocol
|
||||
* @param array $servers
|
||||
* @param string|null $username
|
||||
* @param string|null $password
|
||||
* @return void
|
||||
* @throws ConnectionException
|
||||
*/
|
||||
public static function init(string $protocol, array $servers, ?string $username=null, ?string $password=null): void
|
||||
{
|
||||
if(self::$connected)
|
||||
{
|
||||
throw new ConnectionException('Tamer is already connected to the server');
|
||||
}
|
||||
|
||||
if (!Validate::protocolType($protocol))
|
||||
{
|
||||
throw new InvalidArgumentException(sprintf('Invalid protocol type: %s', $protocol));
|
||||
}
|
||||
|
||||
self::$protocol = $protocol;
|
||||
self::$mode = Mode::Client;
|
||||
self::$client = Functions::createClient($protocol, $username, $password);
|
||||
self::$client->addServers($servers);
|
||||
self::$client->connect();
|
||||
self::$supervisor = new Supervisor($protocol, $servers, $username, $password);
|
||||
self::$connected = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initializes Tamer as a worker client and connects to the server
|
||||
*
|
||||
* @return void
|
||||
* @throws ConnectionException
|
||||
* @throws UnsupervisedWorkerException
|
||||
*/
|
||||
public static function initWorker(): void
|
||||
{
|
||||
if(self::$connected)
|
||||
{
|
||||
throw new ConnectionException('Tamer is already connected to the server');
|
||||
}
|
||||
|
||||
if(!Functions::getWorkerVariables()['TAMER_ENABLED'])
|
||||
{
|
||||
throw new UnsupervisedWorkerException('Tamer is not enabled for this worker');
|
||||
}
|
||||
|
||||
self::$protocol = Functions::getWorkerVariables()['TAMER_PROTOCOL'];
|
||||
self::$mode = Mode::Worker;
|
||||
self::$worker = Functions::createWorker(self::$protocol);
|
||||
self::$worker->addServers(Functions::getWorkerVariables()['TAMER_SERVERS']);
|
||||
self::$worker->connect();
|
||||
self::$connected = true;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Disconnects from the server
|
||||
*
|
||||
* @return void
|
||||
* @throws ConnectionException
|
||||
*/
|
||||
public static function disconnect(): void
|
||||
{
|
||||
if (!self::$connected)
|
||||
{
|
||||
throw new ConnectionException('Tamer is not connected to the server');
|
||||
}
|
||||
|
||||
if (self::$mode === Mode::Client)
|
||||
{
|
||||
self::$client->disconnect();
|
||||
}
|
||||
else
|
||||
{
|
||||
self::$worker->disconnect();
|
||||
}
|
||||
|
||||
self::$connected = false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reconnects to the server
|
||||
*
|
||||
* @return void
|
||||
* @throws ConnectionException
|
||||
*/
|
||||
public static function reconnect(): void
|
||||
{
|
||||
if (self::$mode === Mode::Client)
|
||||
{
|
||||
self::$client->reconnect();
|
||||
}
|
||||
else
|
||||
{
|
||||
self::$worker->reconnect();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a task to the queue to be executed by the worker
|
||||
*
|
||||
* @param Task $task
|
||||
* @return void
|
||||
*/
|
||||
public static function do(Task $task): void
|
||||
{
|
||||
if (self::$mode === Mode::Client)
|
||||
{
|
||||
self::$client->do($task);
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new InvalidArgumentException('Tamer is not running in client mode');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes a closure operation in the background (does not return a result)
|
||||
*
|
||||
* @param Closure $closure The closure operation to perform (remote)
|
||||
* @return void
|
||||
*/
|
||||
public static function doClosure(Closure $closure): void
|
||||
{
|
||||
if (self::$mode === Mode::Client)
|
||||
{
|
||||
self::$client->doClosure($closure);
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new InvalidArgumentException('Tamer is not running in client mode');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Queues a task to be processed in parallel (returns a result handled by a callback)
|
||||
*
|
||||
* @param Task $task
|
||||
* @return void
|
||||
*/
|
||||
public static function queue(Task $task): void
|
||||
{
|
||||
if (self::$mode === Mode::Client)
|
||||
{
|
||||
self::$client->queue($task);
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new InvalidArgumentException('Tamer is not running in client mode');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Queues a closure to be processed in parallel (returns a result handled by a callback)
|
||||
*
|
||||
* @param Closure $closure The closure operation to perform (remote)
|
||||
* @param Closure|null $callback The closure to call when the operation is complete (local)
|
||||
* @return void
|
||||
*/
|
||||
public static function queueClosure(Closure $closure, ?Closure $callback=null): void
|
||||
{
|
||||
if (self::$mode === Mode::Client)
|
||||
{
|
||||
self::$client->queueClosure($closure, $callback);
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new InvalidArgumentException('Tamer is not running in client mode');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes all tasks in the queue and waits for them to complete
|
||||
*
|
||||
* @return bool
|
||||
*/
|
||||
public static function run(): bool
|
||||
{
|
||||
if (self::$mode === Mode::Client)
|
||||
{
|
||||
return self::$client->run();
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new InvalidArgumentException('Tamer is not running in client mode');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers a function to the worker
|
||||
*
|
||||
* @param string $name The name of the function to add
|
||||
* @param callable $callable The function to add
|
||||
* @return void
|
||||
*/
|
||||
public static function addFunction(string $name, callable $callable): void
|
||||
{
|
||||
if (self::$mode === Mode::Worker)
|
||||
{
|
||||
self::$worker->addFunction($name, $callable);
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new InvalidArgumentException('Tamer is not running in worker mode');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes a function from the worker
|
||||
*
|
||||
* @param string $function_name The name of the function to remove
|
||||
* @return void
|
||||
*/
|
||||
public static function removeFunction(string $function_name): void
|
||||
{
|
||||
if (self::$mode === Mode::Worker)
|
||||
{
|
||||
self::$worker->removeFunction($function_name);
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new InvalidArgumentException('Tamer is not running in worker mode');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Works a job from the queue (blocking or non-blocking)
|
||||
*
|
||||
* @param bool $blocking (optional) Whether to block until a job is available
|
||||
* @param int $timeout (optional) The timeout to use when blocking
|
||||
* @param bool $throw_errors (optional) Whether to throw errors or not
|
||||
* @return void
|
||||
*/
|
||||
public static function work(bool $blocking=true, int $timeout=500, bool $throw_errors=false): void
|
||||
{
|
||||
if (self::$mode === Mode::Worker)
|
||||
{
|
||||
self::$worker->work($blocking, $timeout, $throw_errors);
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new InvalidArgumentException('Tamer is not running in worker mode');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Monitors the workers and restarts them if they die unexpectedly (monitor mode only)
|
||||
*
|
||||
* @param bool $blocking
|
||||
* @param bool $auto_restart
|
||||
* @return void
|
||||
* @throws Exception
|
||||
*/
|
||||
public static function monitor(bool $blocking=false, bool $auto_restart=true): void
|
||||
{
|
||||
if (self::$mode === Mode::Client)
|
||||
{
|
||||
self::$supervisor->monitor($blocking, $auto_restart);
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new InvalidArgumentException('Tamer is not running in client mode');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a worker to the supervisor
|
||||
*
|
||||
* @param string $target
|
||||
* @param int $instances
|
||||
* @return void
|
||||
* @throws Exception
|
||||
*/
|
||||
public static function addWorker(string $target, int $instances): void
|
||||
{
|
||||
if (self::$mode === Mode::Client)
|
||||
{
|
||||
self::$supervisor->addWorker($target, $instances);
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new InvalidArgumentException('Tamer is not running in client mode');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts all workers
|
||||
*
|
||||
* @return void
|
||||
* @throws Exception
|
||||
*/
|
||||
public static function startWorkers(): void
|
||||
{
|
||||
if (self::$mode === Mode::Client)
|
||||
{
|
||||
self::$supervisor->start();
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new InvalidArgumentException('Tamer is not running in client mode');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Stops all workers
|
||||
*
|
||||
* @return void
|
||||
* @throws Exception
|
||||
*/
|
||||
public static function stopWorkers(): void
|
||||
{
|
||||
if (self::$mode === Mode::Client)
|
||||
{
|
||||
self::$supervisor->stop();
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new InvalidArgumentException('Tamer is not running in client mode');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Restarts all workers
|
||||
*
|
||||
* @return void
|
||||
* @throws Exception
|
||||
*/
|
||||
public static function restartWorkers(): void
|
||||
{
|
||||
if (self::$mode === Mode::Client)
|
||||
{
|
||||
self::$supervisor->restart();
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new InvalidArgumentException('Tamer is not running in client mode');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return string
|
||||
*/
|
||||
public static function getProtocol(): string
|
||||
{
|
||||
return self::$protocol;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return ClientProtocolInterface|null
|
||||
*/
|
||||
public static function getClient(): ?ClientProtocolInterface
|
||||
{
|
||||
return self::$client;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return WorkerProtocolInterface|null
|
||||
*/
|
||||
public static function getWorker(): ?WorkerProtocolInterface
|
||||
{
|
||||
return self::$worker;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return string
|
||||
*/
|
||||
public static function getMode(): string
|
||||
{
|
||||
return self::$mode;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return bool
|
||||
*/
|
||||
public static function isConnected(): bool
|
||||
{
|
||||
return self::$connected;
|
||||
}
|
||||
}
|
|
@ -1,30 +0,0 @@
|
|||
<?php
|
||||
|
||||
|
||||
// Pi function (closure) loop 10 times
|
||||
for ($i = 0; $i < 50; $i++)
|
||||
{
|
||||
$start = microtime(true);
|
||||
$pi = 0;
|
||||
$top = 4;
|
||||
$bot = 1;
|
||||
$minus = true;
|
||||
$iterations = 1000000;
|
||||
|
||||
for ($i = 0; $i < $iterations; $i++)
|
||||
{
|
||||
if ($minus)
|
||||
{
|
||||
$pi = $pi - ($top / $bot);
|
||||
$minus = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
$pi = $pi + ($top / $bot);
|
||||
$minus = true;
|
||||
}
|
||||
|
||||
$bot += 2;
|
||||
}
|
||||
|
||||
}
|
18
tests/redis_server.php
Normal file
18
tests/redis_server.php
Normal file
|
@ -0,0 +1,18 @@
|
|||
<?php
|
||||
|
||||
require 'ncc';
|
||||
import('net.nosial.tamerlib');
|
||||
|
||||
$redis_server = new \TamerLib\Classes\RedisServer();
|
||||
$redis_server->start();
|
||||
|
||||
$redis_client = new \Redis();
|
||||
$redis_client->connect('127.0.0.1', $redis_server->getPort());
|
||||
|
||||
$redis_client->set('foo', 'bar');
|
||||
$value = $redis_client->get('foo');
|
||||
|
||||
echo $value . PHP_EOL;
|
||||
|
||||
$redis_client->close();
|
||||
$redis_server->stop();
|
|
@ -1,71 +0,0 @@
|
|||
<?php
|
||||
|
||||
|
||||
use TamerLib\Abstracts\ProtocolType;
|
||||
use TamerLib\Tamer;
|
||||
|
||||
require 'ncc';
|
||||
|
||||
import('net.nosial.tamerlib', 'latest');
|
||||
|
||||
Tamer::init(ProtocolType::Gearman,
|
||||
['127.0.0.1:4730']
|
||||
//['127.0.0.1:5672'], 'guest', 'guest'
|
||||
);
|
||||
|
||||
$instances = 10;
|
||||
Tamer::addWorker('closure', $instances);
|
||||
Tamer::startWorkers();
|
||||
$a = microtime(true);
|
||||
$times = [];
|
||||
$jobs = 30;
|
||||
|
||||
// Pi function (closure) loop 10 times
|
||||
for ($i = 0; $i < $jobs; $i++)
|
||||
{
|
||||
Tamer::queueClosure(function(){
|
||||
// Full pi calculation implementation
|
||||
|
||||
$start = microtime(true);
|
||||
$pi = 0;
|
||||
$top = 4;
|
||||
$bot = 1;
|
||||
$minus = true;
|
||||
$iterations = 1000000;
|
||||
|
||||
for ($i = 0; $i < $iterations; $i++)
|
||||
{
|
||||
if ($minus)
|
||||
{
|
||||
$pi = $pi - ($top / $bot);
|
||||
$minus = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
$pi = $pi + ($top / $bot);
|
||||
$minus = true;
|
||||
}
|
||||
|
||||
$bot += 2;
|
||||
}
|
||||
|
||||
return json_encode([$pi, $start]);
|
||||
},
|
||||
function($return) use ($a, &$times)
|
||||
{
|
||||
$return = json_decode($return, true);
|
||||
$end_time = microtime(true) - $return[1];
|
||||
$times[] = $end_time;
|
||||
echo "Pi is {$return[0]}, completed in " . ($end_time) . " seconds \n";
|
||||
});
|
||||
}
|
||||
|
||||
echo "Waiting for $jobs jobs to finish on $instances workers \n";
|
||||
Tamer::run();
|
||||
$b = microtime(true);
|
||||
|
||||
echo PHP_EOL;
|
||||
echo "Average time: " . (array_sum($times) / count($times)) . " seconds \n";
|
||||
echo "Took (with tamer)" . ($b - $a) . " seconds \n";
|
||||
echo "Total time (without tamer): " . (array_sum($times)) . " seconds \n";
|
||||
echo "Tamer overhead: " . (($b - $a) - array_sum($times)) . " seconds \n";
|
|
@ -1,33 +0,0 @@
|
|||
<?php
|
||||
|
||||
use TamerLib\Abstracts\ProtocolType;
|
||||
use TamerLib\Objects\JobResults;
|
||||
use TamerLib\Objects\Task;
|
||||
use TamerLib\Tamer;
|
||||
|
||||
require 'ncc';
|
||||
|
||||
import('net.nosial.tamerlib', 'latest');
|
||||
|
||||
Tamer::init(ProtocolType::Gearman,
|
||||
['127.0.0.1:4730']
|
||||
);
|
||||
|
||||
Tamer::addWorker(__DIR__ . '/tamer_worker.php', 10);
|
||||
Tamer::startWorkers();
|
||||
|
||||
|
||||
// Sleep function (task) loop 10 times
|
||||
for ($i = 0; $i < 10; $i++)
|
||||
{
|
||||
Tamer::queue(Task::create('sleep', 5, function(JobResults $data)
|
||||
{
|
||||
echo "Slept for {$data->getData()} seconds \n";
|
||||
}));
|
||||
}
|
||||
|
||||
echo "Waiting for jobs to finish \n";
|
||||
$a = microtime(true);
|
||||
Tamer::run();
|
||||
$b = microtime(true);
|
||||
echo "Took " . ($b - $a) . " seconds \n";
|
|
@ -1,18 +0,0 @@
|
|||
<?php
|
||||
|
||||
use TamerLib\Abstracts\Mode;
|
||||
use TamerLib\Abstracts\ProtocolType;
|
||||
use TamerLib\Tamer;
|
||||
|
||||
require 'ncc';
|
||||
|
||||
import('net.nosial.tamerlib', 'latest');
|
||||
|
||||
Tamer::initWorker();
|
||||
|
||||
Tamer::addFunction('sleep', function(\TamerLib\Objects\Job $job) {
|
||||
sleep($job->getData());
|
||||
return $job->getData();
|
||||
});
|
||||
|
||||
Tamer::work();
|
Loading…
Add table
Reference in a new issue