From 9edc68ca1a2f1bf9d965309318a8f257da02e637 Mon Sep 17 00:00:00 2001 From: Netkas Date: Fri, 7 Jul 2023 00:22:35 -0400 Subject: [PATCH] Many changes, see CHANGELOG.md --- CHANGELOG.md | 27 +++++++- src/TamerLib/Classes/JobManager.php | 97 +++++++++++++++++++++++++--- src/TamerLib/Classes/RedisServer.php | 20 +++--- src/TamerLib/Classes/Utilities.php | 45 +++++++++++++ src/TamerLib/Classes/redis.conf | 14 ++++ src/TamerLib/tm.php | 94 +++++++++++++++++++++------ 6 files changed, 255 insertions(+), 42 deletions(-) create mode 100644 src/TamerLib/Classes/redis.conf diff --git a/CHANGELOG.md b/CHANGELOG.md index 73113c8..fabb5c1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,10 +5,35 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [2.0.2] - Unreleased +## [2.1.0] - Unreleased + +### Added + - Implemented `ausleep()` in for `TamerLib\Classes\ > JobManager` to allow TamerLib to monitor sub-processes and + workers without blocking the main thread. + - Updated some methods in `TamerLib\Classes\ > JobManager` to use properties from `JobPacket` instead of calling + the server for the same information if the passed parameter (usually $job_id) is an instance of `JobPacket`, + this reduces the number of calls to the server and improves performance. + - Updated some methods in `TamerLib > tm` to use `asleep` instead of `sleep` to allow TamerLib to monitor sub-processes + and workers without blocking the main thread. + - Added a custom redis configuration file to improve performance when running TamerLib on a single machine. ### Changed - `\TamerLib\Classes\ > RedisServer > start()` now starts the server with a matching logging level to `net.nosial.loglib` + - Changed method `public static function do(string $function, array $arguments=[], int $channel=0): int` + to `public static function do(string $function, array $arguments=[], ?callable $callback=null, array $options=[]): int` in `\TamerLib > tm > do()` + so that the `$callback` parameter is now optional and will be used to handle the result of the job when `tm::wait()` + is called, additionally some options can be passed to the job such as `channel` for passing on the channel to the + function as previously done with the `$channel` parameter. + - Changed method `public static function dof(string $function, array $arguments=[], int $channel=0): void` to + `public static function dof(string $function, array $arguments=[], array $options=[]): void` in `\TamerLib > tm > dof()` + to represent the changes made to `tm::do()` as described above. + - Changed method `public static function wait(?callable $callback=null, int $timeout=0): void` to + `public static function wait(int $timeout=0): void` as the function itself will now handle the result of the job + using the callback passed to `tm::do()` or `tm::dof()` when the job is done. + +### Fixed + - Fixed synchronization issue in TamerLib where callbacks would run indefinitely if further jobs were added to the + queue while the callback was running. ## [2.0.1] - 2023-06-30 diff --git a/src/TamerLib/Classes/JobManager.php b/src/TamerLib/Classes/JobManager.php index af08d46..a7b350c 100644 --- a/src/TamerLib/Classes/JobManager.php +++ b/src/TamerLib/Classes/JobManager.php @@ -16,6 +16,7 @@ use TamerLib\Objects\JobPacket; use TamerLib\Objects\ServerConfiguration; use TamerLib\Objects\WorkerConfiguration; + use TamerLib\tm; use Throwable; class JobManager @@ -188,6 +189,7 @@ { try { + Log::debug(Utilities::getName(), sprintf('JobManager pushing job %s to %s:%s', $jobPacket->getId(), $this->server_configuration->getHost(), $this->server_configuration->getPort())); $this->getClient()->hMSet($jobPacket->getId(), $jobPacket->toArray()); $this->getClient()->rPush(sprintf('ch%s', $jobPacket->getChannel()), $jobPacket->getId()); } @@ -219,6 +221,8 @@ $job_id = $job_id->getId(); } + Log::debug(Utilities::getName(), sprintf('JobManager getting job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort())); + try { if(!$this->getClient()->exists($job_id)) @@ -278,6 +282,7 @@ if($wait_for !== null) { $start_time = time(); + Log::debug(Utilities::getName(), sprintf('JobManager waiting for job %s to be one of the following statuses: %s', $job_id, implode(', ', $wait_for))); while(true) { $job = $this->getJob($job_id); @@ -292,10 +297,14 @@ throw new TimeoutException(sprintf('Timed out waiting for job %s to be one of the following statuses: %s', $job_id, implode(', ', $wait_for))); } - usleep(100000); + Utilities::ausleep(100000, static function(){ + tm::monitor(-1); + }); } } + Log::debug(Utilities::getName(), sprintf('JobManager getting status of job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort())); + try { if(!$this->getClient()->exists($job_id)) @@ -336,13 +345,38 @@ { if($timeout < 0) { + Log::debug(Utilities::getName(), sprintf('Listening on return channel (LPOP) %s on %s:%s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort())); $job_packet = $this->getClient()->lPop($return_channel); } else { - $job_packet = $this->getClient()->blPop($return_channel, $timeout); + Log::debug(Utilities::getName(), sprintf('Listening on return channel %s on %s:%s with a timeout of %s seconds', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort(), $timeout)); + + $start_time = time(); + while(true) + { + $job_packet = $this->getClient()->lPop($return_channel); + + if(!is_bool($job_packet) && $job_packet !== null) + { + break; + } + + if($timeout > 0 && (time() - $start_time) > $timeout) + { + throw new TimeoutException(sprintf('Timed out listening on return channel %s on %s:%s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort())); + } + + Utilities::ausleep(10000, static function(){ + tm::monitor(-1); + }); + } } } + catch(TimeoutException $e) + { + throw $e; + } catch(RedisException $e) { throw new ConnectionException(sprintf('Client threw an error while trying to get job from return channel %s on %s:%s, %s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort(), $e->getMessage()), $e); @@ -352,12 +386,13 @@ throw new JobManagerException(sprintf('Could not get job from return channel %s on %s:%s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort()), $e); } - if($job_packet === null) + if(is_bool($job_packet) && $job_packet === false) { throw new TimeoutException(sprintf('Could not get job from return channel %s on %s:%s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort())); } - return $job_packet[1]; + Log::debug(Utilities::getName(), sprintf('Got job %s from return channel %s on %s:%s', $job_packet, $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort())); + return $job_packet; } /** @@ -378,6 +413,8 @@ $job_id = $job_id->getId(); } + Log::debug(Utilities::getName(), sprintf('Pushing job %s back to return channel %s on %s:%s', $job_id, $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort())); + try { if(!$this->getClient()->exists($job_id)) @@ -415,6 +452,8 @@ $job_id = $job_id->getId(); } + Log::debug(Utilities::getName(), sprintf('Getting return value of job %s on %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort())); + try { if(!$this->getClient()->exists($job_id)) @@ -457,6 +496,8 @@ $job_id = $job_id->getId(); } + Log::debug(Utilities::getName(), sprintf('Getting exception of job %s on %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort())); + try { if(!$this->getClient()->exists($job_id)) @@ -519,6 +560,8 @@ $worker_id = $worker_id->getWorkerId(); } + Log::debug(Utilities::getName(), sprintf('Attempting to claim job %s on %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort())); + try { // Check if the job exists @@ -533,6 +576,7 @@ // Verify that the job was claimed if($this->getClient()->hGet($job_id, 'worker_id') !== $worker_id) { + Log::warning(Utilities::getName(), sprintf('Job %s on %s:%s was already claimed by %s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort(), $this->getClient()->hGet($job_id, 'worker_id'))); return false; } @@ -624,27 +668,40 @@ */ public function returnJob(JobPacket|string $job_id, mixed $return_value=null): void { + $was_job_packet = false; + if($job_id instanceof JobPacket) { + $return_channel = $job_id->getReturnChannel(); /** @noinspection CallableParameterUseCaseInTypeContextInspection */ $job_id = $job_id->getId(); + $was_job_packet = true; } + Log::debug(Utilities::getName(), sprintf('Returning job %s', $job_id)); + try { if(!$this->getClient()->exists($job_id)) { + throw new JobNotFoundException(sprintf('Job %s does not exist on %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort())); } - $return_channel = $this->getClient()->hGet($job_id, 'return_channel'); + if(!$was_job_packet) + { + $return_channel = $this->getClient()->hGet($job_id, 'return_channel'); + } + + /** @noinspection PhpUndefinedVariableInspection */ if($return_channel === null) { + Log::debug(Utilities::getName(), sprintf('No return channel set, deleting job %s', $job_id)); $this->getClient()->del($job_id); return; } - Log::debug(Utilities::getName(), sprintf('Returning job %s (Return Channel: %s)', $job_id, $return_channel)); + Log::debug(Utilities::getName(), sprintf('Pushing job %s to return channel %s', $job_id, $return_channel)); $this->getClient()->hSet($job_id, 'return_value', serialize($return_value)); $this->getClient()->hSet($job_id, 'status', JobStatus::FINISHED); $this->getClient()->rPush($this->getClient()->hGet($job_id, 'return_channel'), $job_id); @@ -674,12 +731,17 @@ */ public function returnException(JobPacket|string $job_id, Throwable $throwable): void { + $was_job_packet = false; if($job_id instanceof JobPacket) { + $return_channel = $job_id->getReturnChannel(); /** @noinspection CallableParameterUseCaseInTypeContextInspection */ $job_id = $job_id->getId(); + $was_job_packet = true; } + Log::debug(Utilities::getName(), sprintf('Returning exception for job %s', $job_id)); + try { if(!$this->getClient()->exists($job_id)) @@ -687,17 +749,23 @@ throw new JobNotFoundException(sprintf('Job %s does not exist on %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort())); } - if($this->getClient()->hGet($job_id, 'return_channel') === null) + if(!$was_job_packet) { + $return_channel = $this->getClient()->hGet($job_id, 'return_channel'); + } + + /** @noinspection PhpUndefinedVariableInspection */ + if($return_channel === null) + { + Log::debug(Utilities::getName(), sprintf('No return channel set, deleting job %s', $job_id)); $this->getClient()->del($job_id); return; } - Log::debug(Utilities::getName(), sprintf('Returning exception for job %s', $job_id)); + Log::debug(Utilities::getName(), sprintf('Pushing job %s to return channel %s', $job_id, $return_channel)); $this->getClient()->hSet($job_id, 'exception', serialize($throwable)); $this->getClient()->hSet($job_id, 'status', JobStatus::FAILED); - $this->getClient()->rPush($this->getClient()->hGet($job_id, 'return_channel'), $job_id); - Log::debug(Utilities::getName(), sprintf('Pushed job %s to return channel %s', $job_id, $this->getClient()->hGet($job_id, 'return_channel'))); + $this->getClient()->rPush($return_channel, $job_id); } catch(RedisException $e) { @@ -742,6 +810,8 @@ $channels[] = sprintf('ch%s', $channel); } + Log::debug(Utilities::getName(), sprintf('Waiting for job on channels %s', implode(', ', $channels))); + try { while(true) @@ -761,6 +831,11 @@ return new JobPacket($this->getClient()->hGetAll($job_id[1])); } + if($job_id !== false) + { + Log::debug(Utilities::getName(), sprintf('Could not claim job %s', $job_id[1])); + } + if($timeout < 0) { throw new TimeoutException(sprintf('Timeout exceeded while waiting for job on %s:%s', $this->server_configuration->getHost(), $this->server_configuration->getPort())); @@ -803,6 +878,8 @@ $job_id = $job_id->getId(); } + Log::debug(Utilities::getName(), sprintf('Dropping job %s', $job_id)); + try { if(!$this->getClient()->exists($job_id)) diff --git a/src/TamerLib/Classes/RedisServer.php b/src/TamerLib/Classes/RedisServer.php index 20dac95..803f3f6 100644 --- a/src/TamerLib/Classes/RedisServer.php +++ b/src/TamerLib/Classes/RedisServer.php @@ -44,7 +44,6 @@ $this->cmd = $cmd; } - /** * Determines if the Redis server is running. * @@ -76,17 +75,18 @@ Log::verbose(Utilities::getName(), 'Starting server on port ' . $this->configuration->getPort() . '.'); - $log_level = match (\LogLib\Classes\Utilities::getLogLevel()) - { - LevelType::Warning, LevelType::Error => 'warning', - LevelType::Verbose => 'verbose', - LevelType::Debug => 'debug', - default => 'notice', - }; - $this->server_process = new Process([ - $this->cmd, '--port', $this->configuration->getPort(), '--loglevel', $log_level + $this->cmd, __DIR__ . DIRECTORY_SEPARATOR . 'redis.conf', + '--port', $this->configuration->getPort(), + '--loglevel', match (\LogLib\Classes\Utilities::getLogLevel()) + { + LevelType::Warning, LevelType::Error => 'warning', + LevelType::Verbose => 'verbose', + LevelType::Debug => 'debug', + default => 'notice', + } ]); + $this->server_process->start(); // Use a redis client and ping the server until it responds. diff --git a/src/TamerLib/Classes/Utilities.php b/src/TamerLib/Classes/Utilities.php index 659594f..50ed388 100644 --- a/src/TamerLib/Classes/Utilities.php +++ b/src/TamerLib/Classes/Utilities.php @@ -136,4 +136,49 @@ return empty($output) ? (string)null : $output; } + + /** + * Sleeps for a given number of seconds. If a callback is provided, it will be called + * every second until the sleep is complete. + * + * @param int $seconds + * @param callable|null $callback + * @return void + */ + public static function asleep(int $seconds, ?callable $callback=null): void + { + $start = time(); + while(time() - $start < $seconds) + { + if($callback !== null) + { + $callback(); + } + sleep(1); + } + } + + /** + * Sleeps for a given number of microseconds. If a callback is provided, it will be called + * every millisecond until the sleep is complete. + * + * @param int $microseconds + * @param callable|null $callback + * @return void + */ + public static function ausleep(int $microseconds, ?callable $callback = null): void + { + $start = microtime(true); + $elapsed = 0; + + while ($elapsed < $microseconds) + { + if ($callback !== null) { + $callback(); + } + + usleep(1000); + $elapsed = (microtime(true) - $start) * 1000000; + } + } } \ No newline at end of file diff --git a/src/TamerLib/Classes/redis.conf b/src/TamerLib/Classes/redis.conf new file mode 100644 index 0000000..85ee7a8 --- /dev/null +++ b/src/TamerLib/Classes/redis.conf @@ -0,0 +1,14 @@ +# Redis configuration file TamerLib local mode + +# General +bind 127.0.0.1 + +# Memory and Storage +maxmemory 1GB +maxmemory-policy volatile-lru +maxmemory-samples 5 + +# Performance Tuning +tcp-backlog 511 +tcp-keepalive 10 +timeout 0 \ No newline at end of file diff --git a/src/TamerLib/tm.php b/src/TamerLib/tm.php index b311fd8..cc2ac30 100644 --- a/src/TamerLib/tm.php +++ b/src/TamerLib/tm.php @@ -50,6 +50,11 @@ */ private static $watching_jobs = []; + /** + * @var array + */ + private static $job_callbacks = []; + /** * @var WorkerSupervisor|null */ @@ -83,13 +88,18 @@ * Appends the job ID to the watch list * * @param int $job_id The job ID to add to the watch list + * @param callable|null $callback * @return void */ - private static function addToWatchlist(int $job_id): void + private static function addToWatchlist(int $job_id, ?callable $callback=null): void { if(!in_array($job_id, self::$watching_jobs, true)) { self::$watching_jobs[] = $job_id; + if($callback !== null) + { + self::$job_callbacks[$job_id] = $callback; + } } } @@ -105,6 +115,11 @@ { unset(self::$watching_jobs[$key]); } + + if(isset(self::$job_callbacks[$job_id])) + { + unset(self::$job_callbacks[$job_id]); + } } /** @@ -352,16 +367,23 @@ * * @param string $function The function to call * @param array $arguments The arguments to pass to the function - * @param int $channel The channel to preform the function call on (defaults to 0) + * @param callable|null $callback + * @param array $options * @return int The Job ID of the function call */ - public static function do(string $function, array $arguments=[], int $channel=0): int + public static function do(string $function, array $arguments=[], ?callable $callback=null, array $options=[]): int { if(self::$mode !== TamerMode::CLIENT) { throw new RuntimeException(sprintf('Attempting to do() in \'%s\' mode, only clients can preform do().', self::$mode)); } + $channel = 0; + if(isset($options['channel']) && is_int($options['channel'])) + { + $channel = $options['channel']; + } + $job_packet = new JobPacket(); $job_packet->setParameters(serialize($arguments)); $job_packet->setPayload($function); @@ -381,7 +403,7 @@ throw new RuntimeException('do() failed, failed to push the job to the server', 0, $e); } - self::addToWatchlist($job_packet->getId()); + self::addToWatchlist($job_packet->getId(), $callback); return $job_packet->getId(); } @@ -391,16 +413,23 @@ * * @param string $function The function to call * @param array $arguments The arguments to pass to the function - * @param int $channel The channel to preform the function call on (defaults to 0) + * @param array $options * @return void */ - public static function dof(string $function, array $arguments=[], int $channel=0): void + public static function dof(string $function, array $arguments=[], array $options=[]): void { if(self::$mode !== TamerMode::CLIENT) { throw new RuntimeException(sprintf('Attempting to dof() in \'%s\' mode, only clients can preform dof().', self::$mode)); } + $channel = 0; + + if(isset($options['channel']) && is_int($options['channel'])) + { + $channel = $options['channel']; + } + $job_packet = new JobPacket(); $job_packet->setParameters(serialize($arguments)); $job_packet->setPayload($function); @@ -420,9 +449,8 @@ * Waits for all the dispatched jobs to complete, this is a blocking function and will not return until all the * jobs have completed. If a timeout is specified, the function will return after the timeout has been reached. * - * @param callable|null $callback A callback function that will be called after each iteration of the wait loop - * @param int $timeout The timeout in seconds, if 0 is provided then the function will block until all the jobs - * have completed, if -1 is provided then the function run for one iteration and return + * @param int $timeout The timeout in seconds, if 0 is provided, then the function will block until all the jobs + * have completed, if -1 is provided then the function runs for one iteration and returns * @return void * @throws ConnectionException If the client fails to connect to the server * @throws JobManagerException If the JobManager throws an exception @@ -430,7 +458,7 @@ * @throws Throwable If a job fails * @throws TimeoutException If the timeout is reached */ - public static function wait(?callable $callback=null, int $timeout=0): void + public static function wait(int $timeout=0): void { if(self::$mode !== TamerMode::CLIENT) { @@ -438,35 +466,59 @@ } $time_start = time(); + if(count(self::$watching_jobs) === 0) + { + return; + } + + $watching_jobs = self::$watching_jobs; + Log::verbose(Utilities::getName(), sprintf('Waiting for %s job(s) to complete', count($watching_jobs))); + while(true) { - self::monitor(-1); - if(count(self::$watching_jobs) === 0) + if(count($watching_jobs) === 0) { return; } - Log::debug(Utilities::getName(), 'Waiting for jobs to complete'); - $job_packet = self::$job_manager->listenReturnChannel(self::$return_channel); + self::monitor(-1); + $job_id = self::$job_manager->listenReturnChannel(self::$return_channel); - if(!in_array($job_packet->getId(), self::$watching_jobs)) + if(!in_array($job_id, $watching_jobs, false)) { - Log::debug(Utilities::getName(), sprintf('Job \'%s\' has returned, but is not in the watchlist', $job_packet->getId())); - self::$job_manager->dropJob($job_packet->getId()); + Log::debug(Utilities::getName(), sprintf('Job \'%s\' has returned, but is not in the watchlist', $job_id)); + self::$job_manager->pushbackJob($job_id, self::$return_channel); continue; } - self::removeFromWatchlist($job_packet->getId()); - self::$job_manager->dropJob($job_packet->getId()); + $job_packet = self::$job_manager->getJob($job_id); - if($callback !== null && $job_packet->getStatus() === JobStatus::FINISHED) + if(isset(self::$job_callbacks[$job_id])) + { + $callback = self::$job_callbacks[$job_id]; + unset(self::$job_callbacks[$job_id]); + } + else + { + $callback = null; + } + + self::$job_manager->dropJob($job_packet->getId()); + self::removeFromWatchlist($job_packet->getId()); + unset($watching_jobs[array_search($job_packet->getId(), $watching_jobs, false)]); + + if($job_packet->getStatus() === JobStatus::FINISHED) { $return_value = $job_packet->getReturnValue(); if($return_value !== null) { $return_value = unserialize($return_value, ['allowed_classes' => true]); } - $callback($job_packet->getId(), $return_value); + + if($callback !== null) + { + $callback($return_value); + } } elseif($job_packet->getStatus() === JobStatus::FAILED) {