From ffdaaebfbcca2018aedcaba65966176d015a43c7 Mon Sep 17 00:00:00 2001 From: Netkas Date: Fri, 16 Jun 2023 00:51:30 -0400 Subject: [PATCH] Refactored JobManager --- src/TamerLib/Classes/JobManager.php | 449 +++++++++++++++++++--------- 1 file changed, 302 insertions(+), 147 deletions(-) diff --git a/src/TamerLib/Classes/JobManager.php b/src/TamerLib/Classes/JobManager.php index 48eb84d..b981330 100644 --- a/src/TamerLib/Classes/JobManager.php +++ b/src/TamerLib/Classes/JobManager.php @@ -7,10 +7,11 @@ use Exception; use LogLib\Log; use Redis; + use RedisException; use TamerLib\Enums\JobStatus; + use TamerLib\Exceptions\JobManagerException; use TamerLib\Exceptions\JobNotFoundException; use TamerLib\Exceptions\ConnectionException; - use TamerLib\Exceptions\ServerException; use TamerLib\Exceptions\TimeoutException; use TamerLib\Objects\JobPacket; use TamerLib\Objects\ServerConfiguration; @@ -34,6 +35,11 @@ */ private $last_connect; + /** + * @var int + */ + private $ping_count; + /** * JobManager constructor. * @@ -43,15 +49,15 @@ { $this->server_configuration = $serverConfiguration; $this->redis_client = new Redis(); + $this->ping_count = 3; } /** * Attempts to determine if the Redis Server is online. * - * @param bool $ping - * @return bool + * @return bool Returns true if the Redis Server is online, false otherwise. */ - private function isConnected(bool $ping=true): bool + private function isConnected(): bool { if($this->redis_client === null) { @@ -60,9 +66,15 @@ try { - if($ping) + // Ping every 3 calls + if($this->ping_count >= 3) { $this->redis_client->ping(); + $this->ping_count = 0; + } + else + { + ++$this->ping_count; } } catch(Exception $e) @@ -77,15 +89,17 @@ /** * Attempts to connect to the Redis Server * + * @throws ConnectionException Thrown if there is a connection issue with the server + * @throws JobManagerException Thrown if there is an issue with the JobManager * @return void - * @throws ConnectionException */ private function connect(): void { // Reconnect every 30 minutes if ($this->last_connect !== null && $this->last_connect < (time() - 1800)) { - //$this->disconnect(); + Log::verbose(Utilities::getName(), sprintf('JobManager reconnecting to %s:%s (Last connect timeout)', $this->server_configuration->getHost(), $this->server_configuration->getPort())); + $this->disconnect(); } if($this->isConnected()) @@ -100,17 +114,22 @@ $this->server_configuration->getPort() ); - if ($this->server_configuration->getPassword() !== null) + if ($this->server_configuration->getPassword() !== null && $this->server_configuration->getPassword() !== '') { $this->redis_client->auth($this->server_configuration->getPassword()); } $this->redis_client->select($this->server_configuration->getDatabase()); $this->last_connect = time(); + $this->redis_client->ping(); + } + catch(RedisException $e) + { + throw new ConnectionException(sprintf('Unable to connect to %s:%s, %s', $this->server_configuration->getHost(), $this->server_configuration->getPort(), $e->getMessage()), $e); } catch(Exception $e) { - throw new ConnectionException(sprintf('Could not connect to %s:%s', $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e); + throw new JobManagerException(sprintf('There was an unexpected error while trying to connect to %s:%s, %s', $this->server_configuration->getHost(), $this->server_configuration->getPort(), $e->getMessage()), $e); } } @@ -121,13 +140,20 @@ */ private function disconnect(): void { + if($this->isConnected()) + { + return; + } + + Log::debug(Utilities::getName(), sprintf('JobManager disconnecting from %s:%s', $this->server_configuration->getHost(), $this->server_configuration->getPort())); + try { $this->redis_client->close(); } catch(Exception $e) { - Log::warning('net.nosial.tamerlib',sprintf('JobManager could not disconnect safely from %s:%s', $this->server_configuration->getHost(), $this->server_configuration->getPort()), $e); + Log::warning(Utilities::getName(),sprintf('JobManager could not disconnect safely from %s:%s', $this->server_configuration->getHost(), $this->server_configuration->getPort()), $e); } finally { @@ -138,8 +164,9 @@ /** * Returns the Redis Client, or attempts to connect to the Redis Server if not connected. * - * @return Redis - * @throws ConnectionException + * @throws ConnectionException Thrown if there is a connection issue with the server + * @throws JobManagerException Thrown if there is an issue with the JobManager + * @return Redis Returns the Redis Client */ private function getClient(): Redis { @@ -148,11 +175,14 @@ } /** - * Pushes a JobPacket to the Redis server. + * Pushes a JobPacket to the Redis server, this will take the JobPacket, push it as a hash to the server + * then push the JobPacket ID to the channel list specified in the JobPacket, this will allow workers to + * pull the JobPacket from the server and process it. * - * @param JobPacket $jobPacket + * @param JobPacket $jobPacket The JobPacket to push to the Redis server + * @throws ConnectionException Thrown if there is a connection issue with the server + * @throws JobManagerException Thrown if there is an issue with the JobManager * @return void - * @throws ServerException */ public function pushJob(JobPacket $jobPacket): void { @@ -161,19 +191,25 @@ $this->getClient()->hMSet($jobPacket->getId(), $jobPacket->toArray()); $this->getClient()->rPush(sprintf('ch%s', $jobPacket->getChannel()), $jobPacket->getId()); } + catch(RedisException $e) + { + throw new ConnectionException(sprintf('There was an error while trying to push job %s to %s:%s, %s', $jobPacket->getId(), $this->server_configuration->getHost(), $this->server_configuration->getPort(), $e->getMessage()), $e); + } catch(Exception $e) { - throw new ServerException(sprintf('Could not push job to %s:%s', $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e); + throw new JobManagerException(sprintf('There was an unexpected error while trying to push job %s to %s:%s, %s', $jobPacket->getId(), $this->server_configuration->getHost(), $this->server_configuration->getPort(), $e->getMessage()), $e); } } /** - * Attempts to get a specific job from the Redis Server. + * Attempts to get a specific job from the Redis Server, if the requested JobPacket does not exist on the + * server then a JobNotFoundException will be thrown. * - * @param JobPacket|string $job_id - * @return JobPacket - * @throws JobNotFoundException - * @throws ServerException + * @param JobPacket|string $job_id The JobPacket or JobPacket ID to get from the Redis Server + * @throws ConnectionException Thrown if there is a connection issue with the server + * @throws JobManagerException Thrown if there is an issue with the JobManager + * @throws JobNotFoundException Thrown if the JobPacket does not exist on the Redis Server + * @return JobPacket Returns the JobPacket from the Redis Server */ public function getJob(JobPacket|string $job_id): JobPacket { @@ -197,20 +233,40 @@ { throw $e; } + catch(RedisException $e) + { + throw new ConnectionException(sprintf('Client threw an error while trying to get job %s from %s:%s, %s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort(), $e->getMessage()), $e); + } catch(Exception $e) { - throw new ServerException(sprintf('Could not get job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e); + throw new JobManagerException(sprintf('There was an unexpected error while trying to get job %s from %s:%s, %s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort(), $e->getMessage()), $e); } } /** - * Returns the status of a job. + * Returns the status of a job, optionally if $wait_for is set to an array of statuses then the method will + * wait until the job status is one of the statuses in the array before returning. In this case the $timeout + * parameter will be used to determine how long to wait for the job status to change. * - * @param JobPacket|string $job_id - * @return int - * @throws ServerException + * Returns the status of the job as an integer, the integer will be one of the following: + * + * - JobStatus::REJECTED = 50 + * - JobStatus::FAILED = 40 + * - JobStatus::FINISHED = 30 + * - JobStatus::PROCESSING = 20 + * - JobStatus::WAITING = 10 + * + * @param JobPacket|string $job_id The JobPacket or JobPacket ID to get the status of + * @param array|null $wait_for Optional. An array of statuses to wait for before returning the status + * @param int $timeout Optional. The number of seconds to wait for the status to change if $wait_for is set + * @throws ConnectionException Thrown if there is a connection issue with the server + * @throws JobManagerException Thrown if there is an issue with the JobManager + * @throws JobNotFoundException Thrown if the JobPacket does not exist on the server + * @throws TimeoutException Thrown if the timeout is reached before the job status changes to one of the statuses in $wait_for + * @see JobStatus for the integer values of the statuses + * @return int Returns the status of the job as an integer, see JobStatus for the integer values of the statuses */ - public function getJobStatus(JobPacket|string $job_id): int + public function getJobStatus(JobPacket|string $job_id, ?array $wait_for=null, int $timeout=0): int { if($job_id instanceof JobPacket) { @@ -218,6 +274,27 @@ $job_id = $job_id->getId(); } + if($wait_for !== null) + { + $start_time = time(); + while(true) + { + $job = $this->getJob($job_id); + + if(in_array($job->getStatus(), $wait_for, true)) + { + return $job->getStatus(); + } + + if($timeout > 0 && (time() - $start_time) > $timeout) + { + throw new TimeoutException(sprintf('Timed out waiting for job %s to be one of the following statuses: %s', $job_id, implode(', ', $wait_for))); + } + + usleep(100000); + } + } + try { if(!$this->getClient()->exists($job_id)) @@ -227,42 +304,54 @@ return (int)$this->getClient()->hGet($job_id, 'status'); } + catch(JobNotFoundException $e) + { + throw $e; + } + catch(RedisException $e) + { + throw new ConnectionException(sprintf('Client threw an error while trying to get job %s from %s:%s, %s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort(), $e->getMessage()), $e); + } catch(Exception $e) { - throw new ServerException(sprintf('Could not get job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e); + throw new JobManagerException(sprintf('There was an unexpected error while trying to get job %s from %s:%s, %s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort(), $e->getMessage()), $e); } } /** - * Listens on a return channel for a returned job. + * Listens on a return channel for a returned job, optionally with a timeout. If the timeout is reached before + * a job is returned then a TimeoutException will be thrown. * - * @param string $return_channel - * @param int $timeout - * @return JobPacket - * @throws JobNotFoundException - * @throws ServerException - * @throws TimeoutException + * @param string $return_channel The return channel to listen on + * @param int $timeout Optional. The number of seconds to wait for a job to be returned + * @throws ConnectionException Thrown if there is a connection issue with the server + * @throws JobManagerException Thrown if there is an issue with the JobManager + * @throws JobNotFoundException Thrown if the JobPacket does not exist on the server + * @throws TimeoutException Thrown if the timeout is reached before a job is returned + * @return JobPacket Returns the returned job as a JobPacket */ public function listenReturnChannel(string $return_channel, int $timeout=0): JobPacket { - $time_start = time(); - try { if($timeout < 0) { - Log::debug('net.nosial.tamerlib', sprintf('Listening for job on return channel %s on %s:%s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort())); + Log::debug(Utilities::getName(), sprintf('Listening for job on return channel %s on %s:%s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort())); $job_packet = $this->getClient()->lPop($return_channel); } else { - Log::debug('net.nosial.tamerlib', sprintf('Listening for job on return channel %s on %s:%s with timeout %s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort(), $timeout)); + Log::debug(Utilities::getName(), sprintf('Listening for job on return channel %s on %s:%s with timeout %s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort(), $timeout)); $job_packet = $this->getClient()->blPop($return_channel, $timeout); } } + catch(RedisException $e) + { + throw new ConnectionException(sprintf('Client threw an error while trying to get job from return channel %s on %s:%s, %s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort(), $e->getMessage()), $e); + } catch(Exception $e) { - throw new TimeoutException(sprintf('Could not get job from return channel %s on %s:%s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e); + throw new JobManagerException(sprintf('Could not get job from return channel %s on %s:%s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort()), $e); } if($job_packet === null) @@ -270,22 +359,18 @@ throw new TimeoutException(sprintf('Could not get job from return channel %s on %s:%s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort())); } - if($job_packet[1] === 'test') - { - return $this->listenReturnChannel($return_channel, $timeout); - } - - Log::debug('net.nosial.tamerlib', sprintf('Received job %s from return channel %ss', $job_packet[1], $return_channel)); + Log::debug(Utilities::getName(), sprintf('Received job %s from return channel %s', $job_packet[1], $return_channel)); return $this->getJob($job_packet[1]); } /** - * Gets the return value of a job. + * Returns the unserialized return value of a job. * - * @param JobPacket|string $job_id - * @return mixed - * @throws JobNotFoundException - * @throws ServerException + * @param JobPacket|string $job_id The JobPacket or job ID to get the return value of + * @throws ConnectionException Thrown if there is a connection issue with the server + * @throws JobManagerException Thrown if there is an issue with the JobManager + * @throws JobNotFoundException Thrown if the JobPacket does not exist on the server + * @return mixed Returns the unserialized return value of the job */ public function getJobResult(JobPacket|string $job_id): mixed { @@ -302,24 +387,32 @@ throw new JobNotFoundException(sprintf('Job %s does not exist on %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort())); } - return $this->getClient()->hGet($job_id, 'return'); + return unserialize($this->getClient()->hGet($job_id, 'return'), ['allowed_classes' => true]); } catch(JobNotFoundException $e) { throw $e; } + catch(RedisException $e) + { + throw new ConnectionException(sprintf('Client threw an error while trying to get job %s from %s:%s, %s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort(), $e->getMessage()), $e); + } catch(Exception $e) { - throw new ServerException(sprintf('Could not get job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e); + throw new JobManagerException(sprintf('Could not get job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), $e); } } /** - * Returns the exception to a job. + * Attempts to unserialize and return the exception thrown by a job, if the exception cannot be unserialized, + * a generic Exception is returned explaining that the exception could not be unserialized and what it got + * instead. * - * @param JobPacket|string $job_id - * @return Throwable - * @throws ServerException + * @param JobPacket|string $job_id The JobPacket or job ID to get the exception to + * @throws ConnectionException Thrown if there is a connection issue with the server + * @throws JobManagerException Thrown if there is an issue with the JobManager + * @throws JobNotFoundException Thrown if the JobPacket does not exist on the server + * @return Throwable Returns the unserialized exception thrown by the job */ public function getJobException(JobPacket|string $job_id): Throwable { @@ -336,11 +429,31 @@ throw new JobNotFoundException(sprintf('Job %s does not exist on %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort())); } - return unserialize($this->getClient()->hGet($job_id, 'exception'), ['allowed_classes'=>true]); + $exception = unserialize($this->getClient()->hGet($job_id, 'exception'), ['allowed_classes'=>true]); + + if($exception instanceof Throwable) + { + return $exception; + } + + if($exception === false) + { + return new Exception(sprintf('Job %s threw an exception, but the exception could not be unserialized.', $job_id)); + } + + return new Exception(sprintf('Job %s threw an exception, but the exception is not an instance of Throwable, got \'%s\'', $job_id, gettype($exception))); + } + catch(JobNotFoundException $e) + { + throw $e; + } + catch(RedisException $e) + { + throw new ConnectionException(sprintf('Client threw an error while trying to get job %s from %s:%s, %s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort(), $e->getMessage()), $e); } catch(Exception $e) { - throw new ServerException(sprintf('Could not get job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e); + throw new JobManagerException(sprintf('Could not get job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), $e); } } @@ -348,10 +461,14 @@ * Attempts to claim an existing JobPacket, returns True if the worker's claim was successful. * Returns False if the JobPacket does not exist, or if the JobPacket is already claimed. * - * @param JobPacket|string $job_id - * @param WorkerConfiguration|string $worker_id - * @return bool - * @throws ServerException + * This also automatically sets the JobPacket's status to 'processing' and sets the worker ID + * to the worker claiming the job. + * + * @param JobPacket|string $job_id The JobPacket or job ID to claim + * @param WorkerConfiguration|string $worker_id The WorkerConfiguration or worker ID to claim the job with + * @throws ConnectionException Thrown if there is a connection issue with the server + * @throws JobManagerException Thrown if there is an issue with the JobManager + * @return bool Returns True if the worker successfully claimed the job, False otherwise */ public function claimJob(JobPacket|string $job_id, WorkerConfiguration|string $worker_id): bool { @@ -377,6 +494,7 @@ // Attempt to claim the job $this->getClient()->hSet($job_id, 'worker_id', $worker_id); + // Verify that the job was claimed if($this->getClient()->hGet($job_id, 'worker_id') !== $worker_id) { @@ -386,65 +504,26 @@ // Set the job status to processing $this->getClient()->hSet($job_id, 'status', JobStatus::PROCESSING); } + catch(RedisException $e) + { + throw new ConnectionException(sprintf('Client threw an error while trying to claim job %s from %s:%s, %s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort(), $e->getMessage()), $e); + } catch(Exception $e) { - throw new ServerException(sprintf('Could not claim job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e); + throw new JobManagerException(sprintf('Could not claim job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), $e); } return true; } /** - * Marks a job as finished, and sets the return value. + * Rejects a job and pushes it back to the channel, allowing another worker to claim it. * - * @param JobPacket|string $job_id - * @param mixed|null $return_value + * @param JobPacket|string $job_id The JobPacket or job ID to reject + * @throws ConnectionException Thrown if there is a connection issue with the server + * @throws JobManagerException Thrown if there is an issue with the JobManager + * @throws JobNotFoundException Thrown if the JobPacket does not exist on the server * @return void - * @throws JobNotFoundException - * @throws ServerException - */ - public function returnJob(JobPacket|string $job_id, mixed $return_value=null): void - { - if($job_id instanceof JobPacket) - { - /** @noinspection CallableParameterUseCaseInTypeContextInspection */ - $job_id = $job_id->getId(); - } - - try - { - if($this->getClient()->exists($job_id)) - { - $return_channel = $this->getClient()->hGet($job_id, 'return_channel'); - Log::debug('net.nosial.tamerlib', sprintf('Returning job %s (Return Channel: %s)', $job_id, $return_channel ?? 'n/a')); - - if($return_channel === null) - { - $this->getClient()->del($job_id); - return; - } - - $this->getClient()->hSet($job_id, 'return_value', serialize($return_value)); - $this->getClient()->hSet($job_id, 'status', JobStatus::FINISHED); - $this->getClient()->rPush($this->getClient()->hGet($job_id, 'return_channel'), $job_id); - return; - } - } - catch(Exception $e) - { - throw new ServerException(sprintf('Could not return job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e); - } - - throw new JobNotFoundException(sprintf('Job %s does not exist on %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort())); - } - - /** - * Rejects a job and pushes it back to the channel. - * - * @param JobPacket|string $job_id - * @return void - * @throws JobNotFoundException - * @throws ServerException */ public function rejectJob(JobPacket|string $job_id): void { @@ -460,37 +539,99 @@ try { - if($this->getClient()->exists($job_id)) + if(!$this->getClient()->exists($job_id)) { - Log::debug('net.nosial.tamerlib', sprintf('Rejecting job %s', $job_id)); - - // Mark as rejected, clear worker_id - $this->getClient()->hSet($job_id, 'worker_id', null); - // Push back to the channel - if($channel_id !== null) - { - $channel_id = $this->getClient()->hGet($job_id, 'channel'); - } - $this->getClient()->rPush(sprintf('ch%s', $channel_id), $job_id); - - return; + throw new JobNotFoundException(sprintf('Job %s does not exist on %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort())); } + + Log::debug(Utilities::getName(), sprintf('Rejecting job %s', $job_id)); + + // Mark as rejected, clear worker_id + $this->getClient()->hSet($job_id, 'worker_id', null); + + // Push back to the channel + if($channel_id !== null) + { + $channel_id = $this->getClient()->hGet($job_id, 'channel'); + } + $this->getClient()->rPush(sprintf('ch%s', $channel_id), $job_id); + + return; + } + catch(JobNotFoundException $e) + { + throw $e; + } + catch(RedisException $e) + { + throw new ConnectionException(sprintf('Client threw an error while trying to reject job %s from %s:%s, %s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort(), $e->getMessage()), $e); } catch(Exception $e) { - throw new ServerException(sprintf('Could not reject job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e); + throw new JobManagerException(sprintf('Could not reject job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), $e); + } + } + + /** + * Marks a job as finished, and sets the return value. This basically returns the job to the client. + * If there is no return_channel set, the job is deleted from the server. + * + * @param JobPacket|string $job_id The JobPacket or job ID to return + * @param mixed|null $return_value The return value to set on the job + * @throws ConnectionException Thrown if there is a connection issue with the server + * @throws JobManagerException Thrown if there is an issue with the JobManager + * @throws JobNotFoundException Thrown if the JobPacket does not exist on the server + * @return void + */ + public function returnJob(JobPacket|string $job_id, mixed $return_value=null): void + { + if($job_id instanceof JobPacket) + { + /** @noinspection CallableParameterUseCaseInTypeContextInspection */ + $job_id = $job_id->getId(); } - throw new JobNotFoundException(sprintf('Job %s does not exist on %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort())); + try + { + if(!$this->getClient()->exists($job_id)) + { + throw new JobNotFoundException(sprintf('Job %s does not exist on %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort())); + } + + $return_channel = $this->getClient()->hGet($job_id, 'return_channel'); + if($return_channel === null) + { + $this->getClient()->del($job_id); + return; + } + + Log::debug(Utilities::getName(), sprintf('Returning job %s (Return Channel: %s)', $job_id, $return_channel ?? 'n/a')); + $this->getClient()->hSet($job_id, 'return_value', serialize($return_value)); + $this->getClient()->hSet($job_id, 'status', JobStatus::FINISHED); + $this->getClient()->rPush($this->getClient()->hGet($job_id, 'return_channel'), $job_id); + } + catch(JobNotFoundException $e) + { + throw $e; + } + catch(RedisException $e) + { + throw new ConnectionException(sprintf('Client threw an error while trying to return job %s from %s:%s, %s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort(), $e->getMessage()), $e); + } + catch(Exception $e) + { + throw new JobManagerException(sprintf('Could not return job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), $e); + } } /** * Sets the job as failed, and sets the exception that was thrown. * - * @param JobPacket|string $job_id - * @param Throwable $throwable + * @param JobPacket|string $job_id The JobPacket or job ID to return + * @param Throwable $throwable The exception that was thrown + * @throws ConnectionException Thrown if there is a connection issue with the server + * @throws JobManagerException Thrown if there is an issue with the JobManager * @return void - * @throws ServerException */ public function returnException(JobPacket|string $job_id, Throwable $throwable): void { @@ -504,38 +645,41 @@ { if($this->getClient()->exists($job_id)) { - Log::debug('net.nosial.tamerlib', sprintf('Returning exception for job %s', $job_id)); - if($this->getClient()->hGet($job_id, 'return_channel') === null) { $this->getClient()->del($job_id); return; } + Log::debug(Utilities::getName(), sprintf('Returning exception for job %s', $job_id)); $this->getClient()->hSet($job_id, 'exception', serialize($throwable)); $this->getClient()->hSet($job_id, 'status', JobStatus::FAILED); $this->getClient()->rPush($this->getClient()->hGet($job_id, 'return_channel'), $job_id); - Log::debug('net.nosial.tamerlib', sprintf('Pushed job %s to return channel %s', $job_id, $this->getClient()->hGet($job_id, 'return_channel'))); + Log::debug(Utilities::getName(), sprintf('Pushed job %s to return channel %s', $job_id, $this->getClient()->hGet($job_id, 'return_channel'))); return; } } + catch(RedisException $e) + { + throw new ConnectionException(sprintf('Client threw an error while trying to return exception for job %s from %s:%s, %s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort(), $e->getMessage()), $e); + } catch(Exception $e) { - throw new ServerException(sprintf('Could not return exception for job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e); + throw new JobManagerException(sprintf('Could not return exception for job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), $e); } } /** * Waits for a job to be available on the channel or multiple channels, attempts to claim it, and returns it. - * * If multiple channels are selected, the function will iterate through each channel until a job is found. * * @param WorkerConfiguration|string $worker_id The worker that is waiting for the job. * @param int|array $channel The channel (or channels) to wait for the job on. * @param int $timeout The timeout in seconds to wait for the job. + * @throws ConnectionException Thrown if there is a connection issue with the server + * @throws JobManagerException Thrown if there is an issue with the JobManager + * @throws TimeoutException Thrown if the timeout is reached before a job is available * @return JobPacket The job that was claimed, or null if no job was available. - * @throws ServerException - * @throws TimeoutException */ public function listenForJob(WorkerConfiguration|string $worker_id, int|array $channel=0, int $timeout=0): JobPacket { @@ -588,19 +732,27 @@ } } + catch(TimeoutException $e) + { + throw $e; + } + catch(RedisException $e) + { + throw new ConnectionException(sprintf('Client threw an error while trying to listen for job on %s:%s, %s', $this->server_configuration->getHost(), $this->server_configuration->getPort(), $e->getMessage()), $e); + } catch(Exception $e) { - throw new ServerException(sprintf('Could not wait for job on channel %s from %s:%s', $channel, $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e); + throw new JobManagerException(sprintf('Could not listen for job on %s:%s', $this->server_configuration->getHost(), $this->server_configuration->getPort()), $e); } } /** - * Drops a job from the server entirely. + * Drops a job from the server entirely, if the job doesn't exist then nothing happens. * - * @param JobPacket|string $job_id + * @param JobPacket|string $job_id The job to drop. + * @throws ConnectionException Thrown if there is a connection issue with the server + * @throws JobManagerException Thrown if there is an issue with the JobManager * @return void - * @throws JobNotFoundException - * @throws ServerException */ public function dropJob(JobPacket|string $job_id): void { @@ -612,17 +764,20 @@ try { - if($this->getClient()->exists($job_id)) + if(!$this->getClient()->exists($job_id)) { - $this->getClient()->del($job_id); return; } + + $this->getClient()->del($job_id); + } + catch(RedisException $e) + { + throw new ConnectionException(sprintf('Client threw an error while trying to drop job %s from %s:%s, %s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort(), $e->getMessage()), $e); } catch(Exception $e) { - throw new ServerException(sprintf('Could not drop job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e); + throw new JobManagerException(sprintf('Could not drop job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), $e); } - - throw new JobNotFoundException(sprintf('Job %s does not exist on %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort())); } } \ No newline at end of file