diff --git a/.idea/runConfigurations/Exception_Test.xml b/.idea/runConfigurations/Exception_Test.xml new file mode 100644 index 0000000..f980440 --- /dev/null +++ b/.idea/runConfigurations/Exception_Test.xml @@ -0,0 +1,7 @@ + + + + + + \ No newline at end of file diff --git a/.idea/runConfigurations/Redis_Server_Test.xml b/.idea/runConfigurations/Redis_Server_Test.xml new file mode 100644 index 0000000..c87f94b --- /dev/null +++ b/.idea/runConfigurations/Redis_Server_Test.xml @@ -0,0 +1,5 @@ + + + + + \ No newline at end of file diff --git a/.idea/runConfigurations/Worker_Test.xml b/.idea/runConfigurations/Worker_Test.xml new file mode 100644 index 0000000..3f40271 --- /dev/null +++ b/.idea/runConfigurations/Worker_Test.xml @@ -0,0 +1,18 @@ + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index ef16156..b7eca7b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,10 +5,9 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [1.0.2] - Unreleased +## [2.0.0] - Unreleased -### Added - - Added non-blocking mode for supervisor +Major rewrite ## [1.0.1] - 2022-02-28 diff --git a/README.md b/README.md index 5050c8f..d70fdc5 100644 --- a/README.md +++ b/README.md @@ -76,7 +76,7 @@ This README will contain documentation on how to effectively use TamerLib in you * [Implementing a client](#implementing-a-client) * [Implementing a worker](#implementing-a-worker) * [Implementing a node](#implementing-a-node) - * [Handling Exceptions](#handling-exceptions) + * [Job and Error Handling](#job-and-error-handling) * [Methods](#methods) * [Global Methods](#global-methods) * [initialize](#initialize) @@ -376,6 +376,9 @@ client. In this example, we will implement a worker that implements the followin A worker does not require to a server configuration, as the server configuration is obtained by the parent process once the file is executed as a worker, see the documentation for the functions used in this example for more information. +Usually at the `run()` method, if you are running it indefinitely the worker will only exit once the parent process has +terminated, if the worker gets closed unexpectedly, the parent process will restart the worker. + - [`initialize`](#initialize) - [`addFunction`](#addfunction) - [`run`](#run) @@ -405,12 +408,31 @@ functionality should be implemented in the worker script. See [Implementing a wo details on how to implement a worker script. -## Handling Exceptions +## Job and Error Handling When waiting for a Job to complete, it is possible that the job will throw an exception, this exception will be caught and re-thrown onto the client side of the application. This means that you can catch exceptions thrown by the worker and handle them accordingly. +Jobs in TamerLib are managed as follows: + + 1. Only a client can delete a job once it has received its results. + 2. A worker can delete a job if the job has no return channel to go to. This is determined by the client that's + listening for job completion when you run wait(), which allows a callback to be triggered each time a job is + immediately completed. + +Error handling in TamerLib is achieved through job statuses: + + 1. If a return channel is provided, the job's status will be set to either "success" or "failure". + 2. If the status is "success", the client will unserialize the returned results and return them. + 3. If the status is "failure", the client will unserialize the exception and return that instead. + +This mechanism ensures that your application has robust, clear information about the status of each job and can handle +any exceptions that are thrown during the execution of a job. + +These robust error handling capabilities enable you to build applications with TamerLib that can reliably manage a wide +range of tasks in parallel while using the traditional exception handling mechanisms that you are already familiar with. + ```php $job = \TamerLib\tm::do('throw_exception'); try @@ -441,6 +463,13 @@ catch(\SmtpException $e) > available in the global namespace of the client application or the same packages must be imported in the client > and worker application. If the exception cannot be unserialized a generic `\Exception` will be thrown instead. +Most of TamerLib's static methods are capable of throwing exceptions but usually it will be a `\RuntimeException` or +`\InvalidArgumentException` if the method is called with invalid parameters. Crucial methods such as `initialize()` and +`run()` will throw a `\TamerLib\Exceptions\TamerException` if the method if there is an error with the TamerLib system +itself. + + + # Methods TamerLib provides static methods that can be used to interact with the TamerLib system, these methods are often only diff --git a/project.json b/project.json index 0c2bd23..1432f12 100644 --- a/project.json +++ b/project.json @@ -41,12 +41,6 @@ "source_type": "remote", "source": "nosial/libs.log=latest@n64" }, - { - "name": "com.opis.closure", - "version": "latest", - "source_type": "remote", - "source": "opis/closure=latest@composer" - }, { "name": "com.symfony.process", "version": "latest", diff --git a/src/TamerLib/Classes/JobManager.php b/src/TamerLib/Classes/JobManager.php index 52b42dc..ba759d9 100644 --- a/src/TamerLib/Classes/JobManager.php +++ b/src/TamerLib/Classes/JobManager.php @@ -724,6 +724,7 @@ if($job_id !== false && $this->claimJob($job_id[1], $worker_id)) { + Log::debug(Utilities::getName(), sprintf('Claimed job & received job %s', $job_id[1])); return new JobPacket($this->getClient()->hGetAll($job_id[1])); } diff --git a/src/TamerLib/Classes/Utilities.php b/src/TamerLib/Classes/Utilities.php index 09f5e48..659594f 100644 --- a/src/TamerLib/Classes/Utilities.php +++ b/src/TamerLib/Classes/Utilities.php @@ -1,6 +1,8 @@ -getPayload()])) + { + Log::warning(Utilities::getName(), sprintf('Job %s requested function \'%s\' which does not exist, rejecting job.', $job_packet->getId(), $job_packet->getPayload())); + self::$job_manager->rejectJob($job_packet); + return; + } + + $result = call_user_func_array(self::$function_pointers[$job_packet->getPayload()], unserialize($job_packet->getParameters(), ['allowed_classes'=>true])); + self::$job_manager->returnJob($job_packet, $result); + } + catch(Exception $e) + { + self::$job_manager->returnException($job_packet, $e); + } + } + /** * GLOBAL FUNCTIONS */ @@ -126,13 +156,13 @@ * if the parent process is a client process, otherwise an exception will be thrown because the worker will * have no server to connect to if it is not initialized by a client. * - * @param string|null $mode - * @param ServerConfiguration|null $server_config + * @param string $mode The mode to initialize Tamer in, must be one of the TamerMode constants + * @param ServerConfiguration|null $server_config Optional. The server configuration to use, if null is provided + * then a server will be initialized with default parameters. + * @throws ServerException If the server fails to initialize * @return void - * @throws ServerException - * @throws Exception */ - public static function initialize(?string $mode, ?ServerConfiguration $server_config=null): void + public static function initialize(string $mode, ?ServerConfiguration $server_config=null): void { if(self::$mode !== null) { @@ -185,7 +215,15 @@ if($mode === TamerMode::CLIENT) { self::$supervisor = new WorkerSupervisor(self::$server_configuration); - self::$return_channel = sprintf('rch%s', random_int(100000000, 999999999)); + + try + { + self::$return_channel = sprintf('rch%s', random_int(100000000, 999999999)); + } + catch(Exception $e) + { + throw new RuntimeException(sprintf('Bad environment, failed to generate random integer. (%s)', $e->getMessage()), $e); + } } self::$job_manager = new JobManager(self::$server_configuration); @@ -195,25 +233,36 @@ * Shuts down all workers * * @return void - * @noinspection PhpUnused */ public static function shutdown(): void { + // Do nothing if Tamer is not initialized if(self::$mode === null) { return; } - if(self::$mode === TamerMode::CLIENT && self::$supervisor !== null) - { - self::$supervisor->stopAll(); - } + // Close all subprocesses + self::$supervisor?->stopAll(); + self::$server?->stop(); + self::$job_manager?->disconnect(); + + // Clear all static variables + self::$mode = null; + self::$server_configuration = null; + self::$server = null; + self::$watching_jobs = []; + self::$supervisor = null; + self::$job_manager = null; + self::$worker_configuration = null; + self::$function_pointers = []; + self::$return_channel = null; } /** * Returns the current mode Tamer is running in * - * @return string + * @return string The current mode Tamer is running in */ public static function getMode(): string { @@ -223,7 +272,8 @@ /** * Monitors all internal processes * - * @param int $timeout + * @param int $timeout The timeout in seconds to monitor for, if -1 is provided then the monitor will run + * for one iteration and then return. If 0 is provided then the monitor will run forever. * @return void */ public static function monitor(int $timeout=0): void @@ -271,11 +321,12 @@ * Spawns a worker process by their count, if the path is null then a generic sub process will be spawned * that will only be capable of executing closures. * - * @param int $count - * @param string|null $path - * @param int $channel + * @param int $count The number of workers to spawn (defaults to 8) + * @param string|null $path The path to the worker file to spawn, if null is provided then a generic worker + * will be spawned that can only execute closures (unimplemented) + * @param int $channel The channel to spawn the workers on (defaults to 0) + * @throws WorkerFailedException If the worker fails to spawn * @return void - * @throws WorkerFailedException */ public static function createWorker(int $count=8, ?string $path=null, int $channel=0): void { @@ -299,12 +350,12 @@ /** * Preforms a function call against a worker in the background, returns the Job ID to keep track of the job status. * - * @param string $function - * @param array $arguments - * @param int $channel - * @return int + * @param string $function The function to call + * @param array $arguments The arguments to pass to the function + * @param int $channel The channel to preform the function call on (defaults to 0) + * @return int The Job ID of the function call */ - public static function do(string $function, array $arguments, int $channel=0): int + public static function do(string $function, array $arguments=[], int $channel=0): int { if(self::$mode !== TamerMode::CLIENT) { @@ -321,9 +372,13 @@ { self::$job_manager->pushJob($job_packet); } - catcH(Exception $e) + catch(ConnectionException $e) { - throw new RuntimeException('do() failed, failed to push job to the server', 0, $e); + throw new RuntimeException('do() failed, failed to connect to the server', 0, $e); + } + catch(JobManagerException $e) + { + throw new RuntimeException('do() failed, failed to push the job to the server', 0, $e); } self::addToWatchlist($job_packet->getId()); @@ -334,13 +389,12 @@ * Sends a function call to a worker in the background, but once the job is completed it will be forgotten and * the result will not be returned, this also means that the job will not be added to the watchlist. * - * @param string $function - * @param array $arguments - * @param int $channel + * @param string $function The function to call + * @param array $arguments The arguments to pass to the function + * @param int $channel The channel to preform the function call on (defaults to 0) * @return void - * @noinspection PhpUnused */ - public static function dof(string $function, array $arguments, int $channel=0): void + public static function dof(string $function, array $arguments=[], int $channel=0): void { if(self::$mode !== TamerMode::CLIENT) { @@ -360,20 +414,22 @@ { throw new RuntimeException('dof() failed, failed to push job to the server', 0, $e); } - - self::addToWatchlist($job_packet->getId()); } /** * Waits for all the dispatched jobs to complete, this is a blocking function and will not return until all the * jobs have completed. If a timeout is specified, the function will return after the timeout has been reached. * - * @param callable $callback - * @param int $timeout + * @param callable $callback A callback function that will be called after each iteration of the wait loop + * @param int $timeout The timeout in seconds, if 0 is provided then the function will block until all the jobs + * have completed, if -1 is provided then the function run for one iteration and return * @return void - * @throws ServerException - * @throws Throwable - * @throws TimeoutException + * @throws JobManagerException If the JobManager throws an exception + * @throws TamerException If the Tamer throws an exception + * @throws TimeoutException If the timeout is reached + * @throws ConnectionException If the client fails to connect to the server + * @throws Exception If a job fails + * @throws Throwable If a job fails */ public static function wait(callable $callback, int $timeout=0): void { @@ -388,68 +444,57 @@ self::monitor(-1); if(count(self::$watching_jobs) === 0) { - Log::debug(Utilities::getName(), 'No jobs to wait for, returning'); return; } Log::debug(Utilities::getName(), 'Waiting for jobs to complete'); $job_packet = self::$job_manager->listenReturnChannel(self::$return_channel); - if(in_array($job_packet->getId(), self::$watching_jobs)) + if(!in_array($job_packet->getId(), self::$watching_jobs)) { - self::removeFromWatchlist($job_packet->getId()); + Log::debug(Utilities::getName(), sprintf('Job \'%s\' has returned, but is not in the watchlist', $job_packet->getId())); self::$job_manager->dropJob($job_packet->getId()); + continue; + } - if($job_packet->getStatus() === JobStatus::FINISHED) + self::removeFromWatchlist($job_packet->getId()); + self::$job_manager->dropJob($job_packet->getId()); + + if($job_packet->getStatus() === JobStatus::FINISHED) + { + $return_value = $job_packet->getReturnValue(); + + if($return_value !== null) { - $return_value = $job_packet->getReturnValue(); - - if($return_value === null) - { - $return_value = null; - } - else - { - $return_value = unserialize($return_value, ['allowed_classes' => true]); - - if($return_value === false) - { - Log::error(Utilities::getName(), 'Failed to unserialize return value, return value was dropped'); - $return_value = null; - } - } - - $callback($job_packet->getId(), $return_value); + $return_value = unserialize($return_value, ['allowed_classes' => true]); } - elseif($job_packet->getStatus() === JobStatus::FAILED) - { - try - { - $e = unserialize($job_packet->getException(), ['allowed_classes' => true]); - } - catch(Exception $e) - { - Log::error(Utilities::getName(), 'Failed to unserialize exception, exception was dropped', $e); - } - finally - { - if(isset($e) && $e instanceof Throwable) - { - throw $e; - } - throw new ServerException('wait() failed, job returned with an exception'); - } - } - else + $callback($job_packet->getId(), $return_value); + } + elseif($job_packet->getStatus() === JobStatus::FAILED) + { + try { - Log::debug(Utilities::getName(), sprintf('Job \'%s\' returned with an unexpected status of \'%s\'', $job_packet->getId(), $job_packet->getStatus())); - throw new ServerException('wait() failed, job returned with an unexpected status of \'' . $job_packet->getStatus() . '\''); + $e = unserialize($job_packet->getException(), ['allowed_classes' => true]); + } + catch(Exception $e) + { + Log::error(Utilities::getName(), 'Failed to unserialize exception, exception was dropped', $e); + } + finally + { + if(isset($e) && $e instanceof Throwable) + { + throw $e; + } + + /** @noinspection ThrowRawExceptionInspection */ + throw new Exception(sprintf('wait() failed, job \'%s\' failed with an unknown exception', $job_packet->getId())); } } else { - Log::debug(Utilities::getName(), sprintf('Job \'%s\' has returned, but is not in the watchlist', $job_packet->getId())); + throw new TamerException(sprintf('wait() failed, job \'%s\' returned with an unknown status \'%s\'', $job_packet->getId(), $job_packet->getStatus())); } if ($timeout < 0) @@ -462,38 +507,36 @@ throw new TimeoutException('wait() timed out'); } - usleep(10); + usleep(1000); } } /** * Waits for a job to complete, returns the result of the job. * - * @param JobPacket|int $job_id - * @param int $timeout - * @return mixed - * @throws JobNotFoundException - * @throws ServerException - * @throws TimeoutException - * @throws Throwable + * @param int $job_id The ID of the job to wait for + * @param int $timeout The timeout in seconds, if 0 is provided then the function will block until all the jobs + * have completed, if -1 is provided then the function run for one iteration and return + * @throws ConnectionException If the client fails to connect to the server + * @throws JobManagerException If the JobManager throws an exception + * @throws JobNotFoundException If the job is not found + * @throws TimeoutException If the timeout is reached + * @throws Throwable If the job fails + * @throws Exception If the job fails + * @return mixed The return value of the job */ - public static function waitFor(JobPacket|int $job_id, int $timeout=0): mixed + public static function waitFor(int $job_id, int $timeout=0): mixed { if(self::$mode !== TamerMode::CLIENT) { throw new RuntimeException(sprintf('Attempting to waitFor() in \'%s\' mode, only clients can preform waitFor().', self::$mode)); } - if($job_id instanceof JobPacket) - { - $job_id = $job_id->getId(); - } - $time_start = time(); - while(true) { self::monitor(-1); + switch(self::$job_manager->getJobStatus($job_id)) { case JobStatus::FINISHED: @@ -524,18 +567,18 @@ /** * Preforms a do() call on a waitFor() call all in one function. * - * @param string $function - * @param array $arguments - * @param int $channel - * @param int $timeout - * @return mixed - * @throws JobNotFoundException - * @throws ServerException - * @throws Throwable - * @throws TimeoutException - * @noinspection PhpUnused + * @param string $function The function to call + * @param array $arguments The arguments to pass to the function + * @param int $channel The channel to use + * @param int $timeout The timeout in seconds, if 0 is provided then the function will block until all the jobs + * @throws ConnectionException If the client fails to connect to the server + * @throws JobManagerException If the JobManager throws an exception + * @throws JobNotFoundException If the job is not found + * @throws TimeoutException If the timeout is reached + * @throws Throwable If the job fails + * @return mixed The return value of the job */ - public static function doWait(string $function, array $arguments, int $channel=0, int $timeout=0): mixed + public static function doWait(string $function, array $arguments=[], int $channel=0, int $timeout=0): mixed { return self::waitFor(self::do($function, $arguments, $channel), $timeout); } @@ -545,7 +588,6 @@ * Clears the watchlist, this will remove all jobs from the watchlist. * * @return void - * @noinspection PhpUnused */ public static function clear(): void { @@ -564,7 +606,7 @@ * @param array $arguments * @return int */ - public static function __callStatic(string $name, array $arguments) + public static function __callStatic(string $name, array $arguments=[]) { return self::do($name, $arguments); } @@ -574,8 +616,12 @@ */ /** - * @param string $function - * @param callable $callback + * Registers a new function to be called by the worker, this function will be called when the worker receives + * a job with the same name as the function. Internally this function uses call_user_func_array() to call the + * function. + * + * @param string $function The name of the function to register + * @param callable $callback The callback to call when the function is called * @return void */ public static function addFunction(string $function, callable $callback): void @@ -599,9 +645,11 @@ } /** - * @param string $function + * Removes a function from the worker, this function will no longer be called when a job with the same name + * is received. + * + * @param string $function The name of the function to remove * @return void - * @noinspection PhpUnused */ public static function removeFunction(string $function): void { @@ -610,12 +658,18 @@ throw new RuntimeException(sprintf('Attempting to removeFunction() in \'%s\' mode, only workers can preform removeFunction().', self::$mode)); } + if(!isset(self::$function_pointers[$function])) + { + throw new InvalidArgumentException(sprintf('Attempting to removeFunction() with a function name of \'%s\', this function does not exist.', $function)); + } + unset(self::$function_pointers[$function]); } /** - * @return array - * @noinspection PhpUnused + * Returns an array of all the registered functions. + * + * @return array An array of all the registered functions */ public static function getFunctions(): array { @@ -628,15 +682,17 @@ } /** - * @param int|array $channel - * @param int $timeout + * Runs the worker, this function will block and listen for incoming jobs, if a job is received then the + * job will be processed and optionally returned back to the client via a return channel. + * + * @param int|array $channel The channel to listen on, if an array is provided then the worker will listen on + * @param int $timeout The timeout in seconds, if 0 is provided then the function will block until a job is received + * @param bool $ignore_errors If set to true then the worker will not throw exceptions, instead it will log the + * error and continue to run. + * @throws ConnectionException If the client fails to connect to the server * @return void - * @throws ConnectionException - * @throws JobManagerException - * @throws JobNotFoundException - * @throws ServerException */ - public static function run(int|array $channel=0, int $timeout=0): void + public static function run(int|array $channel=0, int $timeout=0, bool $ignore_errors=false): void { if(self::$mode !== TamerMode::WORKER) { @@ -651,7 +707,7 @@ try { $job_packet = self::$job_manager->listenForJob(self::$worker_configuration->getWorkerId(), $channel, $timeout); - break; + self::executeJob($job_packet); } catch(TimeoutException $e) { @@ -661,8 +717,7 @@ /** @noinspection PhpRedundantCatchClauseInspection */ catch(RedisException $e) { - // TODO: There has to be a better way to do this. - if(strtolower($e->getMessage()) === 'redis server went away') + if($ignore_errors === false && strtolower($e->getMessage()) === 'redis server went away') { if($error_time === null) { @@ -670,35 +725,25 @@ } else if((time() - $error_time) >= 5) { - throw new ServerException('Redis server went away, and did not come back.'); + throw new ConnectionException('Redis server went away, and did not come back.'); } } } + catch(Exception $e) + { + Log::error(Utilities::getName(), sprintf('Worker %s encountered an error while listening for jobs: %s', self::$worker_configuration->getWorkerId(), $e->getMessage()), $e); + unset($e); + } + + if($timeout === -1) + { + return; + } if((time() - $start_time) >= $timeout) { return; } } - - - Log::debug(Utilities::getName(), sprintf('Worker %s received job %s', self::$worker_configuration->getWorkerId(), $job_packet->getId())); - - if(!isset(self::$function_pointers[$job_packet->getPayload()])) - { - - Log::warning(Utilities::getName(), sprintf('Job %s requested function \'%s\' which does not exist, rejecting job.', $job_packet->getId(), $job_packet->getPayload())); - self::$job_manager->rejectJob($job_packet); - } - - try - { - $result = call_user_func_array(self::$function_pointers[$job_packet->getPayload()], unserialize($job_packet->getParameters(), ['allowed_classes'=>true])); - self::$job_manager->returnJob($job_packet, $result); - } - catch(Exception $e) - { - self::$job_manager->returnException($job_packet, $e); - } } } \ No newline at end of file diff --git a/tests/ExampleClass.php b/tests/ExampleClass.php index 8efbf75..7d6560b 100644 --- a/tests/ExampleClass.php +++ b/tests/ExampleClass.php @@ -3,57 +3,44 @@ class ExampleClass { /** - * @var array + * Sleeps for the given number of seconds, plus a random number of seconds between 0 and 100. + * + * @param int $seconds + * @return int + * @throws Exception */ - private $data; - - /** - * ExampleClass constructor. - */ - public function __construct() + public function sleep(int $seconds=1): int { - $this->data = []; + sleep($seconds); + return random_int(0, 100) + $seconds; } /** - * Sets a value in the data array + * Calculates pi using the Leibniz formula. + * + * @param int $iterations + * @return float + */ + public function pi(int $iterations): float + { + $pi = 0; + $sign = 1; + for ($i = 0; $i < $iterations; $i++) + { + $pi += $sign / (2 * $i + 1); + $sign *= -1; + } + return $pi * 4; + } + + /** + * Throws an exception. * - * @param string $key - * @param mixed $value * @return void + * @throws Exception */ - public function set(string $key, mixed $value): void + public function throwException(): void { - $this->data[$key] = $value; - } - - /** - * Gets a value from the data array - * - * @param string $key - * @return mixed - */ - public function get(string $key): mixed - { - return $this->data[$key]; - } - - /** - * Checks if a key exists in the data array - * - * @param string $key - * @return bool - */ - public function exists(string $key): bool - { - return isset($this->data[$key]); - } - - /** - * @return void - */ - public function clear(): void - { - $this->data = []; + throw new Exception('This is an exception.'); } } \ No newline at end of file diff --git a/tests/exception_test.php b/tests/exception_test.php new file mode 100644 index 0000000..18bce6d --- /dev/null +++ b/tests/exception_test.php @@ -0,0 +1,14 @@ +getMessage() . PHP_EOL); - } - } + // Run the worker + \TamerLib\tm::run(); \ No newline at end of file