Lots of changes, implemented the base usage. Beware of bugs and unfinished states

This commit is contained in:
Netkas 2023-06-12 21:32:35 -04:00
parent 0c23fdfac2
commit f20551857b
No known key found for this signature in database
GPG key ID: 5DAF58535614062B
27 changed files with 3045 additions and 71 deletions

View file

@ -0,0 +1,77 @@
<?php
/** @noinspection PhpMissingFieldTypeInspection */
namespace TamerLib\Classes;
class AdaptiveSleep
{
/**
* @var int int
*/
private $max_sleep_time;
/**
* @var array
*/
private $busy_buffer;
/**
* @var int
*/
private $buffer_size;
/**
* @var int
*/
private $buffer_index;
/**
* AdaptiveSleep constructor.
*
* @param int $max_sleep_time
* @param int $buffer_size
*/
public function __construct(int $max_sleep_time=500, int $buffer_size=30)
{
$this->max_sleep_time = $max_sleep_time;
$this->busy_buffer = array_fill(0, $buffer_size, false); // Fill the buffer with false values (not busy
$this->buffer_size = $buffer_size;
$this->buffer_index = 0;
}
/**
* Preforms an adaptive sleep.
*
* @param bool $busy
* @return int
*/
public function sleep(bool $busy): int
{
// Add the busy state to the buffer
$this->busy_buffer[$this->buffer_index] = $busy;
$this->buffer_index = ($this->buffer_index + 1) % $this->buffer_size; // Circular buffer
// Calculate the average busy state
$busy_count = 0;
foreach($this->busy_buffer as $busy_state)
{
if($busy_state)
{
$busy_count++;
}
}
$busy_average = $busy_count / $this->buffer_size;
// Calculate the sleep time
$sleep_time = $this->max_sleep_time * (1 - $busy_average);
// Sleep
if($sleep_time > 0)
{
usleep($sleep_time);
}
return $sleep_time;
}
}

View file

