Refactored JobManager

This commit is contained in:
Netkas 2023-06-16 00:51:30 -04:00
parent f20551857b
commit ffdaaebfbc
No known key found for this signature in database
GPG key ID: 5DAF58535614062B

View file

@ -7,10 +7,11 @@
use Exception; use Exception;
use LogLib\Log; use LogLib\Log;
use Redis; use Redis;
use RedisException;
use TamerLib\Enums\JobStatus; use TamerLib\Enums\JobStatus;
use TamerLib\Exceptions\JobManagerException;
use TamerLib\Exceptions\JobNotFoundException; use TamerLib\Exceptions\JobNotFoundException;
use TamerLib\Exceptions\ConnectionException; use TamerLib\Exceptions\ConnectionException;
use TamerLib\Exceptions\ServerException;
use TamerLib\Exceptions\TimeoutException; use TamerLib\Exceptions\TimeoutException;
use TamerLib\Objects\JobPacket; use TamerLib\Objects\JobPacket;
use TamerLib\Objects\ServerConfiguration; use TamerLib\Objects\ServerConfiguration;
@ -34,6 +35,11 @@
*/ */
private $last_connect; private $last_connect;
/**
* @var int
*/
private $ping_count;
/** /**
* JobManager constructor. * JobManager constructor.
* *
@ -43,15 +49,15 @@
{ {
$this->server_configuration = $serverConfiguration; $this->server_configuration = $serverConfiguration;
$this->redis_client = new Redis(); $this->redis_client = new Redis();
$this->ping_count = 3;
} }
/** /**
* Attempts to determine if the Redis Server is online. * Attempts to determine if the Redis Server is online.
* *
* @param bool $ping * @return bool Returns true if the Redis Server is online, false otherwise.
* @return bool
*/ */
private function isConnected(bool $ping=true): bool private function isConnected(): bool
{ {
if($this->redis_client === null) if($this->redis_client === null)
{ {
@ -60,9 +66,15 @@
try try
{ {
if($ping) // Ping every 3 calls
if($this->ping_count >= 3)
{ {
$this->redis_client->ping(); $this->redis_client->ping();
$this->ping_count = 0;
}
else
{
++$this->ping_count;
} }
} }
catch(Exception $e) catch(Exception $e)
@ -77,15 +89,17 @@
/** /**
* Attempts to connect to the Redis Server * Attempts to connect to the Redis Server
* *
* @throws ConnectionException Thrown if there is a connection issue with the server
* @throws JobManagerException Thrown if there is an issue with the JobManager
* @return void * @return void
* @throws ConnectionException
*/ */
private function connect(): void private function connect(): void
{ {
// Reconnect every 30 minutes // Reconnect every 30 minutes
if ($this->last_connect !== null && $this->last_connect < (time() - 1800)) if ($this->last_connect !== null && $this->last_connect < (time() - 1800))
{ {
//$this->disconnect(); Log::verbose(Utilities::getName(), sprintf('JobManager reconnecting to %s:%s (Last connect timeout)', $this->server_configuration->getHost(), $this->server_configuration->getPort()));
$this->disconnect();
} }
if($this->isConnected()) if($this->isConnected())
@ -100,17 +114,22 @@
$this->server_configuration->getPort() $this->server_configuration->getPort()
); );
if ($this->server_configuration->getPassword() !== null) if ($this->server_configuration->getPassword() !== null && $this->server_configuration->getPassword() !== '')
{ {
$this->redis_client->auth($this->server_configuration->getPassword()); $this->redis_client->auth($this->server_configuration->getPassword());
} }
$this->redis_client->select($this->server_configuration->getDatabase()); $this->redis_client->select($this->server_configuration->getDatabase());
$this->last_connect = time(); $this->last_connect = time();
$this->redis_client->ping();
}
catch(RedisException $e)
{
throw new ConnectionException(sprintf('Unable to connect to %s:%s, %s', $this->server_configuration->getHost(), $this->server_configuration->getPort(), $e->getMessage()), $e);
} }
catch(Exception $e) catch(Exception $e)
{ {
throw new ConnectionException(sprintf('Could not connect to %s:%s', $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e); throw new JobManagerException(sprintf('There was an unexpected error while trying to connect to %s:%s, %s', $this->server_configuration->getHost(), $this->server_configuration->getPort(), $e->getMessage()), $e);
} }
} }
@ -121,13 +140,20 @@
*/ */
private function disconnect(): void private function disconnect(): void
{ {
if($this->isConnected())
{
return;
}
Log::debug(Utilities::getName(), sprintf('JobManager disconnecting from %s:%s', $this->server_configuration->getHost(), $this->server_configuration->getPort()));
try try
{ {
$this->redis_client->close(); $this->redis_client->close();
} }
catch(Exception $e) catch(Exception $e)
{ {
Log::warning('net.nosial.tamerlib',sprintf('JobManager could not disconnect safely from %s:%s', $this->server_configuration->getHost(), $this->server_configuration->getPort()), $e); Log::warning(Utilities::getName(),sprintf('JobManager could not disconnect safely from %s:%s', $this->server_configuration->getHost(), $this->server_configuration->getPort()), $e);
} }
finally finally
{ {
@ -138,8 +164,9 @@
/** /**
* Returns the Redis Client, or attempts to connect to the Redis Server if not connected. * Returns the Redis Client, or attempts to connect to the Redis Server if not connected.
* *
* @return Redis * @throws ConnectionException Thrown if there is a connection issue with the server
* @throws ConnectionException * @throws JobManagerException Thrown if there is an issue with the JobManager
* @return Redis Returns the Redis Client
*/ */
private function getClient(): Redis private function getClient(): Redis
{ {
@ -148,11 +175,14 @@
} }
/** /**
* Pushes a JobPacket to the Redis server. * Pushes a JobPacket to the Redis server, this will take the JobPacket, push it as a hash to the server
* then push the JobPacket ID to the channel list specified in the JobPacket, this will allow workers to
* pull the JobPacket from the server and process it.
* *
* @param JobPacket $jobPacket * @param JobPacket $jobPacket The JobPacket to push to the Redis server
* @throws ConnectionException Thrown if there is a connection issue with the server
* @throws JobManagerException Thrown if there is an issue with the JobManager
* @return void * @return void
* @throws ServerException
*/ */
public function pushJob(JobPacket $jobPacket): void public function pushJob(JobPacket $jobPacket): void
{ {
@ -161,19 +191,25 @@
$this->getClient()->hMSet($jobPacket->getId(), $jobPacket->toArray()); $this->getClient()->hMSet($jobPacket->getId(), $jobPacket->toArray());
$this->getClient()->rPush(sprintf('ch%s', $jobPacket->getChannel()), $jobPacket->getId()); $this->getClient()->rPush(sprintf('ch%s', $jobPacket->getChannel()), $jobPacket->getId());
} }
catch(RedisException $e)
{
throw new ConnectionException(sprintf('There was an error while trying to push job %s to %s:%s, %s', $jobPacket->getId(), $this->server_configuration->getHost(), $this->server_configuration->getPort(), $e->getMessage()), $e);
}
catch(Exception $e) catch(Exception $e)
{ {
throw new ServerException(sprintf('Could not push job to %s:%s', $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e); throw new JobManagerException(sprintf('There was an unexpected error while trying to push job %s to %s:%s, %s', $jobPacket->getId(), $this->server_configuration->getHost(), $this->server_configuration->getPort(), $e->getMessage()), $e);
} }
} }
/** /**
* Attempts to get a specific job from the Redis Server. * Attempts to get a specific job from the Redis Server, if the requested JobPacket does not exist on the
* server then a JobNotFoundException will be thrown.
* *
* @param JobPacket|string $job_id * @param JobPacket|string $job_id The JobPacket or JobPacket ID to get from the Redis Server
* @return JobPacket * @throws ConnectionException Thrown if there is a connection issue with the server
* @throws JobNotFoundException * @throws JobManagerException Thrown if there is an issue with the JobManager
* @throws ServerException * @throws JobNotFoundException Thrown if the JobPacket does not exist on the Redis Server
* @return JobPacket Returns the JobPacket from the Redis Server
*/ */
public function getJob(JobPacket|string $job_id): JobPacket public function getJob(JobPacket|string $job_id): JobPacket
{ {
@ -197,20 +233,40 @@
{ {
throw $e; throw $e;
} }
catch(RedisException $e)
{
throw new ConnectionException(sprintf('Client threw an error while trying to get job %s from %s:%s, %s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort(), $e->getMessage()), $e);
}
catch(Exception $e) catch(Exception $e)
{ {
throw new ServerException(sprintf('Could not get job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e); throw new JobManagerException(sprintf('There was an unexpected error while trying to get job %s from %s:%s, %s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort(), $e->getMessage()), $e);
} }
} }
/** /**
* Returns the status of a job. * Returns the status of a job, optionally if $wait_for is set to an array of statuses then the method will
* wait until the job status is one of the statuses in the array before returning. In this case the $timeout
* parameter will be used to determine how long to wait for the job status to change.
* *
* @param JobPacket|string $job_id * Returns the status of the job as an integer, the integer will be one of the following:
* @return int *
* @throws ServerException * - JobStatus::REJECTED = 50
* - JobStatus::FAILED = 40
* - JobStatus::FINISHED = 30
* - JobStatus::PROCESSING = 20
* - JobStatus::WAITING = 10
*
* @param JobPacket|string $job_id The JobPacket or JobPacket ID to get the status of
* @param array|null $wait_for Optional. An array of statuses to wait for before returning the status
* @param int $timeout Optional. The number of seconds to wait for the status to change if $wait_for is set
* @throws ConnectionException Thrown if there is a connection issue with the server
* @throws JobManagerException Thrown if there is an issue with the JobManager
* @throws JobNotFoundException Thrown if the JobPacket does not exist on the server
* @throws TimeoutException Thrown if the timeout is reached before the job status changes to one of the statuses in $wait_for
* @see JobStatus for the integer values of the statuses
* @return int Returns the status of the job as an integer, see JobStatus for the integer values of the statuses
*/ */
public function getJobStatus(JobPacket|string $job_id): int public function getJobStatus(JobPacket|string $job_id, ?array $wait_for=null, int $timeout=0): int
{ {
if($job_id instanceof JobPacket) if($job_id instanceof JobPacket)
{ {
@ -218,6 +274,27 @@
$job_id = $job_id->getId(); $job_id = $job_id->getId();
} }
if($wait_for !== null)
{
$start_time = time();
while(true)
{
$job = $this->getJob($job_id);
if(in_array($job->getStatus(), $wait_for, true))
{
return $job->getStatus();
}
if($timeout > 0 && (time() - $start_time) > $timeout)
{
throw new TimeoutException(sprintf('Timed out waiting for job %s to be one of the following statuses: %s', $job_id, implode(', ', $wait_for)));
}
usleep(100000);
}
}
try try
{ {
if(!$this->getClient()->exists($job_id)) if(!$this->getClient()->exists($job_id))
@ -227,42 +304,54 @@
return (int)$this->getClient()->hGet($job_id, 'status'); return (int)$this->getClient()->hGet($job_id, 'status');
} }
catch(JobNotFoundException $e)
{
throw $e;
}
catch(RedisException $e)
{
throw new ConnectionException(sprintf('Client threw an error while trying to get job %s from %s:%s, %s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort(), $e->getMessage()), $e);
}
catch(Exception $e) catch(Exception $e)
{ {
throw new ServerException(sprintf('Could not get job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e); throw new JobManagerException(sprintf('There was an unexpected error while trying to get job %s from %s:%s, %s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort(), $e->getMessage()), $e);
} }
} }
/** /**
* Listens on a return channel for a returned job. * Listens on a return channel for a returned job, optionally with a timeout. If the timeout is reached before
* a job is returned then a TimeoutException will be thrown.
* *
* @param string $return_channel * @param string $return_channel The return channel to listen on
* @param int $timeout * @param int $timeout Optional. The number of seconds to wait for a job to be returned
* @return JobPacket * @throws ConnectionException Thrown if there is a connection issue with the server
* @throws JobNotFoundException * @throws JobManagerException Thrown if there is an issue with the JobManager
* @throws ServerException * @throws JobNotFoundException Thrown if the JobPacket does not exist on the server
* @throws TimeoutException * @throws TimeoutException Thrown if the timeout is reached before a job is returned
* @return JobPacket Returns the returned job as a JobPacket
*/ */
public function listenReturnChannel(string $return_channel, int $timeout=0): JobPacket public function listenReturnChannel(string $return_channel, int $timeout=0): JobPacket
{ {
$time_start = time();
try try
{ {
if($timeout < 0) if($timeout < 0)
{ {
Log::debug('net.nosial.tamerlib', sprintf('Listening for job on return channel %s on %s:%s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort())); Log::debug(Utilities::getName(), sprintf('Listening for job on return channel %s on %s:%s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort()));
$job_packet = $this->getClient()->lPop($return_channel); $job_packet = $this->getClient()->lPop($return_channel);
} }
else else
{ {
Log::debug('net.nosial.tamerlib', sprintf('Listening for job on return channel %s on %s:%s with timeout %s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort(), $timeout)); Log::debug(Utilities::getName(), sprintf('Listening for job on return channel %s on %s:%s with timeout %s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort(), $timeout));
$job_packet = $this->getClient()->blPop($return_channel, $timeout); $job_packet = $this->getClient()->blPop($return_channel, $timeout);
} }
} }
catch(RedisException $e)
{
throw new ConnectionException(sprintf('Client threw an error while trying to get job from return channel %s on %s:%s, %s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort(), $e->getMessage()), $e);
}
catch(Exception $e) catch(Exception $e)
{ {
throw new TimeoutException(sprintf('Could not get job from return channel %s on %s:%s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e); throw new JobManagerException(sprintf('Could not get job from return channel %s on %s:%s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort()), $e);
} }
if($job_packet === null) if($job_packet === null)
@ -270,22 +359,18 @@
throw new TimeoutException(sprintf('Could not get job from return channel %s on %s:%s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort())); throw new TimeoutException(sprintf('Could not get job from return channel %s on %s:%s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort()));
} }
if($job_packet[1] === 'test') Log::debug(Utilities::getName(), sprintf('Received job %s from return channel %s', $job_packet[1], $return_channel));
{
return $this->listenReturnChannel($return_channel, $timeout);
}
Log::debug('net.nosial.tamerlib', sprintf('Received job %s from return channel %ss', $job_packet[1], $return_channel));
return $this->getJob($job_packet[1]); return $this->getJob($job_packet[1]);
} }
/** /**
* Gets the return value of a job. * Returns the unserialized return value of a job.
* *
* @param JobPacket|string $job_id * @param JobPacket|string $job_id The JobPacket or job ID to get the return value of
* @return mixed * @throws ConnectionException Thrown if there is a connection issue with the server
* @throws JobNotFoundException * @throws JobManagerException Thrown if there is an issue with the JobManager
* @throws ServerException * @throws JobNotFoundException Thrown if the JobPacket does not exist on the server
* @return mixed Returns the unserialized return value of the job
*/ */
public function getJobResult(JobPacket|string $job_id): mixed public function getJobResult(JobPacket|string $job_id): mixed
{ {
@ -302,24 +387,32 @@
throw new JobNotFoundException(sprintf('Job %s does not exist on %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort())); throw new JobNotFoundException(sprintf('Job %s does not exist on %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()));
} }
return $this->getClient()->hGet($job_id, 'return'); return unserialize($this->getClient()->hGet($job_id, 'return'), ['allowed_classes' => true]);
} }
catch(JobNotFoundException $e) catch(JobNotFoundException $e)
{ {
throw $e; throw $e;
} }
catch(RedisException $e)
{
throw new ConnectionException(sprintf('Client threw an error while trying to get job %s from %s:%s, %s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort(), $e->getMessage()), $e);
}
catch(Exception $e) catch(Exception $e)
{ {
throw new ServerException(sprintf('Could not get job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e); throw new JobManagerException(sprintf('Could not get job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), $e);
} }
} }
/** /**
* Returns the exception to a job. * Attempts to unserialize and return the exception thrown by a job, if the exception cannot be unserialized,
* a generic Exception is returned explaining that the exception could not be unserialized and what it got
* instead.
* *
* @param JobPacket|string $job_id * @param JobPacket|string $job_id The JobPacket or job ID to get the exception to
* @return Throwable * @throws ConnectionException Thrown if there is a connection issue with the server
* @throws ServerException * @throws JobManagerException Thrown if there is an issue with the JobManager
* @throws JobNotFoundException Thrown if the JobPacket does not exist on the server
* @return Throwable Returns the unserialized exception thrown by the job
*/ */
public function getJobException(JobPacket|string $job_id): Throwable public function getJobException(JobPacket|string $job_id): Throwable
{ {
@ -336,11 +429,31 @@
throw new JobNotFoundException(sprintf('Job %s does not exist on %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort())); throw new JobNotFoundException(sprintf('Job %s does not exist on %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()));
} }
return unserialize($this->getClient()->hGet($job_id, 'exception'), ['allowed_classes'=>true]); $exception = unserialize($this->getClient()->hGet($job_id, 'exception'), ['allowed_classes'=>true]);
if($exception instanceof Throwable)
{
return $exception;
}
if($exception === false)
{
return new Exception(sprintf('Job %s threw an exception, but the exception could not be unserialized.', $job_id));
}
return new Exception(sprintf('Job %s threw an exception, but the exception is not an instance of Throwable, got \'%s\'', $job_id, gettype($exception)));
}
catch(JobNotFoundException $e)
{
throw $e;
}
catch(RedisException $e)
{
throw new ConnectionException(sprintf('Client threw an error while trying to get job %s from %s:%s, %s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort(), $e->getMessage()), $e);
} }
catch(Exception $e) catch(Exception $e)
{ {
throw new ServerException(sprintf('Could not get job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e); throw new JobManagerException(sprintf('Could not get job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), $e);
} }
} }
@ -348,10 +461,14 @@
* Attempts to claim an existing JobPacket, returns True if the worker's claim was successful. * Attempts to claim an existing JobPacket, returns True if the worker's claim was successful.
* Returns False if the JobPacket does not exist, or if the JobPacket is already claimed. * Returns False if the JobPacket does not exist, or if the JobPacket is already claimed.
* *
* @param JobPacket|string $job_id * This also automatically sets the JobPacket's status to 'processing' and sets the worker ID
* @param WorkerConfiguration|string $worker_id * to the worker claiming the job.
* @return bool *
* @throws ServerException * @param JobPacket|string $job_id The JobPacket or job ID to claim
* @param WorkerConfiguration|string $worker_id The WorkerConfiguration or worker ID to claim the job with
* @throws ConnectionException Thrown if there is a connection issue with the server
* @throws JobManagerException Thrown if there is an issue with the JobManager
* @return bool Returns True if the worker successfully claimed the job, False otherwise
*/ */
public function claimJob(JobPacket|string $job_id, WorkerConfiguration|string $worker_id): bool public function claimJob(JobPacket|string $job_id, WorkerConfiguration|string $worker_id): bool
{ {
@ -377,6 +494,7 @@
// Attempt to claim the job // Attempt to claim the job
$this->getClient()->hSet($job_id, 'worker_id', $worker_id); $this->getClient()->hSet($job_id, 'worker_id', $worker_id);
// Verify that the job was claimed // Verify that the job was claimed
if($this->getClient()->hGet($job_id, 'worker_id') !== $worker_id) if($this->getClient()->hGet($job_id, 'worker_id') !== $worker_id)
{ {
@ -386,65 +504,26 @@
// Set the job status to processing // Set the job status to processing
$this->getClient()->hSet($job_id, 'status', JobStatus::PROCESSING); $this->getClient()->hSet($job_id, 'status', JobStatus::PROCESSING);
} }
catch(RedisException $e)
{
throw new ConnectionException(sprintf('Client threw an error while trying to claim job %s from %s:%s, %s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort(), $e->getMessage()), $e);
}
catch(Exception $e) catch(Exception $e)
{ {
throw new ServerException(sprintf('Could not claim job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e); throw new JobManagerException(sprintf('Could not claim job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), $e);
} }
return true; return true;
} }
/** /**
* Marks a job as finished, and sets the return value. * Rejects a job and pushes it back to the channel, allowing another worker to claim it.
* *
* @param JobPacket|string $job_id * @param JobPacket|string $job_id The JobPacket or job ID to reject
* @param mixed|null $return_value * @throws ConnectionException Thrown if there is a connection issue with the server
* @throws JobManagerException Thrown if there is an issue with the JobManager
* @throws JobNotFoundException Thrown if the JobPacket does not exist on the server
* @return void * @return void
* @throws JobNotFoundException
* @throws ServerException
*/
public function returnJob(JobPacket|string $job_id, mixed $return_value=null): void
{
if($job_id instanceof JobPacket)
{
/** @noinspection CallableParameterUseCaseInTypeContextInspection */
$job_id = $job_id->getId();
}
try
{
if($this->getClient()->exists($job_id))
{
$return_channel = $this->getClient()->hGet($job_id, 'return_channel');
Log::debug('net.nosial.tamerlib', sprintf('Returning job %s (Return Channel: %s)', $job_id, $return_channel ?? 'n/a'));
if($return_channel === null)
{
$this->getClient()->del($job_id);
return;
}
$this->getClient()->hSet($job_id, 'return_value', serialize($return_value));
$this->getClient()->hSet($job_id, 'status', JobStatus::FINISHED);
$this->getClient()->rPush($this->getClient()->hGet($job_id, 'return_channel'), $job_id);
return;
}
}
catch(Exception $e)
{
throw new ServerException(sprintf('Could not return job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e);
}
throw new JobNotFoundException(sprintf('Job %s does not exist on %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()));
}
/**
* Rejects a job and pushes it back to the channel.
*
* @param JobPacket|string $job_id
* @return void
* @throws JobNotFoundException
* @throws ServerException
*/ */
public function rejectJob(JobPacket|string $job_id): void public function rejectJob(JobPacket|string $job_id): void
{ {
@ -460,12 +539,16 @@
try try
{ {
if($this->getClient()->exists($job_id)) if(!$this->getClient()->exists($job_id))
{ {
Log::debug('net.nosial.tamerlib', sprintf('Rejecting job %s', $job_id)); throw new JobNotFoundException(sprintf('Job %s does not exist on %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()));
}
Log::debug(Utilities::getName(), sprintf('Rejecting job %s', $job_id));
// Mark as rejected, clear worker_id // Mark as rejected, clear worker_id
$this->getClient()->hSet($job_id, 'worker_id', null); $this->getClient()->hSet($job_id, 'worker_id', null);
// Push back to the channel // Push back to the channel
if($channel_id !== null) if($channel_id !== null)
{ {
@ -475,22 +558,80 @@
return; return;
} }
catch(JobNotFoundException $e)
{
throw $e;
}
catch(RedisException $e)
{
throw new ConnectionException(sprintf('Client threw an error while trying to reject job %s from %s:%s, %s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort(), $e->getMessage()), $e);
} }
catch(Exception $e) catch(Exception $e)
{ {
throw new ServerException(sprintf('Could not reject job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e); throw new JobManagerException(sprintf('Could not reject job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), $e);
}
} }
/**
* Marks a job as finished, and sets the return value. This basically returns the job to the client.
* If there is no return_channel set, the job is deleted from the server.
*
* @param JobPacket|string $job_id The JobPacket or job ID to return
* @param mixed|null $return_value The return value to set on the job
* @throws ConnectionException Thrown if there is a connection issue with the server
* @throws JobManagerException Thrown if there is an issue with the JobManager
* @throws JobNotFoundException Thrown if the JobPacket does not exist on the server
* @return void
*/
public function returnJob(JobPacket|string $job_id, mixed $return_value=null): void
{
if($job_id instanceof JobPacket)
{
/** @noinspection CallableParameterUseCaseInTypeContextInspection */
$job_id = $job_id->getId();
}
try
{
if(!$this->getClient()->exists($job_id))
{
throw new JobNotFoundException(sprintf('Job %s does not exist on %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort())); throw new JobNotFoundException(sprintf('Job %s does not exist on %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()));
} }
$return_channel = $this->getClient()->hGet($job_id, 'return_channel');
if($return_channel === null)
{
$this->getClient()->del($job_id);
return;
}
Log::debug(Utilities::getName(), sprintf('Returning job %s (Return Channel: %s)', $job_id, $return_channel ?? 'n/a'));
$this->getClient()->hSet($job_id, 'return_value', serialize($return_value));
$this->getClient()->hSet($job_id, 'status', JobStatus::FINISHED);
$this->getClient()->rPush($this->getClient()->hGet($job_id, 'return_channel'), $job_id);
}
catch(JobNotFoundException $e)
{
throw $e;
}
catch(RedisException $e)
{
throw new ConnectionException(sprintf('Client threw an error while trying to return job %s from %s:%s, %s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort(), $e->getMessage()), $e);
}
catch(Exception $e)
{
throw new JobManagerException(sprintf('Could not return job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), $e);
}
}
/** /**
* Sets the job as failed, and sets the exception that was thrown. * Sets the job as failed, and sets the exception that was thrown.
* *
* @param JobPacket|string $job_id * @param JobPacket|string $job_id The JobPacket or job ID to return
* @param Throwable $throwable * @param Throwable $throwable The exception that was thrown
* @throws ConnectionException Thrown if there is a connection issue with the server
* @throws JobManagerException Thrown if there is an issue with the JobManager
* @return void * @return void
* @throws ServerException
*/ */
public function returnException(JobPacket|string $job_id, Throwable $throwable): void public function returnException(JobPacket|string $job_id, Throwable $throwable): void
{ {
@ -504,38 +645,41 @@
{ {
if($this->getClient()->exists($job_id)) if($this->getClient()->exists($job_id))
{ {
Log::debug('net.nosial.tamerlib', sprintf('Returning exception for job %s', $job_id));
if($this->getClient()->hGet($job_id, 'return_channel') === null) if($this->getClient()->hGet($job_id, 'return_channel') === null)
{ {
$this->getClient()->del($job_id); $this->getClient()->del($job_id);
return; return;
} }
Log::debug(Utilities::getName(), sprintf('Returning exception for job %s', $job_id));
$this->getClient()->hSet($job_id, 'exception', serialize($throwable)); $this->getClient()->hSet($job_id, 'exception', serialize($throwable));
$this->getClient()->hSet($job_id, 'status', JobStatus::FAILED); $this->getClient()->hSet($job_id, 'status', JobStatus::FAILED);
$this->getClient()->rPush($this->getClient()->hGet($job_id, 'return_channel'), $job_id); $this->getClient()->rPush($this->getClient()->hGet($job_id, 'return_channel'), $job_id);
Log::debug('net.nosial.tamerlib', sprintf('Pushed job %s to return channel %s', $job_id, $this->getClient()->hGet($job_id, 'return_channel'))); Log::debug(Utilities::getName(), sprintf('Pushed job %s to return channel %s', $job_id, $this->getClient()->hGet($job_id, 'return_channel')));
return; return;
} }
} }
catch(RedisException $e)
{
throw new ConnectionException(sprintf('Client threw an error while trying to return exception for job %s from %s:%s, %s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort(), $e->getMessage()), $e);
}
catch(Exception $e) catch(Exception $e)
{ {
throw new ServerException(sprintf('Could not return exception for job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e); throw new JobManagerException(sprintf('Could not return exception for job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), $e);
} }
} }
/** /**
* Waits for a job to be available on the channel or multiple channels, attempts to claim it, and returns it. * Waits for a job to be available on the channel or multiple channels, attempts to claim it, and returns it.
*
* If multiple channels are selected, the function will iterate through each channel until a job is found. * If multiple channels are selected, the function will iterate through each channel until a job is found.
* *
* @param WorkerConfiguration|string $worker_id The worker that is waiting for the job. * @param WorkerConfiguration|string $worker_id The worker that is waiting for the job.
* @param int|array $channel The channel (or channels) to wait for the job on. * @param int|array $channel The channel (or channels) to wait for the job on.
* @param int $timeout The timeout in seconds to wait for the job. * @param int $timeout The timeout in seconds to wait for the job.
* @throws ConnectionException Thrown if there is a connection issue with the server
* @throws JobManagerException Thrown if there is an issue with the JobManager
* @throws TimeoutException Thrown if the timeout is reached before a job is available
* @return JobPacket The job that was claimed, or null if no job was available. * @return JobPacket The job that was claimed, or null if no job was available.
* @throws ServerException
* @throws TimeoutException
*/ */
public function listenForJob(WorkerConfiguration|string $worker_id, int|array $channel=0, int $timeout=0): JobPacket public function listenForJob(WorkerConfiguration|string $worker_id, int|array $channel=0, int $timeout=0): JobPacket
{ {
@ -588,19 +732,27 @@
} }
} }
catch(TimeoutException $e)
{
throw $e;
}
catch(RedisException $e)
{
throw new ConnectionException(sprintf('Client threw an error while trying to listen for job on %s:%s, %s', $this->server_configuration->getHost(), $this->server_configuration->getPort(), $e->getMessage()), $e);
}
catch(Exception $e) catch(Exception $e)
{ {
throw new ServerException(sprintf('Could not wait for job on channel %s from %s:%s', $channel, $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e); throw new JobManagerException(sprintf('Could not listen for job on %s:%s', $this->server_configuration->getHost(), $this->server_configuration->getPort()), $e);
} }
} }
/** /**
* Drops a job from the server entirely. * Drops a job from the server entirely, if the job doesn't exist then nothing happens.
* *
* @param JobPacket|string $job_id * @param JobPacket|string $job_id The job to drop.
* @throws ConnectionException Thrown if there is a connection issue with the server
* @throws JobManagerException Thrown if there is an issue with the JobManager
* @return void * @return void
* @throws JobNotFoundException
* @throws ServerException
*/ */
public function dropJob(JobPacket|string $job_id): void public function dropJob(JobPacket|string $job_id): void
{ {
@ -612,17 +764,20 @@
try try
{ {
if($this->getClient()->exists($job_id)) if(!$this->getClient()->exists($job_id))
{ {
$this->getClient()->del($job_id);
return; return;
} }
$this->getClient()->del($job_id);
}
catch(RedisException $e)
{
throw new ConnectionException(sprintf('Client threw an error while trying to drop job %s from %s:%s, %s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort(), $e->getMessage()), $e);
} }
catch(Exception $e) catch(Exception $e)
{ {
throw new ServerException(sprintf('Could not drop job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e); throw new JobManagerException(sprintf('Could not drop job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), $e);
} }
throw new JobNotFoundException(sprintf('Job %s does not exist on %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()));
} }
} }