Refactored main class, improved a few things here and there. Looks polished enough

This commit is contained in:
Netkas 2023-06-18 17:17:13 -04:00
parent 75c6062a3e
commit 411898af2a
No known key found for this signature in database
GPG key ID: 5DAF58535614062B
14 changed files with 351 additions and 300 deletions

View file

@ -1,5 +1,6 @@
<?php
/** @noinspection PhpUnused */
/** @noinspection PhpMissingFieldTypeInspection */
namespace TamerLib;
@ -19,6 +20,7 @@
use TamerLib\Exceptions\JobManagerException;
use TamerLib\Exceptions\JobNotFoundException;
use TamerLib\Exceptions\ServerException;
use TamerLib\Exceptions\TamerException;
use TamerLib\Exceptions\TimeoutException;
use TamerLib\Exceptions\WorkerFailedException;
use TamerLib\Objects\JobPacket;
@ -80,7 +82,7 @@
/**
* Appends the job ID to the watch list
*
* @param int $job_id
* @param int $job_id The job ID to add to the watch list
* @return void
*/
private static function addToWatchlist(int $job_id): void
@ -94,7 +96,7 @@
/**
* Removes the job ID from the watch list
*
* @param int $job_id
* @param int $job_id The job ID to remove from the watch list
* @return void
*/
private static function removeFromWatchlist(int $job_id): void
@ -105,6 +107,34 @@
}
}
/**
* Executes a job and returns the result to the server
*
* @param JobPacket $job_packet
* @return void
* @throws ConnectionException
* @throws JobManagerException
*/
private static function executeJob(JobPacket $job_packet): void
{
try
{
if(!isset(self::$function_pointers[$job_packet->getPayload()]))
{
Log::warning(Utilities::getName(), sprintf('Job %s requested function \'%s\' which does not exist, rejecting job.', $job_packet->getId(), $job_packet->getPayload()));
self::$job_manager->rejectJob($job_packet);
return;
}
$result = call_user_func_array(self::$function_pointers[$job_packet->getPayload()], unserialize($job_packet->getParameters(), ['allowed_classes'=>true]));
self::$job_manager->returnJob($job_packet, $result);
}
catch(Exception $e)
{
self::$job_manager->returnException($job_packet, $e);
}
}
/**
* GLOBAL FUNCTIONS
*/
@ -126,13 +156,13 @@
* if the parent process is a client process, otherwise an exception will be thrown because the worker will
* have no server to connect to if it is not initialized by a client.
*
* @param string|null $mode
* @param ServerConfiguration|null $server_config
* @param string $mode The mode to initialize Tamer in, must be one of the TamerMode constants
* @param ServerConfiguration|null $server_config Optional. The server configuration to use, if null is provided
* then a server will be initialized with default parameters.
* @throws ServerException If the server fails to initialize
* @return void
* @throws ServerException
* @throws Exception
*/
public static function initialize(?string $mode, ?ServerConfiguration $server_config=null): void
public static function initialize(string $mode, ?ServerConfiguration $server_config=null): void
{
if(self::$mode !== null)
{
@ -185,7 +215,15 @@
if($mode === TamerMode::CLIENT)
{
self::$supervisor = new WorkerSupervisor(self::$server_configuration);
self::$return_channel = sprintf('rch%s', random_int(100000000, 999999999));
try
{
self::$return_channel = sprintf('rch%s', random_int(100000000, 999999999));
}
catch(Exception $e)
{
throw new RuntimeException(sprintf('Bad environment, failed to generate random integer. (%s)', $e->getMessage()), $e);
}
}
self::$job_manager = new JobManager(self::$server_configuration);
@ -195,25 +233,36 @@
* Shuts down all workers
*
* @return void
* @noinspection PhpUnused
*/
public static function shutdown(): void
{
// Do nothing if Tamer is not initialized
if(self::$mode === null)
{
return;
}
if(self::$mode === TamerMode::CLIENT && self::$supervisor !== null)
{
self::$supervisor->stopAll();
}
// Close all subprocesses
self::$supervisor?->stopAll();
self::$server?->stop();
self::$job_manager?->disconnect();
// Clear all static variables
self::$mode = null;
self::$server_configuration = null;
self::$server = null;
self::$watching_jobs = [];
self::$supervisor = null;
self::$job_manager = null;
self::$worker_configuration = null;
self::$function_pointers = [];
self::$return_channel = null;
}
/**
* Returns the current mode Tamer is running in
*
* @return string
* @return string The current mode Tamer is running in
*/
public static function getMode(): string
{
@ -223,7 +272,8 @@
/**
* Monitors all internal processes
*
* @param int $timeout
* @param int $timeout The timeout in seconds to monitor for, if -1 is provided then the monitor will run
* for one iteration and then return. If 0 is provided then the monitor will run forever.
* @return void
*/
public static function monitor(int $timeout=0): void
@ -271,11 +321,12 @@
* Spawns a worker process by their count, if the path is null then a generic sub process will be spawned
* that will only be capable of executing closures.
*
* @param int $count
* @param string|null $path
* @param int $channel
* @param int $count The number of workers to spawn (defaults to 8)
* @param string|null $path The path to the worker file to spawn, if null is provided then a generic worker
* will be spawned that can only execute closures (unimplemented)
* @param int $channel The channel to spawn the workers on (defaults to 0)
* @throws WorkerFailedException If the worker fails to spawn
* @return void
* @throws WorkerFailedException
*/
public static function createWorker(int $count=8, ?string $path=null, int $channel=0): void
{
@ -299,12 +350,12 @@
/**
* Preforms a function call against a worker in the background, returns the Job ID to keep track of the job status.
*
* @param string $function
* @param array $arguments
* @param int $channel
* @return int
* @param string $function The function to call
* @param array $arguments The arguments to pass to the function
* @param int $channel The channel to preform the function call on (defaults to 0)
* @return int The Job ID of the function call
*/
public static function do(string $function, array $arguments, int $channel=0): int
public static function do(string $function, array $arguments=[], int $channel=0): int
{
if(self::$mode !== TamerMode::CLIENT)
{
@ -321,9 +372,13 @@
{
self::$job_manager->pushJob($job_packet);
}
catcH(Exception $e)
catch(ConnectionException $e)
{
throw new RuntimeException('do() failed, failed to push job to the server', 0, $e);
throw new RuntimeException('do() failed, failed to connect to the server', 0, $e);
}
catch(JobManagerException $e)
{
throw new RuntimeException('do() failed, failed to push the job to the server', 0, $e);
}
self::addToWatchlist($job_packet->getId());
@ -334,13 +389,12 @@
* Sends a function call to a worker in the background, but once the job is completed it will be forgotten and
* the result will not be returned, this also means that the job will not be added to the watchlist.
*
* @param string $function
* @param array $arguments
* @param int $channel
* @param string $function The function to call
* @param array $arguments The arguments to pass to the function
* @param int $channel The channel to preform the function call on (defaults to 0)
* @return void
* @noinspection PhpUnused
*/
public static function dof(string $function, array $arguments, int $channel=0): void
public static function dof(string $function, array $arguments=[], int $channel=0): void
{
if(self::$mode !== TamerMode::CLIENT)
{
@ -360,20 +414,22 @@
{
throw new RuntimeException('dof() failed, failed to push job to the server', 0, $e);
}
self::addToWatchlist($job_packet->getId());
}
/**
* Waits for all the dispatched jobs to complete, this is a blocking function and will not return until all the
* jobs have completed. If a timeout is specified, the function will return after the timeout has been reached.
*
* @param callable $callback
* @param int $timeout
* @param callable $callback A callback function that will be called after each iteration of the wait loop
* @param int $timeout The timeout in seconds, if 0 is provided then the function will block until all the jobs
* have completed, if -1 is provided then the function run for one iteration and return
* @return void
* @throws ServerException
* @throws Throwable
* @throws TimeoutException
* @throws JobManagerException If the JobManager throws an exception
* @throws TamerException If the Tamer throws an exception
* @throws TimeoutException If the timeout is reached
* @throws ConnectionException If the client fails to connect to the server
* @throws Exception If a job fails
* @throws Throwable If a job fails
*/
public static function wait(callable $callback, int $timeout=0): void
{
@ -388,68 +444,57 @@
self::monitor(-1);
if(count(self::$watching_jobs) === 0)
{
Log::debug(Utilities::getName(), 'No jobs to wait for, returning');
return;
}
Log::debug(Utilities::getName(), 'Waiting for jobs to complete');
$job_packet = self::$job_manager->listenReturnChannel(self::$return_channel);
if(in_array($job_packet->getId(), self::$watching_jobs))
if(!in_array($job_packet->getId(), self::$watching_jobs))
{
self::removeFromWatchlist($job_packet->getId());
Log::debug(Utilities::getName(), sprintf('Job \'%s\' has returned, but is not in the watchlist', $job_packet->getId()));
self::$job_manager->dropJob($job_packet->getId());
continue;
}
if($job_packet->getStatus() === JobStatus::FINISHED)
self::removeFromWatchlist($job_packet->getId());
self::$job_manager->dropJob($job_packet->getId());
if($job_packet->getStatus() === JobStatus::FINISHED)
{
$return_value = $job_packet->getReturnValue();
if($return_value !== null)
{
$return_value = $job_packet->getReturnValue();
if($return_value === null)
{
$return_value = null;
}
else
{
$return_value = unserialize($return_value, ['allowed_classes' => true]);
if($return_value === false)
{
Log::error(Utilities::getName(), 'Failed to unserialize return value, return value was dropped');
$return_value = null;
}
}
$callback($job_packet->getId(), $return_value);
$return_value = unserialize($return_value, ['allowed_classes' => true]);
}
elseif($job_packet->getStatus() === JobStatus::FAILED)
{
try
{
$e = unserialize($job_packet->getException(), ['allowed_classes' => true]);
}
catch(Exception $e)
{
Log::error(Utilities::getName(), 'Failed to unserialize exception, exception was dropped', $e);
}
finally
{
if(isset($e) && $e instanceof Throwable)
{
throw $e;
}
throw new ServerException('wait() failed, job returned with an exception');
}
}
else
$callback($job_packet->getId(), $return_value);
}
elseif($job_packet->getStatus() === JobStatus::FAILED)
{
try
{
Log::debug(Utilities::getName(), sprintf('Job \'%s\' returned with an unexpected status of \'%s\'', $job_packet->getId(), $job_packet->getStatus()));
throw new ServerException('wait() failed, job returned with an unexpected status of \'' . $job_packet->getStatus() . '\'');
$e = unserialize($job_packet->getException(), ['allowed_classes' => true]);
}
catch(Exception $e)
{
Log::error(Utilities::getName(), 'Failed to unserialize exception, exception was dropped', $e);
}
finally
{
if(isset($e) && $e instanceof Throwable)
{
throw $e;
}
/** @noinspection ThrowRawExceptionInspection */
throw new Exception(sprintf('wait() failed, job \'%s\' failed with an unknown exception', $job_packet->getId()));
}
}
else
{
Log::debug(Utilities::getName(), sprintf('Job \'%s\' has returned, but is not in the watchlist', $job_packet->getId()));
throw new TamerException(sprintf('wait() failed, job \'%s\' returned with an unknown status \'%s\'', $job_packet->getId(), $job_packet->getStatus()));
}
if ($timeout < 0)
@ -462,38 +507,36 @@
throw new TimeoutException('wait() timed out');
}
usleep(10);
usleep(1000);
}
}
/**
* Waits for a job to complete, returns the result of the job.
*
* @param JobPacket|int $job_id
* @param int $timeout
* @return mixed
* @throws JobNotFoundException
* @throws ServerException
* @throws TimeoutException
* @throws Throwable
* @param int $job_id The ID of the job to wait for
* @param int $timeout The timeout in seconds, if 0 is provided then the function will block until all the jobs
* have completed, if -1 is provided then the function run for one iteration and return
* @throws ConnectionException If the client fails to connect to the server
* @throws JobManagerException If the JobManager throws an exception
* @throws JobNotFoundException If the job is not found
* @throws TimeoutException If the timeout is reached
* @throws Throwable If the job fails
* @throws Exception If the job fails
* @return mixed The return value of the job
*/
public static function waitFor(JobPacket|int $job_id, int $timeout=0): mixed
public static function waitFor(int $job_id, int $timeout=0): mixed
{
if(self::$mode !== TamerMode::CLIENT)
{
throw new RuntimeException(sprintf('Attempting to waitFor() in \'%s\' mode, only clients can preform waitFor().', self::$mode));
}
if($job_id instanceof JobPacket)
{
$job_id = $job_id->getId();
}
$time_start = time();
while(true)
{
self::monitor(-1);
switch(self::$job_manager->getJobStatus($job_id))
{
case JobStatus::FINISHED:
@ -524,18 +567,18 @@
/**
* Preforms a do() call on a waitFor() call all in one function.
*
* @param string $function
* @param array $arguments
* @param int $channel
* @param int $timeout
* @return mixed
* @throws JobNotFoundException
* @throws ServerException
* @throws Throwable
* @throws TimeoutException
* @noinspection PhpUnused
* @param string $function The function to call
* @param array $arguments The arguments to pass to the function
* @param int $channel The channel to use
* @param int $timeout The timeout in seconds, if 0 is provided then the function will block until all the jobs
* @throws ConnectionException If the client fails to connect to the server
* @throws JobManagerException If the JobManager throws an exception
* @throws JobNotFoundException If the job is not found
* @throws TimeoutException If the timeout is reached
* @throws Throwable If the job fails
* @return mixed The return value of the job
*/
public static function doWait(string $function, array $arguments, int $channel=0, int $timeout=0): mixed
public static function doWait(string $function, array $arguments=[], int $channel=0, int $timeout=0): mixed
{
return self::waitFor(self::do($function, $arguments, $channel), $timeout);
}
@ -545,7 +588,6 @@
* Clears the watchlist, this will remove all jobs from the watchlist.
*
* @return void
* @noinspection PhpUnused
*/
public static function clear(): void
{
@ -564,7 +606,7 @@
* @param array $arguments
* @return int
*/
public static function __callStatic(string $name, array $arguments)
public static function __callStatic(string $name, array $arguments=[])
{
return self::do($name, $arguments);
}
@ -574,8 +616,12 @@
*/
/**
* @param string $function
* @param callable $callback
* Registers a new function to be called by the worker, this function will be called when the worker receives
* a job with the same name as the function. Internally this function uses call_user_func_array() to call the
* function.
*
* @param string $function The name of the function to register
* @param callable $callback The callback to call when the function is called
* @return void
*/
public static function addFunction(string $function, callable $callback): void
@ -599,9 +645,11 @@
}
/**
* @param string $function
* Removes a function from the worker, this function will no longer be called when a job with the same name
* is received.
*
* @param string $function The name of the function to remove
* @return void
* @noinspection PhpUnused
*/
public static function removeFunction(string $function): void
{
@ -610,12 +658,18 @@
throw new RuntimeException(sprintf('Attempting to removeFunction() in \'%s\' mode, only workers can preform removeFunction().', self::$mode));
}
if(!isset(self::$function_pointers[$function]))
{
throw new InvalidArgumentException(sprintf('Attempting to removeFunction() with a function name of \'%s\', this function does not exist.', $function));
}
unset(self::$function_pointers[$function]);
}
/**
* @return array
* @noinspection PhpUnused
* Returns an array of all the registered functions.
*
* @return array An array of all the registered functions
*/
public static function getFunctions(): array
{
@ -628,15 +682,17 @@
}
/**
* @param int|array $channel
* @param int $timeout
* Runs the worker, this function will block and listen for incoming jobs, if a job is received then the
* job will be processed and optionally returned back to the client via a return channel.
*
* @param int|array $channel The channel to listen on, if an array is provided then the worker will listen on
* @param int $timeout The timeout in seconds, if 0 is provided then the function will block until a job is received
* @param bool $ignore_errors If set to true then the worker will not throw exceptions, instead it will log the
* error and continue to run.
* @throws ConnectionException If the client fails to connect to the server
* @return void
* @throws ConnectionException
* @throws JobManagerException
* @throws JobNotFoundException
* @throws ServerException
*/
public static function run(int|array $channel=0, int $timeout=0): void
public static function run(int|array $channel=0, int $timeout=0, bool $ignore_errors=false): void
{
if(self::$mode !== TamerMode::WORKER)
{
@ -651,7 +707,7 @@
try
{
$job_packet = self::$job_manager->listenForJob(self::$worker_configuration->getWorkerId(), $channel, $timeout);
break;
self::executeJob($job_packet);
}
catch(TimeoutException $e)
{
@ -661,8 +717,7 @@
/** @noinspection PhpRedundantCatchClauseInspection */
catch(RedisException $e)
{
// TODO: There has to be a better way to do this.
if(strtolower($e->getMessage()) === 'redis server went away')
if($ignore_errors === false && strtolower($e->getMessage()) === 'redis server went away')
{
if($error_time === null)
{
@ -670,35 +725,25 @@
}
else if((time() - $error_time) >= 5)
{
throw new ServerException('Redis server went away, and did not come back.');
throw new ConnectionException('Redis server went away, and did not come back.');
}
}
}
catch(Exception $e)
{
Log::error(Utilities::getName(), sprintf('Worker %s encountered an error while listening for jobs: %s', self::$worker_configuration->getWorkerId(), $e->getMessage()), $e);
unset($e);
}
if($timeout === -1)
{
return;
}
if((time() - $start_time) >= $timeout)
{
return;
}
}
Log::debug(Utilities::getName(), sprintf('Worker %s received job %s', self::$worker_configuration->getWorkerId(), $job_packet->getId()));
if(!isset(self::$function_pointers[$job_packet->getPayload()]))
{
Log::warning(Utilities::getName(), sprintf('Job %s requested function \'%s\' which does not exist, rejecting job.', $job_packet->getId(), $job_packet->getPayload()));
self::$job_manager->rejectJob($job_packet);
}
try
{
$result = call_user_func_array(self::$function_pointers[$job_packet->getPayload()], unserialize($job_packet->getParameters(), ['allowed_classes'=>true]));
self::$job_manager->returnJob($job_packet, $result);
}
catch(Exception $e)
{
self::$job_manager->returnException($job_packet, $e);
}
}
}