Optimizations in \TamerLib\Classes\JobManager

This commit is contained in:
Netkas 2023-07-05 19:36:14 -04:00
parent 2083f63527
commit 264fea0c7c
No known key found for this signature in database
GPG key ID: 5DAF58535614062B

View file

@ -259,11 +259,13 @@
* @param JobPacket|string $job_id The JobPacket or JobPacket ID to get the status of * @param JobPacket|string $job_id The JobPacket or JobPacket ID to get the status of
* @param array|null $wait_for Optional. An array of statuses to wait for before returning the status * @param array|null $wait_for Optional. An array of statuses to wait for before returning the status
* @param int $timeout Optional. The number of seconds to wait for the status to change if $wait_for is set * @param int $timeout Optional. The number of seconds to wait for the status to change if $wait_for is set
* @return int Returns the status of the job as an integer, see JobStatus for the integer values of the statuses
* @throws ConnectionException Thrown if there is a connection issue with the server * @throws ConnectionException Thrown if there is a connection issue with the server
* @throws JobManagerException Thrown if there is an issue with the JobManager * @throws JobManagerException Thrown if there is an issue with the JobManager
* @throws JobNotFoundException Thrown if the JobPacket does not exist on the Redis Server
* @throws TimeoutException Thrown if the timeout is reached before the job status changes to one of the statuses in $wait_for * @throws TimeoutException Thrown if the timeout is reached before the job status changes to one of the statuses in $wait_for
* @throws JobNotFoundException
* @see JobStatus for the integer values of the statuses * @see JobStatus for the integer values of the statuses
* @return int Returns the status of the job as an integer, see JobStatus for the integer values of the statuses
*/ */
public function getJobStatus(JobPacket|string $job_id, ?array $wait_for=null, int $timeout=0): int public function getJobStatus(JobPacket|string $job_id, ?array $wait_for=null, int $timeout=0): int
{ {
@ -326,20 +328,18 @@
* @throws ConnectionException Thrown if there is a connection issue with the server * @throws ConnectionException Thrown if there is a connection issue with the server
* @throws JobManagerException Thrown if there is an issue with the JobManager * @throws JobManagerException Thrown if there is an issue with the JobManager
* @throws TimeoutException Thrown if the timeout is reached before a job is returned * @throws TimeoutException Thrown if the timeout is reached before a job is returned
* @return JobPacket Returns the returned job as a JobPacket * @return string Returns the returned job ID
*/ */
public function listenReturnChannel(string $return_channel, int $timeout=0): JobPacket public function listenReturnChannel(string $return_channel, int $timeout=0): string
{ {
try try
{ {
if($timeout < 0) if($timeout < 0)
{ {
Log::debug(Utilities::getName(), sprintf('Listening for job on return channel %s on %s:%s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort()));
$job_packet = $this->getClient()->lPop($return_channel); $job_packet = $this->getClient()->lPop($return_channel);
} }
else else
{ {
Log::debug(Utilities::getName(), sprintf('Listening for job on return channel %s on %s:%s with timeout %s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort(), $timeout));
$job_packet = $this->getClient()->blPop($return_channel, $timeout); $job_packet = $this->getClient()->blPop($return_channel, $timeout);
} }
} }
@ -357,16 +357,45 @@
throw new TimeoutException(sprintf('Could not get job from return channel %s on %s:%s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort())); throw new TimeoutException(sprintf('Could not get job from return channel %s on %s:%s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort()));
} }
Log::debug(Utilities::getName(), sprintf('Received job %s from return channel %s', $job_packet[1], $return_channel)); return $job_packet[1];
}
/**
* Pushes a job back to the return channel. This is useful if a job is returned to the return channel, but the
* client does not want to process it yet.
*
* @param JobPacket|string $job_id
* @param string $return_channel
* @return void
* @throws ConnectionException
* @throws JobManagerException
*/
public function pushbackJob(JobPacket|string $job_id, string $return_channel): void
{
if($job_id instanceof JobPacket)
{
/** @noinspection CallableParameterUseCaseInTypeContextInspection */
$job_id = $job_id->getId();
}
try try
{ {
return $this->getJob($job_packet[1]); if(!$this->getClient()->exists($job_id))
}
catch(JobNotFoundException $e)
{ {
throw new JobManagerException(sprintf('Could not get job %s from %s:%s', $job_packet[1], $this->server_configuration->getHost(), $this->server_configuration->getPort()), $e); throw new JobNotFoundException(sprintf('Job %s does not exist on %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()));
} }
$this->getClient()->rPush($return_channel, $job_id);
}
catch(RedisException $e)
{
throw new ConnectionException(sprintf('Client threw an error while trying to get job from return channel %s on %s:%s, %s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort(), $e->getMessage()), $e);
}
catch(Exception $e)
{
throw new JobManagerException(sprintf('Could not get job from return channel %s on %s:%s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort()), $e);
}
} }
/** /**
@ -534,6 +563,7 @@
public function rejectJob(JobPacket|string $job_id): void public function rejectJob(JobPacket|string $job_id): void
{ {
$channel_id = null; $channel_id = null;
$was_job_packet = false;
if($job_id instanceof JobPacket) if($job_id instanceof JobPacket)
{ {
@ -541,6 +571,7 @@
$channel_id = $job_id->getChannel(); $channel_id = $job_id->getChannel();
/** @noinspection CallableParameterUseCaseInTypeContextInspection */ /** @noinspection CallableParameterUseCaseInTypeContextInspection */
$job_id = $job_id->getId(); $job_id = $job_id->getId();
$was_job_packet = true;
} }
try try
@ -555,14 +586,16 @@
// Mark as rejected, clear worker_id // Mark as rejected, clear worker_id
$this->getClient()->hSet($job_id, 'worker_id', null); $this->getClient()->hSet($job_id, 'worker_id', null);
// Push back to the channel if($channel_id === null && !$was_job_packet)
{
// Get the channel_id if we don't have it already
$channel_id = $this->getClient()->hGet($job_id, 'channel_id');
}
if($channel_id !== null) if($channel_id !== null)
{ {
$channel_id = $this->getClient()->hGet($job_id, 'channel');
}
$this->getClient()->rPush(sprintf('ch%s', $channel_id), $job_id); $this->getClient()->rPush(sprintf('ch%s', $channel_id), $job_id);
}
return;
} }
catch(JobNotFoundException $e) catch(JobNotFoundException $e)
{ {