Many changes, see CHANGELOG.md

This commit is contained in:
Netkas 2023-07-07 00:22:35 -04:00
parent 4ff2bd403f
commit 9edc68ca1a
No known key found for this signature in database
GPG key ID: 5DAF58535614062B
6 changed files with 255 additions and 42 deletions

View file

@ -5,10 +5,35 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [2.0.2] - Unreleased
## [2.1.0] - Unreleased
### Added
- Implemented `ausleep()` in for `TamerLib\Classes\ > JobManager` to allow TamerLib to monitor sub-processes and
workers without blocking the main thread.
- Updated some methods in `TamerLib\Classes\ > JobManager` to use properties from `JobPacket` instead of calling
the server for the same information if the passed parameter (usually $job_id) is an instance of `JobPacket`,
this reduces the number of calls to the server and improves performance.
- Updated some methods in `TamerLib > tm` to use `asleep` instead of `sleep` to allow TamerLib to monitor sub-processes
and workers without blocking the main thread.
- Added a custom redis configuration file to improve performance when running TamerLib on a single machine.
### Changed
- `\TamerLib\Classes\ > RedisServer > start()` now starts the server with a matching logging level to `net.nosial.loglib`
- Changed method `public static function do(string $function, array $arguments=[], int $channel=0): int`
to `public static function do(string $function, array $arguments=[], ?callable $callback=null, array $options=[]): int` in `\TamerLib > tm > do()`
so that the `$callback` parameter is now optional and will be used to handle the result of the job when `tm::wait()`
is called, additionally some options can be passed to the job such as `channel` for passing on the channel to the
function as previously done with the `$channel` parameter.
- Changed method `public static function dof(string $function, array $arguments=[], int $channel=0): void` to
`public static function dof(string $function, array $arguments=[], array $options=[]): void` in `\TamerLib > tm > dof()`
to represent the changes made to `tm::do()` as described above.
- Changed method `public static function wait(?callable $callback=null, int $timeout=0): void` to
`public static function wait(int $timeout=0): void` as the function itself will now handle the result of the job
using the callback passed to `tm::do()` or `tm::dof()` when the job is done.
### Fixed
- Fixed synchronization issue in TamerLib where callbacks would run indefinitely if further jobs were added to the
queue while the callback was running.
## [2.0.1] - 2023-06-30

View file