@ -0,0 +1,628 @@
<?php
/** @noinspection PhpMissingFieldTypeInspection */
namespace TamerLib\Classes;
use Exception;
use LogLib\Log;
use Redis;
use TamerLib\Enums\JobStatus;
use TamerLib\Exceptions\JobNotFoundException;
use TamerLib\Exceptions\ConnectionException;
use TamerLib\Exceptions\ServerException;
use TamerLib\Exceptions\TimeoutException;
use TamerLib\Objects\JobPacket;
use TamerLib\Objects\ServerConfiguration;
use TamerLib\Objects\WorkerConfiguration;
use Throwable;
class JobManager
{
/**
* @var ServerConfiguration
*/
private $server_configuration;
/**
* @var Redis
*/
private $redis_client;
/**
* @var int|null
*/
private $last_connect;
/**
* JobManager constructor.
*
* @param ServerConfiguration $serverConfiguration
*/
public function __construct(ServerConfiguration $serverConfiguration)
{
$this->server_configuration = $serverConfiguration;
$this->redis_client = new Redis();
}
/**
* Attempts to determine if the Redis Server is online.
*
* @param bool $ping
* @return bool
*/
private function isConnected(bool $ping=true): bool
{
if($this->redis_client === null)
{
return false;
}
try
{
if($ping)
{
$this->redis_client->ping();
}
}
catch(Exception $e)
{
unset($e);
return false;
}
return true;
}
/**
* Attempts to connect to the Redis Server
*
* @return void
* @throws ConnectionException
*/
private function connect(): void
{
// Reconnect every 30 minutes
if ($this->last_connect !== null && $this->last_connect < (time() - 1800))
{
//$this->disconnect();
}
if($this->isConnected())
{
return;
}
try
{
$this->redis_client->connect(
$this->server_configuration->getHost(),
$this->server_configuration->getPort()
);
if ($this->server_configuration->getPassword() !== null)
{
$this->redis_client->auth($this->server_configuration->getPassword());
}
$this->redis_client->select($this->server_configuration->getDatabase());
$this->last_connect = time();
}
catch(Exception $e)
{
throw new ConnectionException(sprintf('Could not connect to %s:%s', $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e);
}
}
/**
* Disconnects from the Redis Server
*
* @return void
*/
private function disconnect(): void
{
try
{
$this->redis_client->close();
}
catch(Exception $e)
{
Log::warning('net.nosial.tamerlib',sprintf('JobManager could not disconnect safely from %s:%s', $this->server_configuration->getHost(), $this->server_configuration->getPort()), $e);
}
finally
{
$this->redis_client = null;
}
}
/**
* Returns the Redis Client, or attempts to connect to the Redis Server if not connected.
*
* @return Redis
* @throws ConnectionException
*/
private function getClient(): Redis
{
$this->connect();
return $this->redis_client;
}
/**
* Pushes a JobPacket to the Redis server.
*
* @param JobPacket $jobPacket
* @return void
* @throws ServerException
*/
public function pushJob(JobPacket $jobPacket): void
{
try
{
$this->getClient()->hMSet($jobPacket->getId(), $jobPacket->toArray());
$this->getClient()->rPush(sprintf('ch%s', $jobPacket->getChannel()), $jobPacket->getId());
}
catch(Exception $e)
{
throw new ServerException(sprintf('Could not push job to %s:%s', $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e);
}
}
/**
* Attempts to get a specific job from the Redis Server.
*
* @param JobPacket|string $job_id
* @return JobPacket
* @throws JobNotFoundException
* @throws ServerException
*/
public function getJob(JobPacket|string $job_id): JobPacket
{
if($job_id instanceof JobPacket)
{
/** @noinspection CallableParameterUseCaseInTypeContextInspection */
$job_id = $job_id->getId();
}
try
{
if(!$this->getClient()->exists($job_id))
{
throw new JobNotFoundException(sprintf('Job %s does not exist in %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()));
}
return new JobPacket($this->getClient()->hGetAll($job_id));
}
catch(JobNotFoundException $e)
{
throw $e;
}
catch(Exception $e)
{
throw new ServerException(sprintf('Could not get job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e);
}
}
/**
* Returns the status of a job.
*
* @param JobPacket|string $job_id
* @return int
* @throws ServerException
*/
public function getJobStatus(JobPacket|string $job_id): int
{
if($job_id instanceof JobPacket)
{
/** @noinspection CallableParameterUseCaseInTypeContextInspection */
$job_id = $job_id->getId();
}
try
{
if(!$this->getClient()->exists($job_id))
{
throw new JobNotFoundException(sprintf('Job %s does not exist on %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()));
}
return (int)$this->getClient()->hGet($job_id, 'status');
}
catch(Exception $e)
{
throw new ServerException(sprintf('Could not get job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e);
}
}
/**
* Listens on a return channel for a returned job.
*
* @param string $return_channel
* @param int $timeout
* @return JobPacket
* @throws JobNotFoundException
* @throws ServerException
* @throws TimeoutException
*/
public function listenReturnChannel(string $return_channel, int $timeout=0): JobPacket
{
$time_start = time();
try
{
if($timeout < 0)
{
Log::debug('net.nosial.tamerlib', sprintf('Listening for job on return channel %s on %s:%s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort()));
$job_packet = $this->getClient()->lPop($return_channel);
}
else
{
Log::debug('net.nosial.tamerlib', sprintf('Listening for job on return channel %s on %s:%s with timeout %s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort(), $timeout));
$job_packet = $this->getClient()->blPop($return_channel, $timeout);
}
}
catch(Exception $e)
{
throw new TimeoutException(sprintf('Could not get job from return channel %s on %s:%s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e);
}
if($job_packet === null)
{
throw new TimeoutException(sprintf('Could not get job from return channel %s on %s:%s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort()));
}
if($job_packet[1] === 'test')
{
return $this->listenReturnChannel($return_channel, $timeout);
}
Log::debug('net.nosial.tamerlib', sprintf('Received job %s from return channel %ss', $job_packet[1], $return_channel));
return $this->getJob($job_packet[1]);
}
/**
* Gets the return value of a job.
*
* @param JobPacket|string $job_id
* @return mixed
* @throws JobNotFoundException
* @throws ServerException
*/
public function getJobResult(JobPacket|string $job_id): mixed
{
if($job_id instanceof JobPacket)
{
/** @noinspection CallableParameterUseCaseInTypeContextInspection */
$job_id = $job_id->getId();
}
try
{
if(!$this->getClient()->exists($job_id))
{
throw new JobNotFoundException(sprintf('Job %s does not exist on %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()));
}
return $this->getClient()->hGet($job_id, 'return');
}
catch(JobNotFoundException $e)
{
throw $e;
}
catch(Exception $e)
{
throw new ServerException(sprintf('Could not get job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e);
}
}
/**
* Returns the exception to a job.
*
* @param JobPacket|string $job_id
* @return Throwable
* @throws ServerException
*/
public function getJobException(JobPacket|string $job_id): Throwable
{
if($job_id instanceof JobPacket)
{
/** @noinspection CallableParameterUseCaseInTypeContextInspection */
$job_id = $job_id->getId();
}
try
{
if(!$this->getClient()->exists($job_id))
{
throw new JobNotFoundException(sprintf('Job %s does not exist on %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()));
}
return unserialize($this->getClient()->hGet($job_id, 'exception'), ['allowed_classes'=>true]);
}
catch(Exception $e)
{
throw new ServerException(sprintf('Could not get job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e);
}
}
/**
* Attempts to claim an existing JobPacket, returns True if the worker's claim was successful.
* Returns False if the JobPacket does not exist, or if the JobPacket is already claimed.
*
* @param JobPacket|string $job_id
* @param WorkerConfiguration|string $worker_id
* @return bool
* @throws ServerException
*/
public function claimJob(JobPacket|string $job_id, WorkerConfiguration|string $worker_id): bool
{
if($job_id instanceof JobPacket)
{
/** @noinspection CallableParameterUseCaseInTypeContextInspection */
$job_id = $job_id->getId();
}
if($worker_id instanceof WorkerConfiguration)
{
/** @noinspection CallableParameterUseCaseInTypeContextInspection */
$worker_id = $worker_id->getWorkerId();
}
try
{
// Check if the job exists
if(!$this->getClient()->exists($job_id))
{
return false;
}
// Attempt to claim the job
$this->getClient()->hSet($job_id, 'worker_id', $worker_id);
// Verify that the job was claimed
if($this->getClient()->hGet($job_id, 'worker_id') !== $worker_id)
{
return false;
}
// Set the job status to processing
$this->getClient()->hSet($job_id, 'status', JobStatus::PROCESSING);
}
catch(Exception $e)
{
throw new ServerException(sprintf('Could not claim job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e);
}
return true;
}
/**
* Marks a job as finished, and sets the return value.
*
* @param JobPacket|string $job_id
* @param mixed|null $return_value
* @return void
* @throws JobNotFoundException
* @throws ServerException
*/
public function returnJob(JobPacket|string $job_id, mixed $return_value=null): void
{
if($job_id instanceof JobPacket)
{
/** @noinspection CallableParameterUseCaseInTypeContextInspection */
$job_id = $job_id->getId();
}
try
{
if($this->getClient()->exists($job_id))
{
$return_channel = $this->getClient()->hGet($job_id, 'return_channel');
Log::debug('net.nosial.tamerlib', sprintf('Returning job %s (Return Channel: %s)', $job_id, $return_channel ?? 'n/a'));
if($return_channel === null)
{
$this->getClient()->del($job_id);
return;
}
$this->getClient()->hSet($job_id, 'return_value', serialize($return_value));
$this->getClient()->hSet($job_id, 'status', JobStatus::FINISHED);
$this->getClient()->rPush($this->getClient()->hGet($job_id, 'return_channel'), $job_id);
return;
}
}
catch(Exception $e)
{
throw new ServerException(sprintf('Could not return job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e);
}
throw new JobNotFoundException(sprintf('Job %s does not exist on %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()));
}
/**
* Rejects a job and pushes it back to the channel.
*
* @param JobPacket|string $job_id
* @return void
* @throws JobNotFoundException
* @throws ServerException
*/
public function rejectJob(JobPacket|string $job_id): void
{
$channel_id = null;
if($job_id instanceof JobPacket)
{
// Providing a JobPacket allows us to get the channel_id and avoid an extra call to the server
$channel_id = $job_id->getChannel();
/** @noinspection CallableParameterUseCaseInTypeContextInspection */
$job_id = $job_id->getId();
}
try
{
if($this->getClient()->exists($job_id))
{
Log::debug('net.nosial.tamerlib', sprintf('Rejecting job %s', $job_id));
// Mark as rejected, clear worker_id
$this->getClient()->hSet($job_id, 'worker_id', null);
// Push back to the channel
if($channel_id !== null)
{
$channel_id = $this->getClient()->hGet($job_id, 'channel');
}
$this->getClient()->rPush(sprintf('ch%s', $channel_id), $job_id);
return;
}
}
catch(Exception $e)
{
throw new ServerException(sprintf('Could not reject job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e);
}
throw new JobNotFoundException(sprintf('Job %s does not exist on %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()));
}
/**
* Sets the job as failed, and sets the exception that was thrown.
*
* @param JobPacket|string $job_id
* @param Throwable $throwable
* @return void
* @throws ServerException
*/
public function returnException(JobPacket|string $job_id, Throwable $throwable): void
{
if($job_id instanceof JobPacket)
{
/** @noinspection CallableParameterUseCaseInTypeContextInspection */
$job_id = $job_id->getId();
}
try
{
if($this->getClient()->exists($job_id))
{
Log::debug('net.nosial.tamerlib', sprintf('Returning exception for job %s', $job_id));
if($this->getClient()->hGet($job_id, 'return_channel') === null)
{
$this->getClient()->del($job_id);
return;
}
$this->getClient()->hSet($job_id, 'exception', serialize($throwable));
$this->getClient()->hSet($job_id, 'status', JobStatus::FAILED);
$this->getClient()->rPush($this->getClient()->hGet($job_id, 'return_channel'), $job_id);
Log::debug('net.nosial.tamerlib', sprintf('Pushed job %s to return channel %s', $job_id, $this->getClient()->hGet($job_id, 'return_channel')));
return;
}
}
catch(Exception $e)
{
throw new ServerException(sprintf('Could not return exception for job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e);
}
}
/**
* Waits for a job to be available on the channel or multiple channels, attempts to claim it, and returns it.
*
* If multiple channels are selected, the function will iterate through each channel until a job is found.
*
* @param WorkerConfiguration|string $worker_id The worker that is waiting for the job.
* @param int|array $channel The channel (or channels) to wait for the job on.
* @param int $timeout The timeout in seconds to wait for the job.
* @return JobPacket The job that was claimed, or null if no job was available.
* @throws ServerException
* @throws TimeoutException
*/
public function listenForJob(WorkerConfiguration|string $worker_id, int|array $channel=0, int $timeout=0): JobPacket
{
if($worker_id instanceof WorkerConfiguration)
{
/** @noinspection CallableParameterUseCaseInTypeContextInspection */
$worker_id = $worker_id->getWorkerId();
}
$channels = [];
if(is_array($channel))
{
foreach($channel as $id)
{
$channels[] = sprintf('ch%s', $id);
}
}
else
{
$channels[] = sprintf('ch%s', $channel);
}
try
{
while(true)
{
if($timeout < 0)
{
$job_id = $this->getClient()->blPop($channels, 1);
}
else
{
$job_id = $this->getClient()->blPop($channels, $timeout);
}
if($job_id !== false && $this->claimJob($job_id[1], $worker_id))
{
return new JobPacket($this->getClient()->hGetAll($job_id[1]));
}
if($timeout < 0)
{
throw new TimeoutException(sprintf('Timeout exceeded while waiting for job on %s:%s', $this->server_configuration->getHost(), $this->server_configuration->getPort()));
}
if($timeout === 0 && $job_id === null)
{
throw new TimeoutException(sprintf('Timeout exceeded while waiting for job on %s:%s', $this->server_configuration->getHost(), $this->server_configuration->getPort()));
}
}
}
catch(Exception $e)
{
throw new ServerException(sprintf('Could not wait for job on channel %s from %s:%s', $channel, $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e);
}
}
/**
* Drops a job from the server entirely.
*
* @param JobPacket|string $job_id
* @return void
* @throws JobNotFoundException
* @throws ServerException
*/
public function dropJob(JobPacket|string $job_id): void
{
if($job_id instanceof JobPacket)
{
/** @noinspection CallableParameterUseCaseInTypeContextInspection */
$job_id = $job_id->getId();
}
try
{
if($this->getClient()->exists($job_id))
{
$this->getClient()->del($job_id);
return;
}
}
catch(Exception $e)
{
throw new ServerException(sprintf('Could not drop job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e);
}
throw new JobNotFoundException(sprintf('Job %s does not exist on %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()));
}
}

View file

@ -8,8 +8,8 @@
use Redis;
use RedisException;
use Symfony\Component\Process\Process;
use TamerLib\Exceptions\NoAvailablePortException;
use TamerLib\Exceptions\RedisServerException;
use TamerLib\Exceptions\ServerException;
use TamerLib\Objects\ServerConfiguration;
class RedisServer
{
@ -19,14 +19,11 @@
private $cmd;
/**
* @var string
* The server configuration
*
* @var ServerConfiguration
*/
private $host;
/**
* @var int|null
*/
private $port;
private $configuration;
/**
* @var Process|null
@ -36,33 +33,15 @@
/**
* RedisServer constructor.
*
* @param ServerConfiguration $configuration
* @param string $cmd
* @param string $host
* @param int|null $port
* @throws NoAvailablePortException
*/
public function __construct(string $cmd='redis-server', string $host='127.0.0.1', ?int $port=null)
public function __construct(ServerConfiguration $configuration, string $cmd='redis-server')
{
if (is_null($port))
{
$port = Utilities::getAvailablePort();
Log::debug('net.nosial.tamerlib', 'Selected port ' . $port . '.');
}
$this->configuration = $configuration;
$this->cmd = $cmd;
$this->host = $host;
$this->port = $port;
}
/**
* Returns the port that the Redis server is listening on.
*
* @return int|null
*/
public function getPort(): ?int
{
return $this->port;
}
/**
* Determines if the Redis server is running.
@ -84,8 +63,7 @@
*
* @param int $timeout
* @return bool
* @throws RedisServerException
* @throws RedisException
* @throws ServerException
*/
public function start(int $timeout=60): bool
{
@ -94,28 +72,34 @@
return true;
}
Log::verbose('net.nosial.tamerlib', 'Starting Redis server on port ' . $this->port . '.');
$this->server_process = new Process([$this->cmd, '--port', $this->port]);
Log::verbose('net.nosial.tamerlib', 'Starting server on port ' . $this->configuration->getPort() . '.');
$this->server_process = new Process([$this->cmd, '--port', $this->configuration->getPort()]);
$this->server_process->start();
// Use a redis client and ping the server until it responds.
$redis_client = new Redis();
$timeout_counter = 0;
while(!$redis_client->isConnected())
while(true)
{
if($timeout_counter >= $timeout)
{
throw new RedisServerException('Redis server failed to start within ' . $timeout . ' seconds.');
throw new ServerException('Redis server failed to start within ' . $timeout . ' seconds.');
}
try
{
$redis_client->connect($this->host, $this->port);
if($redis_client->isConnected())
{
break;
}
$redis_client->connect($this->configuration->getHost(), $this->configuration->getPort());
}
catch (RedisException $e)
{
// Do nothing.
unset($e);
}
finally
{
@ -124,7 +108,7 @@
}
}
Log::verbose('net.nosial.tamerlib', 'Redis server started.');
Log::verbose('net.nosial.tamerlib', sprintf('Server listening on %s:%s.', $this->configuration->getHost(), $this->configuration->getPort()));
return true;
}
@ -141,7 +125,7 @@
}
$this->server_process->stop();
Log::verbose('net.nosial.tamerlib', 'Redis server stopped.');
Log::verbose('net.nosial.tamerlib', sprintf('Server stopped on %s:%s.', $this->configuration->getHost(), $this->configuration->getPort()));
return true;
}

View file

@ -2,7 +2,13 @@
namespace TamerLib\Classes;
use Exception;
use Opis\Closure\SerializableClosure;
use TamerLib\Exceptions\NoAvailablePortException;
use Throwable;
use function serialize;
use function unserialize;
use const PHP_MAJOR_VERSION;
class Utilities
{
@ -35,4 +41,78 @@
throw new NoAvailablePortException('No available port found in range ' . $start . ' to ' . $end . '.');
}
/**
* Returns a randomly generated string of the given length.
*
* @param int $length
* @return string
* @throws Exception
*/
public static function generateRandomString(int $length=8): string
{
$characters = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ';
$characters_length = strlen($characters);
$random_string = '';
for ($i = 0; $i < $length; $i++)
{
$random_string .= $characters[random_int(0, $characters_length - 1)];
}
return $random_string;
}
/**
* Returns an array representation of a throwable exception.
*
* @param Throwable $throwable
* @param bool $recursive
* @return array
*/
public static function throwableToArray(Throwable $throwable, bool $recursive=true): array
{
$array = [
'message' => $throwable->getMessage(),
'code' => $throwable->getCode(),
'file' => $throwable->getFile(),
'line' => $throwable->getLine(),
'trace' => $throwable->getTrace(),
];
if($recursive && $throwable->getPrevious() instanceof Throwable)
{
$array['previous'] = self::throwableToArray($throwable->getPrevious());
}
return $array;
}
/**
* Serialize
*
* @param mixed $data
* @return string
*/
public static function serialize($data)
{
SerializableClosure::enterContext();
SerializableClosure::wrapClosures($data);
$data = serialize($data);
SerializableClosure::exitContext();
return $data;
}
/**
* Unserialize
*
* @param string $data
* @param array|null $options
* @return mixed
*/
public static function unserialize($data, array $options = null)
{
SerializableClosure::enterContext();
$data = $options === null || PHP_MAJOR_VERSION < 7 ? unserialize($data) : unserialize($data, $options);
SerializableClosure::unwrapClosures($data);
SerializableClosure::exitContext();
return $data;
}
}

View file

@ -0,0 +1,177 @@
<?php
/** @noinspection PhpMissingFieldTypeInspection */
namespace TamerLib\Classes;
use InvalidArgumentException;
use LogLib\Log;
use RuntimeException;
use TamerLib\Objects\ServerConfiguration;
use TamerLib\Objects\WorkerConfiguration;
use TamerLib\Objects\WorkerInstance;
class WorkerSupervisor
{
/**
* @var WorkerInstance[]
*/
private $workers;
/**
* @var ServerConfiguration
*/
private $configuration;
/**
* WorkerSupervisor constructor.
*/
public function __construct(ServerConfiguration $configuration)
{
$this->workers = [];
$this->configuration = $configuration;
}
/**
* Generates a worker configuration object.
*
* @param int $channel
* @return WorkerConfiguration
*/
private function generateWorkerConfiguration(int $channel=0): WorkerConfiguration
{
$configuration = new WorkerConfiguration();
$configuration->setHost($this->configuration->getHost());
$configuration->setPort($this->configuration->getPort());
$configuration->setPassword($this->configuration->getPassword());
$configuration->setDatabase($this->configuration->getDatabase());
$configuration->setChannel($channel);
// TODO: Pass on database and password
return $configuration;
}
/**
* @param string $path
* @param int $count
* @param int $channel
* @return void
*/
public function spawnWorker(string $path, int $count=8, int $channel=0, bool $check=true): void
{
if(!file_exists($path) || !is_file($path))
{
throw new InvalidArgumentException(sprintf('Path %s does not exist', $path));
}
for($i = 0; $i < $count; $i++)
{
$worker_config = $this->generateWorkerConfiguration($channel);
Log::debug('net.nosial.tamerlib', sprintf('Spawning worker %s (%s)', $worker_config->getWorkerId(), $path));
$worker = new WorkerInstance($worker_config, $path);
$this->workers[$worker_config->getWorkerId()] = $worker;
$worker->start();
}
if($check)
{
// TODO: Check if workers are running
}
$this->printUpdates();
}
/**
* Spawns a closure as a worker process. (built-in worker)
*
* @param int $count
* @param int $channel
* @return void
*/
public function spawnClosure(int $count=8, int $channel=0): void
{
if(!file_exists(__DIR__ . DIRECTORY_SEPARATOR . 'subproc'))
{
throw new RuntimeException(sprintf('subproc file does not exist, checked %s', __DIR__));
}
$this->spawnWorker(__DIR__ . DIRECTORY_SEPARATOR . 'subproc', $count, $channel);
}
/**
* @return void
*/
public function printUpdates(): void
{
/** @var WorkerInstance $worker */
foreach($this->workers as $worker)
{
print($worker->getOutput());
}
}
/**
* Monitors the worker processes and restarts them if they are not running.
*
* @param int $timeout
* @return void
*/
public function monitor(int $timeout=0): void
{
$start_time = time();
while(true)
{
/** @var WorkerInstance $worker */
foreach($this->workers as $worker)
{
print($worker->getOutput());
//if(!$worker->isRunning())
//{
// print($worker->getOutput());
// Log::warning('net.nosial.tamerlib', sprintf('Worker %s is not running, killing', $worker->getConfiguration()->getWorkerId()));
// $worker->stop();
// unset($this->workers[$worker->getConfiguration()->getWorkerId()]);
// $this->spawnWorker($worker->getPath(), 1, $worker->getConfiguration()->getChannel(), false);
//}
}
if($timeout < 0)
{
return;
}
if($timeout > 0 && time() - $start_time > $timeout)
{
return;
}
}
}
/**
* Stops all worker processes and removes them from the supervisor.
*
* @return void
*/
public function stopAll(): void
{
Log::debug('net.nosial.tamerlib', 'Stopping all workers');
foreach($this->workers as $worker_id => $worker)
{
$worker->stop();
unset($this->workers[$worker_id]);
}
}
/**
* Public Destructor
*/
public function __destruct()
{
$this->stopAll();
}
}

View file

@ -0,0 +1,28 @@
<?PHP
require 'ncc';
import('net.nosial.tamerlib');
\TamerLib\tm::initalize(\TamerLib\Enums\TamerMode::WORKER);
$function = function($input)
{
var_dump($input);
sleep($input);
return 5 + $input;
};
\TamerLib\tm::addFunction('test', $function);
while(true)
{
try
{
\TamerLib\tm::run();
}
catch(Exception $e)
{
var_dump($e);
//\LogLib\Log::error('net.nosial.tamerlib', $e->getMessage(), $e);
}
}

View file

@ -0,0 +1,11 @@
<?php
namespace TamerLib\Enums;
final class EncodingType
{
public const CLOSURE_POINTER = 0;
public const SERIALIZED = 100;
}

View file

@ -0,0 +1,23 @@
<?php
namespace TamerLib\Enums;
final class JobStatus
{
public const WAITING = 10;
public const PROCESSING = 20;
public const FINISHED = 30;
public const FAILED = 40;
public const REJECTED = 50;
public const PROCESSING_STATES = [
self::WAITING,
self::PROCESSING,
self::REJECTED // This is a special case, as the job gets pushed back into the queue for reprocessing.
];
public const FINISHED_STATES = [
self::FINISHED,
self::FAILED,
];
}

View file

@ -0,0 +1,10 @@
<?php
namespace TamerLib\Enums;
final class JobType
{
public const CLOSURE = 100;
public const FUNCTION = 200;
}

View file

@ -0,0 +1,15 @@
<?php
namespace TamerLib\Enums;
final class TamerMode
{
public const CLIENT = 'client';
public const WORKER = 'worker';
public const ALL = [
self::CLIENT,
self::WORKER
];
}

View file

@ -0,0 +1,9 @@
<?php
namespace TamerLib\Enums;
final class WorkerType
{
public const SCRIPT = 10;
public const CLOSURE = 20;
}

View file

@ -0,0 +1,19 @@
<?php
namespace TamerLib\Exceptions;
use Exception;
use Throwable;
class ConnectionException extends Exception
{
/**
* @param string $message
* @param int $code
* @param Throwable|null $previous
*/
public function __construct(string $message = "", int $code = 0, ?Throwable $previous = null)
{
parent::__construct($message, $code, $previous);
}
}

View file

@ -0,0 +1,19 @@
<?php
namespace TamerLib\Exceptions;
use Exception;
use Throwable;
class JobNotFoundException extends Exception
{
/**
* @param string $message
* @param int $code
* @param Throwable|null $previous
*/
public function __construct(string $message = "", int $code = 0, ?Throwable $previous = null)
{
parent::__construct($message, $code, $previous);
}
}

View file

@ -2,9 +2,10 @@
namespace TamerLib\Exceptions;
use Exception;
use Throwable;
class RedisServerException extends \Exception
class ServerException extends Exception
{
public function __construct(string $message = "", int $code = 0, ?Throwable $previous = null)
{

View file

@ -0,0 +1,19 @@
<?php
namespace TamerLib\Exceptions;
use Exception;
use Throwable;
class TimeoutException extends Exception
{
/**
* @param string $message
* @param int $code
* @param Throwable|null $previous
*/
public function __construct(string $message = "", int $code = 0, ?Throwable $previous = null)
{
parent::__construct($message, $code, $previous);
}
}

View file

@ -0,0 +1,311 @@
<?php
/** @noinspection PhpMissingFieldTypeInspection */
namespace TamerLib\Objects;
use Exception;
use TamerLib\Enums\JobStatus;
use TamerLib\Enums\JobType;
class JobPacket
{
/**
* @var int
*/
private $id;
/**
* @var int
*/
private $channel;
/**
* @var int
*/
private $job_type;
/**
* @var string
*/
private $status;
/**
* @var string|null
*/
private $worker_id;
/**
* @var mixed|null
*/
private $parameters;
/**
* @var mixed|null
*/
private $payload;
/**
* @var mixed|null
*/
private $return_value;
/**
* @var string|null
*/
private $exception;
/**
* @var string|null
*/
private $return_channel;
/**
* @var int
*/
private $created_at;
/**
* @param array $data
* @throws Exception
*/
public function __construct(array $data=[])
{
$this->id = $data['id'] ?? random_int(1000000000, 9999999999);
$this->channel = $data['channel'] ?? 0;
$this->job_type = $data['job_type'] ?? JobType::CLOSURE;
$this->status = $data['status'] ?? JobStatus::WAITING;
$this->worker_id = $data['worker_id'] ?? null;
$this->parameters = $data['parameters'] ?? null;
$this->payload = $data['payload'] ?? null;
$this->return_value = $data['return_value'] ?? null;
$this->exception = $data['exception'] ?? null;
$this->return_channel = $data['return_channel'] ?? null;
$this->created_at = $data['created_at'] ?? time();
}
/**
* Returns the JobPacket's ID
*
* @return int
*/
public function getId(): int
{
return (int)$this->id;
}
/**
* Sets the ID of the JobPacket, if null is passed, a random ID will be generated
*
* @param int|null $id
* @throws Exception
*/
public function setId(?int $id=null): void
{
if($id !== null)
{
$this->id = random_int(1000000000, 9999999999);
return;
}
$this->id = (int)$id;
}
/**
* Returns the channel the JobPacket is assigned to
*
* @return int
*/
public function getChannel(): int
{
return (int)$this->channel;
}
/**
* Sets the channel the JobPacket is assigned to, if null is passed, the channel will be set to 0
* 0 is the default channel and is used for all jobs that are not assigned to a specific channel
*
* @param int|null $channel
* @noinspection PhpCastIsUnnecessaryInspection
* @noinspection UnnecessaryCastingInspection
*/
public function setChannel(?int $channel=null): void
{
if($channel === null)
{
$this->channel = 0;
return;
}
$this->channel = (int)$channel;
}
/**
* Returns the Job Type of the JobPacket
*
* @return int
*/
public function getJobType(): int
{
return (int)$this->job_type;
}
/**
* @param int $job_type
*/
public function setJobType(int $job_type): void
{
$this->job_type = $job_type;
}
/**
* @return int
*/
public function getStatus(): int
{
return (int)$this->status;
}
/**
* @param int $status
*/
public function setStatus(int $status): void
{
$this->status = (int)$status;
}
/**
* @return string|null
*/
public function getWorkerId(): ?string
{
return (string)$this->worker_id;
}
/**
* @param string|null $worker_id
*/
public function setWorkerId(?string $worker_id): void
{
$this->worker_id = $worker_id;
}
/**
* @return mixed|null
*/
public function getParameters(): mixed
{
return $this->parameters;
}
/**
* @param mixed|null $parameters
*/
public function setParameters(mixed $parameters): void
{
$this->parameters = $parameters;
}
/**
* @return mixed|null
*/
public function getPayload(): mixed
{
return $this->payload;
}
/**
* @param mixed|null $payload
*/
public function setPayload(mixed $payload): void
{
$this->payload = $payload;
}
/**
* @return mixed|null
*/
public function getReturnValue(): mixed
{
return $this->return_value;
}
/**
* @param mixed|null $return_value
*/
public function setReturnValue(mixed $return_value): void
{
$this->return_value = $return_value;
}
/**
* @return string|null
*/
public function getException(): ?string
{
return $this->exception;
}
/**
* @param string|null $exception
*/
public function setException(?string $exception): void
{
$this->exception = $exception;
}
/**
* @return string|null
*/
public function getReturnChannel(): ?string
{
return $this->return_channel;
}
/**
* @param string|null $return_channel
*/
public function setReturnChannel(?string $return_channel): void
{
$this->return_channel = $return_channel;
}
/**
* @return int|mixed
*/
public function getCreatedAt(): mixed
{
return $this->created_at;
}
/**
* @param int|mixed $created_at
*/
public function setCreatedAt(mixed $created_at): void
{
$this->created_at = $created_at;
}
/**
* Returns an array representation of the JobPacket
*
* @return array
*/
public function toArray(): array
{
return [
'id' => $this->getId(),
'channel' => $this->getChannel(),
'job_type' => $this->getJobType(),
'status' => $this->getStatus(),
'worker_id' => $this->getWorkerId(),
'parameters' => $this->getParameters(),
'payload' => $this->getPayload(),
'return_value' => $this->getReturnValue(),
'exception' => $this->getException(),
'return_channel' => $this->getReturnChannel(),
'created_at' => $this->getCreatedAt()
];
}
}

View file

@ -0,0 +1,112 @@
<?php
/** @noinspection PhpMissingFieldTypeInspection */
namespace TamerLib\Objects;
use Exception;
use LogLib\Log;
use RuntimeException;
use TamerLib\Classes\Utilities;
use TamerLib\Exceptions\NoAvailablePortException;
class ServerConfiguration
{
/**
* @var string
*/
private $host;
/**
* @var int
*/
private $port;
/**
* @var string|null
*/
private $password;
/**
* @var int
*/
private $database;
/**
* ServerConfiguration constructor.
*
* @param string $host
* @param int|null $port
* @param string|null $password
* @param int $database
*/
public function __construct(string $host='127.0.0.1', ?int $port=null, ?string $password=null, int $database=0)
{
$this->host = $host;
$this->port = $port;
$this->password = $password;
$this->database = $database;
if(is_null($port))
{
try
{
$port = Utilities::getAvailablePort($host);
}
catch(Exception $e)
{
Log::warning('net.nosial.tamerlib', 'No available port found. Using random port.');
try
{
$port = random_int(1024, 65535);
}
catch(Exception $e)
{
throw new RuntimeException('Could not generate random port.', 0, $e);
}
}
finally
{
Log::debug('net.nosial.tamerlib', 'Selected port ' . $port . '.');
}
}
if(!is_null($port))
{
$this->port = $port;
}
}
/**
* @return string
*/
public function getHost(): string
{
return $this->host;
}
/**
* @return int
*/
public function getPort(): int
{
return $this->port;
}
/**
* @return string|null
*/
public function getPassword(): ?string
{
return $this->password;
}
/**
* @return int
*/
public function getDatabase(): int
{
return $this->database;
}
}

View file

@ -0,0 +1,196 @@
<?php
/** @noinspection PhpMissingFieldTypeInspection */
namespace TamerLib\Objects;
use Exception;
use RuntimeException;
use TamerLib\Classes\Utilities;
class WorkerConfiguration
{
/**
* @var string
*/
private $worker_id;
/**
* @var string
*/
private $host;
/**
* @var int
*/
private $port;
/**
* @var string|null
*/
private $password;
/**
* @var int|null
*/
private $database;
/**
* @var int|null
*/
private $channel;
/**
* WorkerConfiguration constructor.
*
* @param array $data
* @throws Exception
*/
public function __construct(array $data=[])
{
$this->worker_id = $data['worker_id'] ?? Utilities::generateRandomString();
$this->host = $data['host'] ?? '127.0.0.1';
$this->port = $data['port'] ?? null;
$this->password = $data['password'] ?? null;
$this->database = $data['database'] ?? 0;
$this->channel = $data['channel'] ?? 0;
}
/**
* Constructs object from environment variables.
*
* @return WorkerConfiguration
* @throws Exception
*/
public static function fromEnvironment(): WorkerConfiguration
{
if(getenv('TAMER_WORKER') !== 'true')
{
throw new RuntimeException('Process is not running as a worker.');
}
$data = [
'worker_id' => getenv('TAMER_WORKER_ID'),
'host' => getenv('TAMER_WORKER_HOST'),
'port' => getenv('TAMER_WORKER_PORT'),
'password' => getenv('TAMER_WORKER_PASSWORD'),
'database' => getenv('TAMER_WORKER_DATABASE'),
'channel' => getenv('TAMER_WORKER_CHANNEL'),
];
return new WorkerConfiguration($data);
}
/**
* @return string
*/
public function getWorkerId(): string
{
return $this->worker_id;
}
/**
* @param string $worker_id
*/
public function setWorkerId(string $worker_id): void
{
$this->worker_id = $worker_id;
}
/**
* @return string
*/
public function getHost(): string
{
return $this->host;
}
/**
* @param string $host
*/
public function setHost(string $host): void
{
$this->host = $host;
}
/**
* @return int
*/
public function getPort(): int
{
return $this->port;
}
/**
* @param int $port
*/
public function setPort(int $port): void
{
$this->port = $port;
}
/**
* @return string|null
*/
public function getPassword(): ?string
{
return $this->password;
}
public function setPassword(?string $password): void
{
$this->password = $password;
}
/**
* @return int|null
*/
public function getDatabase(): ?int
{
return $this->database;
}
/**
* @param int|null $database
* @return void
*/
public function setDatabase(?int $database): void
{
$this->database = $database;
}
/**
* @return int
*/
public function getChannel(): int
{
return $this->channel;
}
/**
* @param int $channel
* @return void
*/
public function setChannel(int $channel=0): void
{
$this->channel = $channel;
}
/**
* Outputs environment variables for worker.
*
* @return array
*/
public function toEnvironment(): array
{
return [
'TAMER_WORKER' => 'true',
'TAMER_WORKER_ID' => $this->getWorkerId(),
'TAMER_WORKER_HOST' => $this->getHost(),
'TAMER_WORKER_PORT' => $this->getPort(),
'TAMER_WORKER_PASSWORD' => $this->getPassword(),
'TAMER_WORKER_DATABASE' => $this->getDatabase(),
'TAMER_WORKER_CHANNEL' => $this->getChannel(),
];
}
}

View file

@ -0,0 +1,187 @@
<?php
/** @noinspection PhpMissingFieldTypeInspection */
namespace TamerLib\Objects;
use Exception;
use LogLib\Log;
use RuntimeException;
use Symfony\Component\Process\PhpExecutableFinder;
use Symfony\Component\Process\Process;
class WorkerInstance
{
/**
* @var WorkerConfiguration
*/
private $configuration;
/**
* @var null
*/
private $path;
/**
* @var Process|null
*/
private $process;
/**
* WorkerInstance constructor.
*
* @param WorkerConfiguration $configuration
* @param string $path
*/
public function __construct(WorkerConfiguration $configuration, string $path)
{
$this->configuration = $configuration;
$this->path = $path;
}
/**
* Determines if the worker is running.
*
* @return bool
*/
public function isRunning(): bool
{
if(is_null($this->process))
{
return false;
}
return $this->process->isRunning();
}
/**
* Terminates the worker process.
*
* @return void
*/
public function stop(): void
{
if(is_null($this->process))
{
return;
}
try
{
$this->process->stop();
}
catch(Exception $e)
{
Log::warning('net.nosial.tamerlib', sprintf('Failed to stop worker %s', $this->configuration->getWorkerId()), $e);
}
finally
{
$this->process = null;
Log::debug('net.nosial.tamerlib', sprintf('Stopped worker %s', $this->configuration->getWorkerId()));
}
}
/**
* Starts the process for the worker.
*
* @return void
*/
public function start(): void
{
if($this->isRunning())
{
return;
}
$php_bin = (new PhpExecutableFinder())->find();
$process = new Process([$php_bin, $this->path]);
$process->setEnv($this->configuration->toEnvironment());
try
{
$process->start();
}
catch(Exception $e)
{
Log::warning('net.nosial.tamerlib', sprintf('Failed to start worker %s', $this->configuration->getWorkerId()), $e);
return;
}
finally
{
$this->process = $process;
Log::debug('net.nosial.tamerlib', sprintf('Started worker %s', $this->configuration->getWorkerId()));
}
}
/**
* Returns the last output from the worker.
*
* @return string|null
*/
public function getOutput(): ?string
{
if(is_null($this->process))
{
return '';
}
$output = $this->process->getIncrementalOutput();
return empty($output) ? null : $output;
}
/**
* Monitors the worker for a given amount of time, or indefinitely if no timeout is given.
* Throws an exception if the worker is not running.
* Outputs the worker's output to the console.
*
* @param int $timeout
* @return void
*/
public function monitor(int $timeout=0): void
{
$time_start = time();
while(true)
{
if(!$this->isRunning())
{
throw new RuntimeException(sprintf('Worker %s is not running', $this->configuration->getWorkerId()));
}
$output = $this->getOutput();
if(!is_null($output))
{
print($output);
}
if($timeout > 0 && (time() - $time_start) > $timeout)
{
break;
}
}
}
/**
* @return Process|null
*/
public function getProcess(): ?Process
{
return $this->process;
}
/**
* @return WorkerConfiguration
*/
public function getConfiguration(): WorkerConfiguration
{
return $this->configuration;
}
/**
* @return string|null
*/
public function getPath(): ?string
{
return $this->path;
}
}

View file

@ -1,11 +0,0 @@
<?php
/** @noinspection PhpMissingFieldTypeInspection */
namespace TamerLib;
class Tamer
{
}

684
src/TamerLib/tm.php Normal file
View file

@ -0,0 +1,684 @@
<?php
/** @noinspection PhpMissingFieldTypeInspection */
namespace TamerLib;
use Closure;
use Exception;
use InvalidArgumentException;
use LogLib\Log;
use Opis\Closure\SerializableClosure;
use Redis;
use RedisException;
use RuntimeException;
use TamerLib\Classes\AdaptiveSleep;
use TamerLib\Classes\JobManager;
use TamerLib\Classes\RedisServer;
use TamerLib\Classes\WorkerSupervisor;
use TamerLib\Enums\EncodingType;
use TamerLib\Enums\JobStatus;
use TamerLib\Enums\JobType;
use TamerLib\Enums\TamerMode;
use TamerLib\Exceptions\JobNotFoundException;
use TamerLib\Exceptions\NoAvailablePortException;
use TamerLib\Exceptions\ServerException;
use TamerLib\Exceptions\TimeoutException;
use TamerLib\Objects\JobPacket;
use TamerLib\Objects\ServerConfiguration;
use TamerLib\Objects\WorkerConfiguration;
use Throwable;
/**
* @method static mixed __call(string $name, array $arguments)
*/
class tm
{
/**
* @var string|null
*/
private static $mode;
/**
* @var ServerConfiguration|null
*/
private static $server_configuration;
/**
* @var RedisServer|null
*/
private static $server;
/**
* @var int[]
*/
private static $watching_jobs = [];
/**
* @var WorkerSupervisor|null
*/
private static $supervisor;
/**
* @var JobManager|null
*/
private static $job_manager;
/**
* @var WorkerConfiguration|null
*/
private static $worker_configuration;
/**
* @var array
*/
private static $function_pointers = [];
/**
* @var string|null
*/
private static $return_channel;
/**
* INTERNAL FUNCTIONS
*/
/**
* Appends the job ID to the watch list
*
* @param int $job_id
* @return void
*/
private static function addToWatchlist(int $job_id): void
{
if(!in_array($job_id, self::$watching_jobs, true))
{
self::$watching_jobs[] = $job_id;
}
}
/**
* Removes the job ID from the watch list
*
* @param int $job_id
* @return void
*/
private static function removeFromWatchlist(int $job_id): void
{
if(($key = array_search($job_id, self::$watching_jobs, true)) !== false)
{
unset(self::$watching_jobs[$key]);
}
}
/**
* GLOBAL FUNCTIONS
*/
/**
* Initializes Tamer in the specified mode
*
* Note that Tamer can only be initialized once per process, additionally not all functions are available in
* all modes. Please review the documentation for CLIENT & WORKER mode usage as both modes operate differently.
*
* CLIENT MODE:
* Client Mode supervises workers and optionally initializes a server instance if the $server_config parameter
* is left null, in which case a server will be initialized with default parameters. If a server is already
* initialized then CLIENT Mode can work in swarm mode where multiple clients can be initialized, and they will
* all share the same server instance. This is done by passing the same $server_config object to each client.
*
* WORKER MODE:
* Worker Mode is responsible for listening for jobs and executing them. Worker Mode can only be initialized
* if the parent process is a client process, otherwise an exception will be thrown because the worker will
* have no server to connect to if it is not initialized by a client.
*
* @param string|null $mode
* @param ServerConfiguration|null $server_config
* @return void
* @throws ServerException
* @throws Exception
*/
public static function initalize(?string $mode, ?ServerConfiguration $server_config=null): void
{
if(self::$mode !== null)
{
throw new RuntimeException('TamerLib has already been initialized.');
}
if(!in_array(strtolower($mode), TamerMode::ALL, true))
{
throw new InvalidArgumentException(sprintf('Invalid mode "%s" provided, must be one of "%s".', $mode, implode('", "', TamerMode::ALL)));
}
self::$mode = $mode;
self::$server_configuration = $server_config;
if($server_config === null && $mode === TamerMode::CLIENT)
{
try
{
// Initialize the server if no configuration was provided, and we are in client mode
self::$server_configuration = new ServerConfiguration();
self::$server = new RedisServer(self::$server_configuration);
self::$server->start();
// Register shutdown function to stop the server when the process exits
register_shutdown_function(static function()
{
self::$server?->stop();
});
}
catch(Exception $e)
{
throw new ServerException('Failed to initialize the server.', 0, $e);
}
}
if($mode === TamerMode::WORKER)
{
try
{
self::$worker_configuration = WorkerConfiguration::fromEnvironment();
self::$server_configuration = new ServerConfiguration(self::$worker_configuration->getHost(), self::$worker_configuration->getPort(), self::$worker_configuration->getPassword());
}
catch(Exception $e)
{
throw new RuntimeException('Failed to initialize worker configuration. (is the process running as a worker?)', 0, $e);
}
}
if($mode === TamerMode::CLIENT)
{
self::$supervisor = new WorkerSupervisor(self::$server_configuration);
self::$return_channel = 'rch' . random_int(100000000, 999999999);
}
self::$job_manager = new JobManager(self::$server_configuration);
}
/**
* Shuts down all workers
*
* @return void
*/
public static function shutdown(): void
{
if(self::$mode === null)
{
return;
}
if(self::$mode === TamerMode::CLIENT && self::$supervisor !== null)
{
self::$supervisor->stopAll();
}
}
/**
* CLIENT FUNCTIONS
*/
/**
* Spawns a worker process by their count, if the path is null then a generic sub process will be spawned
* that will only be capable of executing closures.
*
* @param int $count
* @param string|null $path
* @param int $channel
* @return void
*/
public static function createWorker(int $count=8, ?string $path=null, int $channel=0): void
{
if(self::$mode !== TamerMode::CLIENT)
{
throw new RuntimeException(sprintf('Attempting to spawn a worker in \'%s\' mode, only clients can spawn workers.', self::$mode));
}
if($path === null)
{
self::$supervisor->spawnClosure($count, $channel);
}
else
{
self::$supervisor->spawnWorker($path, $count, $channel);
}
}
/**
* Preforms a job in the background, returns the Job ID to keep track of the job status.
*
* @param callable $function
* @param array $arguments
* @param int $channel
* @return string
*/
public static function do(callable $function, array $arguments, int $channel=0): string
{
if(self::$mode !== TamerMode::CLIENT)
{
throw new RuntimeException(sprintf('Attempting to do() in \'%s\' mode, only clients can preform do().', self::$mode));
}
$job_packet = new JobPacket();
$job_packet->setJobType(JobType::CLOSURE);
$job_packet->setParameters(serialize($arguments));
$job_packet->setPayload(serialize(new SerializableClosure($function)));
$job_packet->setChannel($channel);
$job_packet->setReturnChannel(self::$return_channel);
try
{
self::$job_manager->pushJob($job_packet);
}
catcH(Exception $e)
{
throw new RuntimeException('do() failed, failed to push job to the server', 0, $e);
}
self::addToWatchlist($job_packet->getId());
return $job_packet->getId();
}
/**
* Preforms a function call against a worker in the background, returns the Job ID to keep track of the job status.
*
* @param string $function
* @param array $arguments
* @param int $channel
* @return mixed
*/
public static function call(string $function, array $arguments, int $channel=0): mixed
{
if(self::$mode !== TamerMode::CLIENT)
{
throw new RuntimeException(sprintf('Attempting to call() in \'%s\' mode, only clients can preform call().', self::$mode));
}
$job_packet = new JobPacket();
$job_packet->setJobType(JobType::FUNCTION);
$job_packet->setParameters(serialize($arguments));
$job_packet->setPayload($function);
$job_packet->setChannel($channel);
$job_packet->setReturnChannel(self::$return_channel);
try
{
self::$job_manager->pushJob($job_packet);
}
catcH(Exception $e)
{
throw new RuntimeException('call() failed, failed to push job to the server', 0, $e);
}
self::addToWatchlist($job_packet->getId());
return $job_packet->getId();
}
/**
* Does a job in the background, but once the job is completed it will be forgotten and the result will not be
* returned, this also means that the job will not be added to the watchlist.
*
* @param callable $function
* @param array $arguments
* @param int $channel
* @return void
*/
public function dof(callable $function, array $arguments, int $channel=0): void
{
if(self::$mode !== TamerMode::CLIENT)
{
throw new RuntimeException(sprintf('Attempting to dof() in \'%s\' mode, only clients can preform dof().', self::$mode));
}
$job_packet = new JobPacket();
$job_packet->setJobType(JobType::CLOSURE);
$job_packet->setParameters(serialize($arguments));
$job_packet->setPayload(serialize(new SerializableClosure($function)));
$job_packet->setChannel($channel);
try
{
self::$job_manager->pushJob($job_packet);
}
catcH(Exception $e)
{
throw new RuntimeException('do() failed, failed to push job to the server', 0, $e);
}
self::addToWatchlist($job_packet->getId());
}
/**
* Sends a function call to a worker in the background, but once the job is completed it will be forgotten and
* the result will not be returned, this also means that the job will not be added to the watchlist.
*
* @param string $function
* @param array $arguments
* @param int $channel
* @return void
*/
public static function callf(string $function, array $arguments, int $channel=0): void
{
if(self::$mode !== TamerMode::CLIENT)
{
throw new RuntimeException(sprintf('Attempting to callf() in \'%s\' mode, only clients can preform callf().', self::$mode));
}
$job_packet = new JobPacket();
$job_packet->setJobType(JobType::FUNCTION);
$job_packet->setParameters(serialize($arguments));
$job_packet->setForget(true);
$job_packet->setPayload($function);
$job_packet->setChannel($channel);
try
{
self::$job_manager->pushJob($job_packet);
}
catcH(Exception $e)
{
throw new RuntimeException('callf() failed, failed to push job to the server', 0, $e);
}
self::addToWatchlist($job_packet->getId());
}
/**
* Waits for all the dispatched jobs to complete, this is a blocking function and will not return until all the
* jobs have completed. If a timeout is specified, the function will return after the timeout has been reached.
*
* @param callable $callback
* @param int $timeout
* @return void
* @throws ServerException
* @throws Throwable
* @throws TimeoutException
*/
public static function wait(callable $callback, int $timeout=0): void
{
if(self::$mode !== TamerMode::CLIENT)
{
throw new RuntimeException(sprintf('Attempting to wait() in \'%s\' mode, only clients can preform wait().', self::$mode));
}
$time_start = time();
while(true)
{
if(count(self::$watching_jobs) === 0)
{
Log::debug('net.nosial.tamerlib', 'No jobs to wait for, returning');
return;
}
Log::debug('net.nosial.tamerlib', 'Waiting for jobs to complete');
$job_packet = self::$job_manager->listenReturnChannel(self::$return_channel);
if(in_array($job_packet->getId(), self::$watching_jobs))
{
Log::debug('net.nosial.tamerlib', sprintf('Job \'%s\' has returned, removing from watchlist', $job_packet->getId()));
self::removeFromWatchlist($job_packet->getId());
self::$job_manager->dropJob($job_packet->getId());
if($job_packet->getStatus() === JobStatus::FINISHED)
{
$return_value = $job_packet->getReturnValue();
if($return_value === null)
{
$return_value = null;
}
else
{
$return_value = unserialize($return_value, ['allowed_classes' => true]);
if($return_value === false)
{
Log::error('net.nosial.tamerlib', 'Failed to unserialize return value, return value was dropped');
$return_value = null;
}
}
$callback($job_packet->getId(), $return_value);
}
elseif($job_packet->getStatus() === JobStatus::FAILED)
{
try
{
$e = unserialize($job_packet->getException(), ['allowed_classes' => true]);
}
catch(Exception $e)
{
Log::error('net.nosial.tamerlib', 'Failed to unserialize exception, exception was dropped', $e);
}
finally
{
if(isset($e) && $e instanceof Throwable)
{
throw $e;
}
throw new ServerException('wait() failed, job returned with an exception');
}
}
else
{
Log::debug('net.nosial.tamerlib', sprintf('Job \'%s\' returned with an unexpected status of \'%s\'', $job_packet->getId(), $job_packet->getStatus()));
throw new ServerException('wait() failed, job returned with an unexpected status of \'' . $job_packet->getStatus() . '\'');
}
}
else
{
Log::debug('net.nosial.tamerlib', sprintf('Job \'%s\' has returned, but is not in the watchlist', $job_packet->getId()));
}
if ($timeout < 0)
{
throw new TimeoutException('wait() timed out');
}
if($timeout > 0 && (time() - $time_start) >= $timeout)
{
throw new TimeoutException('wait() timed out');
}
usleep(10);
}
}
/**
* Waits for a job to complete, returns the result of the job.
*
* @param JobPacket|int $job_id
* @param int $timeout
* @return mixed
* @throws JobNotFoundException
* @throws ServerException
* @throws TimeoutException
* @throws Throwable
*/
public static function waitFor(JobPacket|int $job_id, int $timeout=0): mixed
{
if(self::$mode !== TamerMode::CLIENT)
{
throw new RuntimeException(sprintf('Attempting to waitFor() in \'%s\' mode, only clients can preform waitFor().', self::$mode));
}
if($job_id instanceof JobPacket)
{
$job_id = $job_id->getId();
}
$time_start = time();
while(true)
{
self::$supervisor->monitor(-1);
switch(self::$job_manager->getJobStatus($job_id))
{
case JobStatus::FINISHED:
$return = self::$job_manager->getJobResult($job_id);
self::$job_manager->dropJob($job_id);
return $return;
case JobStatus::FAILED:
$throwable = self::$job_manager->getJobException($job_id);
self::$job_manager->dropJob($job_id);
throw $throwable;
}
if($timeout < 0)
{
throw new TimeoutException('waitFor() timed out');
}
if($timeout > 0 && (time() - $time_start) >= $timeout)
{
throw new TimeoutException(sprintf('waitFor() timed out after %d seconds', $timeout));
}
usleep(10);
}
}
/**
* Clears the watchlist, this will remove all jobs from the watchlist.
*
* @return void
*/
public static function clear(): void
{
if(self::$mode !== TamerMode::CLIENT)
{
throw new RuntimeException(sprintf('Attempting to clear() in \'%s\' mode, only clients can preform clear().', self::$mode));
}
self::$watching_jobs = [];
}
/**
* Invokes the call() function, returns the Job ID.
*
* @param string $name
* @param array $arguments
* @return mixed
*/
public static function __callStatic(string $name, array $arguments)
{
return self::call($name, $arguments);
}
/**
* WORKER FUNCTIONS
*/
/**
* @param string $function
* @param callable $callback
* @return void
*/
public static function addFunction(string $function, callable $callback): void
{
if(self::$mode !== TamerMode::WORKER)
{
throw new RuntimeException(sprintf('Attempting to addFunction() in \'%s\' mode, only workers can preform addFunction().', self::$mode));
}
self::$function_pointers[$function] = $callback;
}
/**
* @param string $function
* @return void
*/
public static function removeFunction(string $function): void
{
if(self::$mode !== TamerMode::WORKER)
{
throw new RuntimeException(sprintf('Attempting to removeFunction() in \'%s\' mode, only workers can preform removeFunction().', self::$mode));
}
unset(self::$function_pointers[$function]);
}
/**
* @return array
*/
public static function getFunctions(): array
{
if(self::$mode !== TamerMode::WORKER)
{
throw new RuntimeException(sprintf('Attempting to getFunctions() in \'%s\' mode, only workers can preform getFunctions().', self::$mode));
}
return array_keys(self::$function_pointers);
}
/**
* @param int|array $channel
* @param int $timeout
* @return void
* @throws JobNotFoundException
* @throws ServerException
*/
public static function run(int|array $channel=0, int $timeout=0): void
{
if(self::$mode !== TamerMode::WORKER)
{
throw new RuntimeException(sprintf('Attempting to run() in \'%s\' mode, only workers can preform run().', self::$mode));
}
try
{
$job_packet = self::$job_manager->listenForJob(self::$worker_configuration->getWorkerId(), $channel, $timeout);
}
catch(TimeoutException $e)
{
unset($e);
return;
}
Log::debug('net.nosial.tamerlib', sprintf('Worker %s received job %s', self::$worker_configuration->getWorkerId(), $job_packet->getId()));
switch($job_packet->getJobType())
{
case JobType::FUNCTION:
if(!isset(self::$function_pointers[$job_packet->getPayload()]))
{
Log::warning('net.nosial.tamerlib', sprintf('Job %s requested function \'%s\' which does not exist, rejecting job.', $job_packet->getId(), $job_packet->getPayload()));
self::$job_manager->rejectJob($job_packet);
}
try
{
$result = call_user_func_array(self::$function_pointers[$job_packet->getPayload()], unserialize($job_packet->getParameters(), ['allowed_classes'=>true]));
self::$job_manager->returnJob($job_packet, $result);
}
catch(Exception $e)
{
var_dump($e);
self::$job_manager->returnException($job_packet, $e);
}
break;
case JobType::CLOSURE:
try
{
$result = unserialize($job_packet->getPayload(), ['allowed_classes'=>true])(
unserialize($job_packet->getParameters(), ['allowed_classes'=>true])
);
self::$job_manager->returnJob($job_packet, $result);
}
catch(Exception $e)
{
self::$job_manager->returnException($job_packet, $e);
}
break;
}
}
}