From 264fea0c7c07b980e515041b5e3dd6dade3c0d11 Mon Sep 17 00:00:00 2001 From: Netkas Date: Wed, 5 Jul 2023 19:36:14 -0400 Subject: [PATCH] Optimizations in \TamerLib\Classes\JobManager --- src/TamerLib/Classes/JobManager.php | 61 ++++++++++++++++++++++------- 1 file changed, 47 insertions(+), 14 deletions(-) diff --git a/src/TamerLib/Classes/JobManager.php b/src/TamerLib/Classes/JobManager.php index 9d5563e..af08d46 100644 --- a/src/TamerLib/Classes/JobManager.php +++ b/src/TamerLib/Classes/JobManager.php @@ -259,11 +259,13 @@ * @param JobPacket|string $job_id The JobPacket or JobPacket ID to get the status of * @param array|null $wait_for Optional. An array of statuses to wait for before returning the status * @param int $timeout Optional. The number of seconds to wait for the status to change if $wait_for is set + * @return int Returns the status of the job as an integer, see JobStatus for the integer values of the statuses * @throws ConnectionException Thrown if there is a connection issue with the server * @throws JobManagerException Thrown if there is an issue with the JobManager + * @throws JobNotFoundException Thrown if the JobPacket does not exist on the Redis Server * @throws TimeoutException Thrown if the timeout is reached before the job status changes to one of the statuses in $wait_for + * @throws JobNotFoundException * @see JobStatus for the integer values of the statuses - * @return int Returns the status of the job as an integer, see JobStatus for the integer values of the statuses */ public function getJobStatus(JobPacket|string $job_id, ?array $wait_for=null, int $timeout=0): int { @@ -326,20 +328,18 @@ * @throws ConnectionException Thrown if there is a connection issue with the server * @throws JobManagerException Thrown if there is an issue with the JobManager * @throws TimeoutException Thrown if the timeout is reached before a job is returned - * @return JobPacket Returns the returned job as a JobPacket + * @return string Returns the returned job ID */ - public function listenReturnChannel(string $return_channel, int $timeout=0): JobPacket + public function listenReturnChannel(string $return_channel, int $timeout=0): string { try { if($timeout < 0) { - Log::debug(Utilities::getName(), sprintf('Listening for job on return channel %s on %s:%s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort())); $job_packet = $this->getClient()->lPop($return_channel); } else { - Log::debug(Utilities::getName(), sprintf('Listening for job on return channel %s on %s:%s with timeout %s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort(), $timeout)); $job_packet = $this->getClient()->blPop($return_channel, $timeout); } } @@ -357,16 +357,45 @@ throw new TimeoutException(sprintf('Could not get job from return channel %s on %s:%s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort())); } - Log::debug(Utilities::getName(), sprintf('Received job %s from return channel %s', $job_packet[1], $return_channel)); + return $job_packet[1]; + } + + /** + * Pushes a job back to the return channel. This is useful if a job is returned to the return channel, but the + * client does not want to process it yet. + * + * @param JobPacket|string $job_id + * @param string $return_channel + * @return void + * @throws ConnectionException + * @throws JobManagerException + */ + public function pushbackJob(JobPacket|string $job_id, string $return_channel): void + { + if($job_id instanceof JobPacket) + { + /** @noinspection CallableParameterUseCaseInTypeContextInspection */ + $job_id = $job_id->getId(); + } try { - return $this->getJob($job_packet[1]); + if(!$this->getClient()->exists($job_id)) + { + throw new JobNotFoundException(sprintf('Job %s does not exist on %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort())); + } + + $this->getClient()->rPush($return_channel, $job_id); } - catch(JobNotFoundException $e) + catch(RedisException $e) { - throw new JobManagerException(sprintf('Could not get job %s from %s:%s', $job_packet[1], $this->server_configuration->getHost(), $this->server_configuration->getPort()), $e); + throw new ConnectionException(sprintf('Client threw an error while trying to get job from return channel %s on %s:%s, %s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort(), $e->getMessage()), $e); } + catch(Exception $e) + { + throw new JobManagerException(sprintf('Could not get job from return channel %s on %s:%s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort()), $e); + } + } /** @@ -534,6 +563,7 @@ public function rejectJob(JobPacket|string $job_id): void { $channel_id = null; + $was_job_packet = false; if($job_id instanceof JobPacket) { @@ -541,6 +571,7 @@ $channel_id = $job_id->getChannel(); /** @noinspection CallableParameterUseCaseInTypeContextInspection */ $job_id = $job_id->getId(); + $was_job_packet = true; } try @@ -555,14 +586,16 @@ // Mark as rejected, clear worker_id $this->getClient()->hSet($job_id, 'worker_id', null); - // Push back to the channel + if($channel_id === null && !$was_job_packet) + { + // Get the channel_id if we don't have it already + $channel_id = $this->getClient()->hGet($job_id, 'channel_id'); + } + if($channel_id !== null) { - $channel_id = $this->getClient()->hGet($job_id, 'channel'); + $this->getClient()->rPush(sprintf('ch%s', $channel_id), $job_id); } - $this->getClient()->rPush(sprintf('ch%s', $channel_id), $job_id); - - return; } catch(JobNotFoundException $e) {