@ -16,6 +16,7 @@
use TamerLib\Objects\JobPacket;
use TamerLib\Objects\ServerConfiguration;
use TamerLib\Objects\WorkerConfiguration;
use TamerLib\tm;
use Throwable;
class JobManager
@ -188,6 +189,7 @@
{
try
{
Log::debug(Utilities::getName(), sprintf('JobManager pushing job %s to %s:%s', $jobPacket->getId(), $this->server_configuration->getHost(), $this->server_configuration->getPort()));
$this->getClient()->hMSet($jobPacket->getId(), $jobPacket->toArray());
$this->getClient()->rPush(sprintf('ch%s', $jobPacket->getChannel()), $jobPacket->getId());
}
@ -219,6 +221,8 @@
$job_id = $job_id->getId();
}
Log::debug(Utilities::getName(), sprintf('JobManager getting job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()));
try
{
if(!$this->getClient()->exists($job_id))
@ -278,6 +282,7 @@
if($wait_for !== null)
{
$start_time = time();
Log::debug(Utilities::getName(), sprintf('JobManager waiting for job %s to be one of the following statuses: %s', $job_id, implode(', ', $wait_for)));
while(true)
{
$job = $this->getJob($job_id);
@ -292,10 +297,14 @@
throw new TimeoutException(sprintf('Timed out waiting for job %s to be one of the following statuses: %s', $job_id, implode(', ', $wait_for)));
}
usleep(100000);
Utilities::ausleep(100000, static function(){
tm::monitor(-1);
});
}
}
Log::debug(Utilities::getName(), sprintf('JobManager getting status of job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()));
try
{
if(!$this->getClient()->exists($job_id))
@ -336,13 +345,38 @@
{
if($timeout < 0)
{
Log::debug(Utilities::getName(), sprintf('Listening on return channel (LPOP) %s on %s:%s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort()));
$job_packet = $this->getClient()->lPop($return_channel);
}
else
{
$job_packet = $this->getClient()->blPop($return_channel, $timeout);
Log::debug(Utilities::getName(), sprintf('Listening on return channel %s on %s:%s with a timeout of %s seconds', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort(), $timeout));
$start_time = time();
while(true)
{
$job_packet = $this->getClient()->lPop($return_channel);
if(!is_bool($job_packet) && $job_packet !== null)
{
break;
}
if($timeout > 0 && (time() - $start_time) > $timeout)
{
throw new TimeoutException(sprintf('Timed out listening on return channel %s on %s:%s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort()));
}
Utilities::ausleep(10000, static function(){
tm::monitor(-1);
});
}
}
}
catch(TimeoutException $e)
{
throw $e;
}
catch(RedisException $e)
{
throw new ConnectionException(sprintf('Client threw an error while trying to get job from return channel %s on %s:%s, %s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort(), $e->getMessage()), $e);
@ -352,12 +386,13 @@
throw new JobManagerException(sprintf('Could not get job from return channel %s on %s:%s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort()), $e);
}
if($job_packet === null)
if(is_bool($job_packet) && $job_packet === false)
{
throw new TimeoutException(sprintf('Could not get job from return channel %s on %s:%s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort()));
}
return $job_packet[1];
Log::debug(Utilities::getName(), sprintf('Got job %s from return channel %s on %s:%s', $job_packet, $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort()));
return $job_packet;
}
/**
@ -378,6 +413,8 @@
$job_id = $job_id->getId();
}
Log::debug(Utilities::getName(), sprintf('Pushing job %s back to return channel %s on %s:%s', $job_id, $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort()));
try
{
if(!$this->getClient()->exists($job_id))
@ -415,6 +452,8 @@
$job_id = $job_id->getId();
}
Log::debug(Utilities::getName(), sprintf('Getting return value of job %s on %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()));
try
{
if(!$this->getClient()->exists($job_id))
@ -457,6 +496,8 @@
$job_id = $job_id->getId();
}
Log::debug(Utilities::getName(), sprintf('Getting exception of job %s on %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()));
try
{
if(!$this->getClient()->exists($job_id))
@ -519,6 +560,8 @@
$worker_id = $worker_id->getWorkerId();
}
Log::debug(Utilities::getName(), sprintf('Attempting to claim job %s on %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()));
try
{
// Check if the job exists
@ -533,6 +576,7 @@
// Verify that the job was claimed
if($this->getClient()->hGet($job_id, 'worker_id') !== $worker_id)
{
Log::warning(Utilities::getName(), sprintf('Job %s on %s:%s was already claimed by %s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort(), $this->getClient()->hGet($job_id, 'worker_id')));
return false;
}
@ -624,27 +668,40 @@
*/
public function returnJob(JobPacket|string $job_id, mixed $return_value=null): void
{
$was_job_packet = false;
if($job_id instanceof JobPacket)
{
$return_channel = $job_id->getReturnChannel();
/** @noinspection CallableParameterUseCaseInTypeContextInspection */
$job_id = $job_id->getId();
$was_job_packet = true;
}
Log::debug(Utilities::getName(), sprintf('Returning job %s', $job_id));
try
{
if(!$this->getClient()->exists($job_id))
{
throw new JobNotFoundException(sprintf('Job %s does not exist on %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()));
}
$return_channel = $this->getClient()->hGet($job_id, 'return_channel');
if(!$was_job_packet)
{
$return_channel = $this->getClient()->hGet($job_id, 'return_channel');
}
/** @noinspection PhpUndefinedVariableInspection */
if($return_channel === null)
{
Log::debug(Utilities::getName(), sprintf('No return channel set, deleting job %s', $job_id));
$this->getClient()->del($job_id);
return;
}
Log::debug(Utilities::getName(), sprintf('Returning job %s (Return Channel: %s)', $job_id, $return_channel));
Log::debug(Utilities::getName(), sprintf('Pushing job %s to return channel %s', $job_id, $return_channel));
$this->getClient()->hSet($job_id, 'return_value', serialize($return_value));
$this->getClient()->hSet($job_id, 'status', JobStatus::FINISHED);
$this->getClient()->rPush($this->getClient()->hGet($job_id, 'return_channel'), $job_id);
@ -674,12 +731,17 @@
*/
public function returnException(JobPacket|string $job_id, Throwable $throwable): void
{
$was_job_packet = false;
if($job_id instanceof JobPacket)
{
$return_channel = $job_id->getReturnChannel();
/** @noinspection CallableParameterUseCaseInTypeContextInspection */
$job_id = $job_id->getId();
$was_job_packet = true;
}
Log::debug(Utilities::getName(), sprintf('Returning exception for job %s', $job_id));
try
{
if(!$this->getClient()->exists($job_id))
@ -687,17 +749,23 @@
throw new JobNotFoundException(sprintf('Job %s does not exist on %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()));
}
if($this->getClient()->hGet($job_id, 'return_channel') === null)
if(!$was_job_packet)
{
$return_channel = $this->getClient()->hGet($job_id, 'return_channel');
}
/** @noinspection PhpUndefinedVariableInspection */
if($return_channel === null)
{
Log::debug(Utilities::getName(), sprintf('No return channel set, deleting job %s', $job_id));
$this->getClient()->del($job_id);
return;
}
Log::debug(Utilities::getName(), sprintf('Returning exception for job %s', $job_id));
Log::debug(Utilities::getName(), sprintf('Pushing job %s to return channel %s', $job_id, $return_channel));
$this->getClient()->hSet($job_id, 'exception', serialize($throwable));
$this->getClient()->hSet($job_id, 'status', JobStatus::FAILED);
$this->getClient()->rPush($this->getClient()->hGet($job_id, 'return_channel'), $job_id);
Log::debug(Utilities::getName(), sprintf('Pushed job %s to return channel %s', $job_id, $this->getClient()->hGet($job_id, 'return_channel')));
$this->getClient()->rPush($return_channel, $job_id);
}
catch(RedisException $e)
{
@ -742,6 +810,8 @@
$channels[] = sprintf('ch%s', $channel);
}
Log::debug(Utilities::getName(), sprintf('Waiting for job on channels %s', implode(', ', $channels)));
try
{
while(true)
@ -761,6 +831,11 @@
return new JobPacket($this->getClient()->hGetAll($job_id[1]));
}
if($job_id !== false)
{
Log::debug(Utilities::getName(), sprintf('Could not claim job %s', $job_id[1]));
}
if($timeout < 0)
{
throw new TimeoutException(sprintf('Timeout exceeded while waiting for job on %s:%s', $this->server_configuration->getHost(), $this->server_configuration->getPort()));
@ -803,6 +878,8 @@
$job_id = $job_id->getId();
}
Log::debug(Utilities::getName(), sprintf('Dropping job %s', $job_id));
try
{
if(!$this->getClient()->exists($job_id))

View file

@ -44,7 +44,6 @@
$this->cmd = $cmd;
}
/**
* Determines if the Redis server is running.
*
@ -76,17 +75,18 @@
Log::verbose(Utilities::getName(), 'Starting server on port ' . $this->configuration->getPort() . '.');
$log_level = match (\LogLib\Classes\Utilities::getLogLevel())
{
LevelType::Warning, LevelType::Error => 'warning',
LevelType::Verbose => 'verbose',
LevelType::Debug => 'debug',
default => 'notice',
};
$this->server_process = new Process([
$this->cmd, '--port', $this->configuration->getPort(), '--loglevel', $log_level
$this->cmd, __DIR__ . DIRECTORY_SEPARATOR . 'redis.conf',
'--port', $this->configuration->getPort(),
'--loglevel', match (\LogLib\Classes\Utilities::getLogLevel())
{
LevelType::Warning, LevelType::Error => 'warning',
LevelType::Verbose => 'verbose',
LevelType::Debug => 'debug',
default => 'notice',
}
]);
$this->server_process->start();
// Use a redis client and ping the server until it responds.

View file

@ -136,4 +136,49 @@
return empty($output) ? (string)null : $output;
}
/**
* Sleeps for a given number of seconds. If a callback is provided, it will be called
* every second until the sleep is complete.
*
* @param int $seconds
* @param callable|null $callback
* @return void
*/
public static function asleep(int $seconds, ?callable $callback=null): void
{
$start = time();
while(time() - $start < $seconds)
{
if($callback !== null)
{
$callback();
}
sleep(1);
}
}
/**
* Sleeps for a given number of microseconds. If a callback is provided, it will be called
* every millisecond until the sleep is complete.
*
* @param int $microseconds
* @param callable|null $callback
* @return void
*/
public static function ausleep(int $microseconds, ?callable $callback = null): void
{
$start = microtime(true);
$elapsed = 0;
while ($elapsed < $microseconds)
{
if ($callback !== null) {
$callback();
}
usleep(1000);
$elapsed = (microtime(true) - $start) * 1000000;
}
}
}

View file

@ -0,0 +1,14 @@
# Redis configuration file TamerLib local mode
# General
bind 127.0.0.1
# Memory and Storage
maxmemory 1GB
maxmemory-policy volatile-lru
maxmemory-samples 5
# Performance Tuning
tcp-backlog 511
tcp-keepalive 10
timeout 0

View file

@ -50,6 +50,11 @@
*/
private static $watching_jobs = [];
/**
* @var array
*/
private static $job_callbacks = [];
/**
* @var WorkerSupervisor|null
*/
@ -83,13 +88,18 @@
* Appends the job ID to the watch list
*
* @param int $job_id The job ID to add to the watch list
* @param callable|null $callback
* @return void
*/
private static function addToWatchlist(int $job_id): void
private static function addToWatchlist(int $job_id, ?callable $callback=null): void
{
if(!in_array($job_id, self::$watching_jobs, true))
{
self::$watching_jobs[] = $job_id;
if($callback !== null)
{
self::$job_callbacks[$job_id] = $callback;
}
}
}
@ -105,6 +115,11 @@
{
unset(self::$watching_jobs[$key]);
}
if(isset(self::$job_callbacks[$job_id]))
{
unset(self::$job_callbacks[$job_id]);
}
}
/**
@ -352,16 +367,23 @@
*
* @param string $function The function to call
* @param array $arguments The arguments to pass to the function
* @param int $channel The channel to preform the function call on (defaults to 0)
* @param callable|null $callback
* @param array $options
* @return int The Job ID of the function call
*/
public static function do(string $function, array $arguments=[], int $channel=0): int
public static function do(string $function, array $arguments=[], ?callable $callback=null, array $options=[]): int
{
if(self::$mode !== TamerMode::CLIENT)
{
throw new RuntimeException(sprintf('Attempting to do() in \'%s\' mode, only clients can preform do().', self::$mode));
}
$channel = 0;
if(isset($options['channel']) && is_int($options['channel']))
{
$channel = $options['channel'];
}
$job_packet = new JobPacket();
$job_packet->setParameters(serialize($arguments));
$job_packet->setPayload($function);
@ -381,7 +403,7 @@
throw new RuntimeException('do() failed, failed to push the job to the server', 0, $e);
}
self::addToWatchlist($job_packet->getId());
self::addToWatchlist($job_packet->getId(), $callback);
return $job_packet->getId();
}
@ -391,16 +413,23 @@
*
* @param string $function The function to call
* @param array $arguments The arguments to pass to the function
* @param int $channel The channel to preform the function call on (defaults to 0)
* @param array $options
* @return void
*/
public static function dof(string $function, array $arguments=[], int $channel=0): void
public static function dof(string $function, array $arguments=[], array $options=[]): void
{
if(self::$mode !== TamerMode::CLIENT)
{
throw new RuntimeException(sprintf('Attempting to dof() in \'%s\' mode, only clients can preform dof().', self::$mode));
}
$channel = 0;
if(isset($options['channel']) && is_int($options['channel']))
{
$channel = $options['channel'];
}
$job_packet = new JobPacket();
$job_packet->setParameters(serialize($arguments));
$job_packet->setPayload($function);
@ -420,9 +449,8 @@
* Waits for all the dispatched jobs to complete, this is a blocking function and will not return until all the
* jobs have completed. If a timeout is specified, the function will return after the timeout has been reached.
*
* @param callable|null $callback A callback function that will be called after each iteration of the wait loop
* @param int $timeout The timeout in seconds, if 0 is provided then the function will block until all the jobs
* have completed, if -1 is provided then the function run for one iteration and return
* @param int $timeout The timeout in seconds, if 0 is provided, then the function will block until all the jobs
* have completed, if -1 is provided then the function runs for one iteration and returns
* @return void
* @throws ConnectionException If the client fails to connect to the server
* @throws JobManagerException If the JobManager throws an exception
@ -430,7 +458,7 @@
* @throws Throwable If a job fails
* @throws TimeoutException If the timeout is reached
*/
public static function wait(?callable $callback=null, int $timeout=0): void
public static function wait(int $timeout=0): void
{
if(self::$mode !== TamerMode::CLIENT)
{
@ -438,35 +466,59 @@
}
$time_start = time();
if(count(self::$watching_jobs) === 0)
{
return;
}
$watching_jobs = self::$watching_jobs;
Log::verbose(Utilities::getName(), sprintf('Waiting for %s job(s) to complete', count($watching_jobs)));
while(true)
{
self::monitor(-1);
if(count(self::$watching_jobs) === 0)
if(count($watching_jobs) === 0)
{
return;
}
Log::debug(Utilities::getName(), 'Waiting for jobs to complete');
$job_packet = self::$job_manager->listenReturnChannel(self::$return_channel);
self::monitor(-1);
$job_id = self::$job_manager->listenReturnChannel(self::$return_channel);
if(!in_array($job_packet->getId(), self::$watching_jobs))
if(!in_array($job_id, $watching_jobs, false))
{
Log::debug(Utilities::getName(), sprintf('Job \'%s\' has returned, but is not in the watchlist', $job_packet->getId()));
self::$job_manager->dropJob($job_packet->getId());
Log::debug(Utilities::getName(), sprintf('Job \'%s\' has returned, but is not in the watchlist', $job_id));
self::$job_manager->pushbackJob($job_id, self::$return_channel);
continue;
}
self::removeFromWatchlist($job_packet->getId());
self::$job_manager->dropJob($job_packet->getId());
$job_packet = self::$job_manager->getJob($job_id);
if($callback !== null && $job_packet->getStatus() === JobStatus::FINISHED)
if(isset(self::$job_callbacks[$job_id]))
{
$callback = self::$job_callbacks[$job_id];
unset(self::$job_callbacks[$job_id]);
}
else
{
$callback = null;
}
self::$job_manager->dropJob($job_packet->getId());
self::removeFromWatchlist($job_packet->getId());
unset($watching_jobs[array_search($job_packet->getId(), $watching_jobs, false)]);
if($job_packet->getStatus() === JobStatus::FINISHED)
{
$return_value = $job_packet->getReturnValue();
if($return_value !== null)
{
$return_value = unserialize($return_value, ['allowed_classes' => true]);
}
$callback($job_packet->getId(), $return_value);
if($callback !== null)
{
$callback($return_value);
}
}
elseif($job_packet->getStatus() === JobStatus::FAILED)
{