From 2b39a9df9e6d7857eaa76f0ca0deb6ca2e73c8a6 Mon Sep 17 00:00:00 2001 From: Netkas Date: Fri, 16 Jun 2023 02:16:25 -0400 Subject: [PATCH] Many changes in \TamerLib > tm --- src/TamerLib/tm.php | 274 +++++++++++++++++++++----------------------- 1 file changed, 130 insertions(+), 144 deletions(-) diff --git a/src/TamerLib/tm.php b/src/TamerLib/tm.php index 04c2c9e..ccf667f 100644 --- a/src/TamerLib/tm.php +++ b/src/TamerLib/tm.php @@ -4,34 +4,28 @@ namespace TamerLib; - use Closure; use Exception; use InvalidArgumentException; use LogLib\Log; - use Opis\Closure\SerializableClosure; - use Redis; use RedisException; use RuntimeException; - use TamerLib\Classes\AdaptiveSleep; use TamerLib\Classes\JobManager; use TamerLib\Classes\RedisServer; + use TamerLib\Classes\Utilities; use TamerLib\Classes\WorkerSupervisor; - use TamerLib\Enums\EncodingType; use TamerLib\Enums\JobStatus; - use TamerLib\Enums\JobType; use TamerLib\Enums\TamerMode; + use TamerLib\Exceptions\ConnectionException; + use TamerLib\Exceptions\JobManagerException; use TamerLib\Exceptions\JobNotFoundException; - use TamerLib\Exceptions\NoAvailablePortException; use TamerLib\Exceptions\ServerException; use TamerLib\Exceptions\TimeoutException; + use TamerLib\Exceptions\WorkerFailedException; use TamerLib\Objects\JobPacket; use TamerLib\Objects\ServerConfiguration; use TamerLib\Objects\WorkerConfiguration; use Throwable; - /** - * @method static mixed __call(string $name, array $arguments) - */ class tm { /** @@ -111,7 +105,6 @@ } } - /** * GLOBAL FUNCTIONS */ @@ -139,7 +132,7 @@ * @throws ServerException * @throws Exception */ - public static function initalize(?string $mode, ?ServerConfiguration $server_config=null): void + public static function initialize(?string $mode, ?ServerConfiguration $server_config=null): void { if(self::$mode !== null) { @@ -192,7 +185,7 @@ if($mode === TamerMode::CLIENT) { self::$supervisor = new WorkerSupervisor(self::$server_configuration); - self::$return_channel = 'rch' . random_int(100000000, 999999999); + self::$return_channel = sprintf('rch%s', random_int(100000000, 999999999)); } self::$job_manager = new JobManager(self::$server_configuration); @@ -202,6 +195,7 @@ * Shuts down all workers * * @return void + * @noinspection PhpUnused */ public static function shutdown(): void { @@ -216,6 +210,35 @@ } } + /** + * Returns the current mode Tamer is running in + * + * @return string + */ + public static function getMode(): string + { + return self::$mode ?? TamerMode::NONE; + } + + /** + * Monitors all internal processes + * + * @param int $timeout + * @return void + */ + public static function monitor(int $timeout=0): void + { + try + { + self::$supervisor?->monitor($timeout); + self::$server?->monitor($timeout); + } + catch(Exception $e) + { + Log::error(Utilities::getName(), $e->getMessage(), $e); + } + } + /** * CLIENT FUNCTIONS */ @@ -228,6 +251,7 @@ * @param string|null $path * @param int $channel * @return void + * @throws WorkerFailedException */ public static function createWorker(int $count=8, ?string $path=null, int $channel=0): void { @@ -244,42 +268,8 @@ { self::$supervisor->spawnWorker($path, $count, $channel); } - } - /** - * Preforms a job in the background, returns the Job ID to keep track of the job status. - * - * @param callable $function - * @param array $arguments - * @param int $channel - * @return string - */ - public static function do(callable $function, array $arguments, int $channel=0): string - { - if(self::$mode !== TamerMode::CLIENT) - { - throw new RuntimeException(sprintf('Attempting to do() in \'%s\' mode, only clients can preform do().', self::$mode)); - } - - $job_packet = new JobPacket(); - $job_packet->setJobType(JobType::CLOSURE); - $job_packet->setParameters(serialize($arguments)); - $job_packet->setPayload(serialize(new SerializableClosure($function))); - $job_packet->setChannel($channel); - $job_packet->setReturnChannel(self::$return_channel); - - - try - { - self::$job_manager->pushJob($job_packet); - } - catcH(Exception $e) - { - throw new RuntimeException('do() failed, failed to push job to the server', 0, $e); - } - - self::addToWatchlist($job_packet->getId()); - return $job_packet->getId(); + self::monitor(-1); } /** @@ -290,55 +280,19 @@ * @param int $channel * @return mixed */ - public static function call(string $function, array $arguments, int $channel=0): mixed + public static function do(string $function, array $arguments, int $channel=0): mixed { if(self::$mode !== TamerMode::CLIENT) { - throw new RuntimeException(sprintf('Attempting to call() in \'%s\' mode, only clients can preform call().', self::$mode)); + throw new RuntimeException(sprintf('Attempting to do() in \'%s\' mode, only clients can preform do().', self::$mode)); } $job_packet = new JobPacket(); - $job_packet->setJobType(JobType::FUNCTION); $job_packet->setParameters(serialize($arguments)); $job_packet->setPayload($function); $job_packet->setChannel($channel); $job_packet->setReturnChannel(self::$return_channel); - try - { - self::$job_manager->pushJob($job_packet); - } - catcH(Exception $e) - { - throw new RuntimeException('call() failed, failed to push job to the server', 0, $e); - } - - self::addToWatchlist($job_packet->getId()); - return $job_packet->getId(); - } - - /** - * Does a job in the background, but once the job is completed it will be forgotten and the result will not be - * returned, this also means that the job will not be added to the watchlist. - * - * @param callable $function - * @param array $arguments - * @param int $channel - * @return void - */ - public function dof(callable $function, array $arguments, int $channel=0): void - { - if(self::$mode !== TamerMode::CLIENT) - { - throw new RuntimeException(sprintf('Attempting to dof() in \'%s\' mode, only clients can preform dof().', self::$mode)); - } - - $job_packet = new JobPacket(); - $job_packet->setJobType(JobType::CLOSURE); - $job_packet->setParameters(serialize($arguments)); - $job_packet->setPayload(serialize(new SerializableClosure($function))); - $job_packet->setChannel($channel); - try { self::$job_manager->pushJob($job_packet); @@ -349,6 +303,7 @@ } self::addToWatchlist($job_packet->getId()); + return $job_packet->getId(); } /** @@ -359,18 +314,17 @@ * @param array $arguments * @param int $channel * @return void + * @noinspection PhpUnused */ - public static function callf(string $function, array $arguments, int $channel=0): void + public static function dof(string $function, array $arguments, int $channel=0): void { if(self::$mode !== TamerMode::CLIENT) { - throw new RuntimeException(sprintf('Attempting to callf() in \'%s\' mode, only clients can preform callf().', self::$mode)); + throw new RuntimeException(sprintf('Attempting to dof() in \'%s\' mode, only clients can preform dof().', self::$mode)); } $job_packet = new JobPacket(); - $job_packet->setJobType(JobType::FUNCTION); $job_packet->setParameters(serialize($arguments)); - $job_packet->setForget(true); $job_packet->setPayload($function); $job_packet->setChannel($channel); @@ -380,12 +334,31 @@ } catcH(Exception $e) { - throw new RuntimeException('callf() failed, failed to push job to the server', 0, $e); + throw new RuntimeException('dof() failed, failed to push job to the server', 0, $e); } self::addToWatchlist($job_packet->getId()); } + /** + * Preforms a do() call on a waitFor() call all in one function. + * + * @param string $function + * @param array $arguments + * @param int $channel + * @param int $timeout + * @return mixed + * @throws JobNotFoundException + * @throws ServerException + * @throws Throwable + * @throws TimeoutException + * @noinspection PhpUnused + */ + public static function doWait(string $function, array $arguments, int $channel=0, int $timeout=0): mixed + { + return self::waitFor(self::do($function, $arguments, $channel), $timeout); + } + /** * Waits for all the dispatched jobs to complete, this is a blocking function and will not return until all the * jobs have completed. If a timeout is specified, the function will return after the timeout has been reached. @@ -407,19 +380,18 @@ $time_start = time(); while(true) { + self::monitor(-1); if(count(self::$watching_jobs) === 0) { - Log::debug('net.nosial.tamerlib', 'No jobs to wait for, returning'); + Log::debug(Utilities::getName(), 'No jobs to wait for, returning'); return; } - Log::debug('net.nosial.tamerlib', 'Waiting for jobs to complete'); + Log::debug(Utilities::getName(), 'Waiting for jobs to complete'); $job_packet = self::$job_manager->listenReturnChannel(self::$return_channel); if(in_array($job_packet->getId(), self::$watching_jobs)) { - Log::debug('net.nosial.tamerlib', sprintf('Job \'%s\' has returned, removing from watchlist', $job_packet->getId())); - self::removeFromWatchlist($job_packet->getId()); self::$job_manager->dropJob($job_packet->getId()); @@ -437,7 +409,7 @@ if($return_value === false) { - Log::error('net.nosial.tamerlib', 'Failed to unserialize return value, return value was dropped'); + Log::error(Utilities::getName(), 'Failed to unserialize return value, return value was dropped'); $return_value = null; } } @@ -452,7 +424,7 @@ } catch(Exception $e) { - Log::error('net.nosial.tamerlib', 'Failed to unserialize exception, exception was dropped', $e); + Log::error(Utilities::getName(), 'Failed to unserialize exception, exception was dropped', $e); } finally { @@ -466,13 +438,13 @@ } else { - Log::debug('net.nosial.tamerlib', sprintf('Job \'%s\' returned with an unexpected status of \'%s\'', $job_packet->getId(), $job_packet->getStatus())); + Log::debug(Utilities::getName(), sprintf('Job \'%s\' returned with an unexpected status of \'%s\'', $job_packet->getId(), $job_packet->getStatus())); throw new ServerException('wait() failed, job returned with an unexpected status of \'' . $job_packet->getStatus() . '\''); } } else { - Log::debug('net.nosial.tamerlib', sprintf('Job \'%s\' has returned, but is not in the watchlist', $job_packet->getId())); + Log::debug(Utilities::getName(), sprintf('Job \'%s\' has returned, but is not in the watchlist', $job_packet->getId())); } if ($timeout < 0) @@ -516,8 +488,7 @@ while(true) { - self::$supervisor->monitor(-1); - + self::monitor(-1); switch(self::$job_manager->getJobStatus($job_id)) { case JobStatus::FINISHED: @@ -550,6 +521,7 @@ * Clears the watchlist, this will remove all jobs from the watchlist. * * @return void + * @noinspection PhpUnused */ public static function clear(): void { @@ -570,7 +542,7 @@ */ public static function __callStatic(string $name, array $arguments) { - return self::call($name, $arguments); + return self::do($name, $arguments); } /** @@ -595,6 +567,7 @@ /** * @param string $function * @return void + * @noinspection PhpUnused */ public static function removeFunction(string $function): void { @@ -608,6 +581,7 @@ /** * @return array + * @noinspection PhpUnused */ public static function getFunctions(): array { @@ -623,6 +597,8 @@ * @param int|array $channel * @param int $timeout * @return void + * @throws ConnectionException + * @throws JobManagerException * @throws JobNotFoundException * @throws ServerException */ @@ -633,52 +609,62 @@ throw new RuntimeException(sprintf('Attempting to run() in \'%s\' mode, only workers can preform run().', self::$mode)); } + $start_time = time(); + $error_time = null; + + while(true) + { + try + { + $job_packet = self::$job_manager->listenForJob(self::$worker_configuration->getWorkerId(), $channel, $timeout); + break; + } + catch(TimeoutException $e) + { + unset($e); + return; + } + /** @noinspection PhpRedundantCatchClauseInspection */ + catch(RedisException $e) + { + // TODO: There has to be a better way to do this. + if(strtolower($e->getMessage()) === 'redis server went away') + { + if($error_time === null) + { + $error_time = time(); + } + else if((time() - $error_time) >= 5) + { + throw new ServerException('Redis server went away, and did not come back.'); + } + } + } + + if((time() - $start_time) >= $timeout) + { + return; + } + } + + + Log::debug(Utilities::getName(), sprintf('Worker %s received job %s', self::$worker_configuration->getWorkerId(), $job_packet->getId())); + + if(!isset(self::$function_pointers[$job_packet->getPayload()])) + { + + Log::warning(Utilities::getName(), sprintf('Job %s requested function \'%s\' which does not exist, rejecting job.', $job_packet->getId(), $job_packet->getPayload())); + self::$job_manager->rejectJob($job_packet); + } + try { - $job_packet = self::$job_manager->listenForJob(self::$worker_configuration->getWorkerId(), $channel, $timeout); + $result = call_user_func_array(self::$function_pointers[$job_packet->getPayload()], unserialize($job_packet->getParameters(), ['allowed_classes'=>true])); + self::$job_manager->returnJob($job_packet, $result); } - catch(TimeoutException $e) + catch(Exception $e) { - unset($e); - return; - } - - Log::debug('net.nosial.tamerlib', sprintf('Worker %s received job %s', self::$worker_configuration->getWorkerId(), $job_packet->getId())); - - switch($job_packet->getJobType()) - { - case JobType::FUNCTION: - if(!isset(self::$function_pointers[$job_packet->getPayload()])) - { - Log::warning('net.nosial.tamerlib', sprintf('Job %s requested function \'%s\' which does not exist, rejecting job.', $job_packet->getId(), $job_packet->getPayload())); - self::$job_manager->rejectJob($job_packet); - } - - try - { - $result = call_user_func_array(self::$function_pointers[$job_packet->getPayload()], unserialize($job_packet->getParameters(), ['allowed_classes'=>true])); - self::$job_manager->returnJob($job_packet, $result); - } - catch(Exception $e) - { - var_dump($e); - self::$job_manager->returnException($job_packet, $e); - } - break; - - case JobType::CLOSURE: - try - { - $result = unserialize($job_packet->getPayload(), ['allowed_classes'=>true])( - unserialize($job_packet->getParameters(), ['allowed_classes'=>true]) - ); - self::$job_manager->returnJob($job_packet, $result); - } - catch(Exception $e) - { - self::$job_manager->returnException($job_packet, $e); - } - break; + self::$job_manager->returnException($job_packet, $e); } } } \ No newline at end of file