From f20551857bca017a7513cff2ad678e55b9f94f86 Mon Sep 17 00:00:00 2001 From: Netkas Date: Mon, 12 Jun 2023 21:32:35 -0400 Subject: [PATCH] Lots of changes, implemented the base usage. Beware of bugs and unfinished states --- .idea/php.xml | 1 - README.md | 38 +- functions.txt | 66 ++ old | 261 +++++++ src/TamerLib/Classes/AdaptiveSleep.php | 77 ++ src/TamerLib/Classes/JobManager.php | 628 ++++++++++++++++ src/TamerLib/Classes/RedisServer.php | 62 +- src/TamerLib/Classes/Utilities.php | 80 ++ src/TamerLib/Classes/WorkerSupervisor.php | 177 +++++ src/TamerLib/Classes/subproc | 28 + src/TamerLib/Enums/EncodingType.php | 11 + src/TamerLib/Enums/JobStatus.php | 23 + src/TamerLib/Enums/JobType.php | 10 + src/TamerLib/Enums/TamerMode.php | 15 + src/TamerLib/Enums/WorkerType.php | 9 + .../Exceptions/ConnectionException.php | 19 + .../Exceptions/JobNotFoundException.php | 19 + ...erverException.php => ServerException.php} | 3 +- src/TamerLib/Exceptions/TimeoutException.php | 19 + src/TamerLib/Objects/JobPacket.php | 311 ++++++++ src/TamerLib/Objects/ServerConfiguration.php | 112 +++ src/TamerLib/Objects/WorkerConfiguration.php | 196 +++++ src/TamerLib/Objects/WorkerInstance.php | 187 +++++ src/TamerLib/Tamer.php | 11 - src/TamerLib/tm.php | 684 ++++++++++++++++++ tests/adaptive_sleep.php | 29 + tests/tamer_standalone.php | 40 + 27 files changed, 3045 insertions(+), 71 deletions(-) create mode 100644 functions.txt create mode 100644 old create mode 100644 src/TamerLib/Classes/AdaptiveSleep.php create mode 100644 src/TamerLib/Classes/JobManager.php create mode 100644 src/TamerLib/Classes/WorkerSupervisor.php create mode 100644 src/TamerLib/Classes/subproc create mode 100644 src/TamerLib/Enums/EncodingType.php create mode 100644 src/TamerLib/Enums/JobStatus.php create mode 100644 src/TamerLib/Enums/JobType.php create mode 100644 src/TamerLib/Enums/TamerMode.php create mode 100644 src/TamerLib/Enums/WorkerType.php create mode 100644 src/TamerLib/Exceptions/ConnectionException.php create mode 100644 src/TamerLib/Exceptions/JobNotFoundException.php rename src/TamerLib/Exceptions/{RedisServerException.php => ServerException.php} (80%) create mode 100644 src/TamerLib/Exceptions/TimeoutException.php create mode 100644 src/TamerLib/Objects/JobPacket.php create mode 100644 src/TamerLib/Objects/ServerConfiguration.php create mode 100644 src/TamerLib/Objects/WorkerConfiguration.php create mode 100644 src/TamerLib/Objects/WorkerInstance.php delete mode 100644 src/TamerLib/Tamer.php create mode 100644 src/TamerLib/tm.php create mode 100644 tests/adaptive_sleep.php create mode 100644 tests/tamer_standalone.php diff --git a/.idea/php.xml b/.idea/php.xml index 8ba8e7b..184ca30 100644 --- a/.idea/php.xml +++ b/.idea/php.xml @@ -12,7 +12,6 @@ - diff --git a/README.md b/README.md index 866adab..32804fa 100644 --- a/README.md +++ b/README.md @@ -68,7 +68,7 @@ Optionally, you can also pass a username and password to the `init` method, whic server (such as with RabbitMQ) if the server requires it. You may also just provide a password if a username is not required. ```php -TamerLib\Tamer::init(\TamerLib\Abstracts\ProtocolType::Gearman, [ +TamerLib\tm::init(\TamerLib\Abstracts\ProtocolType::Gearman, [ 'host:port', 'host:port' ], $username, $password); ``` @@ -96,7 +96,7 @@ be called in the client side once the worker has finished executing the job, the job as a parameter. ```php -TamerLib\Tamer::queueClosure(function(){ +TamerLib\tm::queueClosure(function(){ // Do something return 'Hello World'; }, function($result){ @@ -118,7 +118,7 @@ You can create a task object by using the `TamerLib\Tamer::create` method, this For example, to send a queued task to a worker with a defined function named `sleep` you can do the following: ```php -TamerLib\Tamer::queue(\TamerLib\Objects\Task::create('sleep', 5, function($result){ +TamerLib\tm::queue(\TamerLib\Objects\Task::create('sleep', 5, function($result){ echo $result; // 5 })); ``` @@ -127,7 +127,7 @@ Once you have queued all the jobs you want to send to the worker, you can execut `TamerLib\Tamer::work()`, this will run all the jobs in the queue in parallel and execute each callback accordingly. ```php -TamerLib\Tamer::work(); +TamerLib\tm::work(); ``` #### Fire & Forget Jobs @@ -136,7 +136,7 @@ To send a fire & forget closure to a worker, you can use the `TamerLib\Tamer::do parameter, the closure that will be executed by the worker. ```php -TamerLib\Tamer::doClosure(function(){ +TamerLib\tm::doClosure(function(){ // Do something return 'Hello World'; }); @@ -148,7 +148,7 @@ this is similar to the `queueJob`, see the [Queued Jobs](#queued-jobs) section f You can pass on a Task object to the `doJob` method, and the worker will execute the function in the background ```php - TamerLib\Tamer::doJob(\TamerLib\Objects\Task::create('sleep', 5)); + TamerLib\tm::doJob(\TamerLib\Objects\Task::create('sleep', 5)); ``` > Note: Fire & Forget jobs do not return a result, so you cannot use a callback with them. @@ -160,16 +160,16 @@ Workers are the sub-processes that will handle the jobs sent by the client, they you can also just run simple closures as workers. ```php -TamerLib\Tamer::addWorker('closure', 10); -TamerLib\Tamer::startWorkers(); +TamerLib\tm::addWorker('closure', 10); +TamerLib\tm::startWorkers(); ``` The example above will start 10 closure workers, if you want to run a worker with defined functions you can do so by passing a class name to the `addWorker` method. ```php -TamerLib\Tamer::addWorker(__DIR__ . DIRECTORY_SEPARATOR . 'my_worker', 10); -TamerLib\Tamer::startWorkers(); +TamerLib\tm::addWorker(__DIR__ . DIRECTORY_SEPARATOR . 'my_worker', 10); +TamerLib\tm::startWorkers(); ``` ### Worker Example @@ -184,14 +184,14 @@ looks like this: import('net.nosial.tamerlib', 'latest'); - TamerLib\Tamer::initWorker(); + TamerLib\tm::initWorker(); - TamerLib\Tamer::addFunction('sleep', function(\TamerLib\Objects\Job $job) { + TamerLib\tm::addFunction('sleep', function(\TamerLib\Objects\Job $job) { sleep($job->getData()); return $job->getData(); }); - TamerLib\Tamer::work(); + TamerLib\tm::work(); ``` This example shows how you can define a sleep function which will make the worker sleep for the amount of seconds @@ -208,7 +208,7 @@ until a timeout is reached, and then continue execution, for example: while(true) { // Non-blocking for 500 milliseconds, don't throw errors - TamerLib\Tamer::work(false, 500, false); + TamerLib\tm::work(false, 500, false); // Do other stuff :D } @@ -226,14 +226,14 @@ run it in monitor mode only, this will start the supervisor and the workers, but run your own client on a different server. ```php -TamerLib\Tamer::init(\TamerLib\Abstracts\ProtocolType::Gearman, [ +TamerLib\tm::init(\TamerLib\Abstracts\ProtocolType::Gearman, [ 'host:port', 'host:port' ], $username, $password); -TamerLib\Tamer::addWorker('closure', 10); // Add 10 closure workers -TamerLib\Tamer::addWorker(__DIR__ . DIRECTORY_SEPARATOR . 'my_worker', 10); // Add 10 worker with defined functions -TamerLib\Tamer::startWorkers(); // Start the workers normally -TamerLib\Tamer::monitor(); // Monitor the workers (blocking) +TamerLib\tm::addWorker('closure', 10); // Add 10 closure workers +TamerLib\tm::addWorker(__DIR__ . DIRECTORY_SEPARATOR . 'my_worker', 10); // Add 10 worker with defined functions +TamerLib\tm::startWorkers(); // Start the workers normally +TamerLib\tm::monitor(); // Monitor the workers (blocking) ``` or you can handle it any way you want, just note that the supervisor will shut down the workers if the client is not diff --git a/functions.txt b/functions.txt new file mode 100644 index 0000000..939711b --- /dev/null +++ b/functions.txt @@ -0,0 +1,66 @@ +== GLOBAL == +Both the client and the worker have access to the following functions: + +initialize($mode, $server_config): void - Initializes TamerLib in the given mode, if no server_config is provided then + Tamer will initialize it's own server and use its own configuration. When initializing as a worker, the + $server_config is ignored as the configuration is provided by the parent process. + +shutdown(): void - Shuts down the TamerLib by resetting the global variables and releasing any resources it may have + allocated. + + +== CLIENT == +The client is the master process of TamerLib that is used to push tasks to the worker processes, optionally a client may +also be responsible for supervising workers and or managing its own server process. + +createWorker($count, $path): void - Creates workers and starts them, the count parameter specifies how many workers to create + and the path parameter being optional, if provided, specifies the path to the worker executable. If the path + parameter is not provided, TamerLib will use it's own subprocess to create the workers which can only handle closures + +do($callable, $channel=0): string - Pushes a task to the worker pool and returns the job id, this adds the job to its own watchlist + so that it can be monitored by Tamer. + +call($function, $args, $channel=0): string - Pushes a function call to the worker pool and returns the job id, this adds the job to its own watchlist + so that it can be monitored by Tamer. + +dof($callable, $channel=0): void - Pushes a task to the worker pool and forgets about it, this does not add the job to the watchlist + +callf($function, $args, $channel=0): void - Pushes a function call to the worker pool and forgets about it, + this does not add the job to the watchlist + +wait($callback($job_id, $return)): void - Waits for all jobs dispatched with do() to complete, the callback is called + for each job that completes with the job id and the return value of the job. If one or more jobs fail, the function + will throw an replicated exception. + +waitFor($job_id): mixed - Waits for a specific job to complete and returns the return value of the job, if the job + fails, the function will throw an replicated exception. + +clear(): void - Clears the watchlist of all jobs. + +NOTE: wait & waitFor will execute supervisory tasks while waiting for jobs to complete if the client is configured to + supervise workers. + +== WORKER == +The worker is the slave process of TamerLib that is used to execute tasks pushed to it by the client process. + +setFunction($function, $callable): void - Sets the function that will be called when the worker receives a task with + the given function name. + +removeFunction($function): void - Removes the function that will be called when the worker receives a task with + the given function name. + +getFunctions(): array - Returns an array of all the functions that the worker has registered. + +run($channels, $timeout=0): void - Runs the worker process, this function will block until the worker is shutdown or + until the timeout is reached. If the timeout is reached, the worker will shutdown. The channels parameter is an + int or an array of ints that specifies which channels the worker will listen on. If the channels parameter is 0 + then the worker will listen only on channel 0. + +return($job_id, $return): void - Returns the return value of the job to the client process, this function is called + automatically when the worker completes a job. + +throw($job_id, $exception): void - Throws an exception to the client process, this function is called automatically + when the worker completes a job and the job throws an exception. + +reject($job_id, $exception): void - Rejects a job, this silently rejects the job and pushes it back onto the queue + for another worker to pick up. \ No newline at end of file diff --git a/old b/old new file mode 100644 index 0000000..78896a3 --- /dev/null +++ b/old @@ -0,0 +1,261 @@ + /** + * Configures the Redis server. + * + * @param string $cmd + * @param string $host + * @param int|null $port + * @return void + * @throws NoAvailablePortException + */ + public static function configureRedisServer(string $cmd='redis-server', string $host='127.0.0.1', ?int $port=null): void + { + if(self::$redis_server instanceof RedisServer) + { + throw new RuntimeException('Redis server already configured.'); + } + + self::$redis_server = new RedisServer($cmd, $host, $port); + } + + /** + * Returns the Redis server + * + * @return RedisServer|null + */ + private static function getRedisServer(): ?RedisServer + { + if(is_null(self::$redis_server)) + { + self::$redis_server = new RedisServer(); + } + + return self::$redis_server; + } + + /** + * Adds the job ID to the watch list. + * + * @param int $job_id + * @return void + */ + private static function addToWatch(int $job_id): void + { + self::$watching_jobs[] = $job_id; + } + + /** + * Cleans up the watch list by removing jobs that no longer exist. + * + * @return void + */ + public static function cleanWatchList(): void + { + try + { + foreach(self::$watching_jobs as $job_id) + { + if(!self::getRedisClient()->exists($job_id)) + { + unset(self::$watching_jobs[$job_id]); + } + } + } + catch(Exception $e) + { + throw new RuntimeException('Failed to clean watch list.', 0, $e); + } + } + + /** + * Returns the Redis client. + * + * @return Redis + * @throws RedisException + */ + private static function getRedisClient(): Redis + { + if(is_null(self::$redis_server)) + { + self::$redis_server = new RedisServer(); + } + + if(!self::$redis_server->isRunning()) + { + self::$redis_server->start(); + } + + if(is_null(self::$redis_client)) + { + self::$redis_client = new Redis(); + /** @noinspection NullPointerExceptionInspection */ + var_dump(self::getRedisServer()->getHost(), self::getRedisServer()->getPort()); + self::$redis_client->connect(self::getRedisServer()->getHost(), self::getRedisServer()->getPort()); + } + + return self::$redis_client; + } + + /** + * Returns the worker supervisor. + * + * @return WorkerSupervisor + */ + private static function getSupervisor(): WorkerSupervisor + { + if(is_null(self::$supervisor)) + { + self::$supervisor = new WorkerSupervisor(); + } + + return self::$supervisor; + } + + /** + * Spawns a closure worker. + * + * @param int $count + * @return void + */ + public static function spawnClosure(int $count=8): void + { + self::getSupervisor()->spawnClosure(self::getRedisServer(), $count); + } + + /** + * Spawns a worker of a specific php file. + * + * @param array $cmd + * @param int $count + * @return void + */ + public static function spawn(array $cmd, int $count=8): void + { + self::getSupervisor()->spawnWorker($cmd, self::getRedisServer(), $count); + } + + /** + * Sends a closure to be executed by a worker, returns the job ID for the closure. + * + * @param callable $closure + * @return string + */ + public static function do(callable $closure): string + { + try + { + $closure = serialize(new SerializableClosure($closure)); + } + catch(Exception $e) + { + throw new RuntimeException('Failed to serialize closure.', 2000, $e); + } + + $job_packet = new JobPacket(); + $job_packet->setJobType(JobType::CLOSURE); + $job_packet->setEncodingType(EncodingType::SERIALIZED); + $job_packet->setPayload($closure); + + // Push as hash using toArray() + try + { + self::getRedisClient()->hMSet($job_packet->getId(), $job_packet->toArray()); + self::getRedisClient()->expire($job_packet->getId(), 60); + self::getRedisClient()->rPush(sprintf('ch%s', $job_packet->getChannel()), $job_packet->getId()); + } + catch(Exception $e) + { + throw new RuntimeException('Failed to push job to Redis.', 2000, $e); + } + + self::addToWatch($job_packet->getId()); + return $job_packet->getId(); + } + + /** + * Waits for all jobs to complete. + * + * @param callable|null $callback + * @return void + * @throws RedisException + */ + public static function waitAll(?callable $callback=null): void + { + while(count(self::$watching_jobs) > 0) + { + foreach(self::$watching_jobs as $job_id) + { + if(!self::getRedisClient()->exists($job_id)) + { + unset(self::$watching_jobs[$job_id]); + continue; + } + + if(!in_array(self::getRedisClient()->hGet($job_id, 'status'), JobStatus::PROCESSING_STATES)) + { + if(!is_null($callback)) + { + $callback(self::getRedisClient()->hGet($job_id, 'return_value')); + } + + self::getRedisClient()->del($job_id); + unset(self::$watching_jobs[$job_id]); + } + } + } + } + + /** + * Waits for a job to complete, returns the result. + * + * @param string $job_id + * @param int $timeout + * @return mixed + * @throws RedisException + */ + public function waitFor(string $job_id, int $timeout=0) + { + $timeout_count = time(); + while(in_array(self::getRedisClient()->hGet($job_id, 'status'), JobStatus::PROCESSING_STATES, true)) + { + if($timeout > 0 && time() - $timeout_count > $timeout) + { + throw new RuntimeException(sprintf('waitFor %s timed out.', $job_id)); + } + + usleep(100); + } + + $return = self::getRedisClient()->hGet($job_id, 'result'); + self::getRedisClient()->del($job_id); + + return $return; + } + + /** + * Gets a key from the shared memory. + * + * @param string $key + * @param null $default + * @return mixed + */ + public static function getKey(string $key, $default=null): mixed + { + if(isset(self::$shared_memory[$key])) + { + return self::$shared_memory[$key]; + } + + return $default; + } + + /** + * Sets a key in the shared memory. + * + * @param string $key + * @param mixed $value + * @return void + */ + public static function setKey(string $key, mixed $value): void + { + self::$shared_memory[$key] = $value; + } \ No newline at end of file diff --git a/src/TamerLib/Classes/AdaptiveSleep.php b/src/TamerLib/Classes/AdaptiveSleep.php new file mode 100644 index 0000000..8fcfd93 --- /dev/null +++ b/src/TamerLib/Classes/AdaptiveSleep.php @@ -0,0 +1,77 @@ +max_sleep_time = $max_sleep_time; + $this->busy_buffer = array_fill(0, $buffer_size, false); // Fill the buffer with false values (not busy + $this->buffer_size = $buffer_size; + $this->buffer_index = 0; + } + + /** + * Preforms an adaptive sleep. + * + * @param bool $busy + * @return int + */ + public function sleep(bool $busy): int + { + // Add the busy state to the buffer + $this->busy_buffer[$this->buffer_index] = $busy; + $this->buffer_index = ($this->buffer_index + 1) % $this->buffer_size; // Circular buffer + + // Calculate the average busy state + $busy_count = 0; + foreach($this->busy_buffer as $busy_state) + { + if($busy_state) + { + $busy_count++; + } + } + $busy_average = $busy_count / $this->buffer_size; + + // Calculate the sleep time + $sleep_time = $this->max_sleep_time * (1 - $busy_average); + + // Sleep + if($sleep_time > 0) + { + usleep($sleep_time); + } + + return $sleep_time; + } + } \ No newline at end of file diff --git a/src/TamerLib/Classes/JobManager.php b/src/TamerLib/Classes/JobManager.php new file mode 100644 index 0000000..48eb84d --- /dev/null +++ b/src/TamerLib/Classes/JobManager.php @@ -0,0 +1,628 @@ +server_configuration = $serverConfiguration; + $this->redis_client = new Redis(); + } + + /** + * Attempts to determine if the Redis Server is online. + * + * @param bool $ping + * @return bool + */ + private function isConnected(bool $ping=true): bool + { + if($this->redis_client === null) + { + return false; + } + + try + { + if($ping) + { + $this->redis_client->ping(); + } + } + catch(Exception $e) + { + unset($e); + return false; + } + + return true; + } + + /** + * Attempts to connect to the Redis Server + * + * @return void + * @throws ConnectionException + */ + private function connect(): void + { + // Reconnect every 30 minutes + if ($this->last_connect !== null && $this->last_connect < (time() - 1800)) + { + //$this->disconnect(); + } + + if($this->isConnected()) + { + return; + } + + try + { + $this->redis_client->connect( + $this->server_configuration->getHost(), + $this->server_configuration->getPort() + ); + + if ($this->server_configuration->getPassword() !== null) + { + $this->redis_client->auth($this->server_configuration->getPassword()); + } + + $this->redis_client->select($this->server_configuration->getDatabase()); + $this->last_connect = time(); + } + catch(Exception $e) + { + throw new ConnectionException(sprintf('Could not connect to %s:%s', $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e); + } + } + + /** + * Disconnects from the Redis Server + * + * @return void + */ + private function disconnect(): void + { + try + { + $this->redis_client->close(); + } + catch(Exception $e) + { + Log::warning('net.nosial.tamerlib',sprintf('JobManager could not disconnect safely from %s:%s', $this->server_configuration->getHost(), $this->server_configuration->getPort()), $e); + } + finally + { + $this->redis_client = null; + } + } + + /** + * Returns the Redis Client, or attempts to connect to the Redis Server if not connected. + * + * @return Redis + * @throws ConnectionException + */ + private function getClient(): Redis + { + $this->connect(); + return $this->redis_client; + } + + /** + * Pushes a JobPacket to the Redis server. + * + * @param JobPacket $jobPacket + * @return void + * @throws ServerException + */ + public function pushJob(JobPacket $jobPacket): void + { + try + { + $this->getClient()->hMSet($jobPacket->getId(), $jobPacket->toArray()); + $this->getClient()->rPush(sprintf('ch%s', $jobPacket->getChannel()), $jobPacket->getId()); + } + catch(Exception $e) + { + throw new ServerException(sprintf('Could not push job to %s:%s', $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e); + } + } + + /** + * Attempts to get a specific job from the Redis Server. + * + * @param JobPacket|string $job_id + * @return JobPacket + * @throws JobNotFoundException + * @throws ServerException + */ + public function getJob(JobPacket|string $job_id): JobPacket + { + if($job_id instanceof JobPacket) + { + /** @noinspection CallableParameterUseCaseInTypeContextInspection */ + $job_id = $job_id->getId(); + } + + try + { + if(!$this->getClient()->exists($job_id)) + { + throw new JobNotFoundException(sprintf('Job %s does not exist in %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort())); + } + + return new JobPacket($this->getClient()->hGetAll($job_id)); + + } + catch(JobNotFoundException $e) + { + throw $e; + } + catch(Exception $e) + { + throw new ServerException(sprintf('Could not get job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e); + } + } + + /** + * Returns the status of a job. + * + * @param JobPacket|string $job_id + * @return int + * @throws ServerException + */ + public function getJobStatus(JobPacket|string $job_id): int + { + if($job_id instanceof JobPacket) + { + /** @noinspection CallableParameterUseCaseInTypeContextInspection */ + $job_id = $job_id->getId(); + } + + try + { + if(!$this->getClient()->exists($job_id)) + { + throw new JobNotFoundException(sprintf('Job %s does not exist on %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort())); + } + + return (int)$this->getClient()->hGet($job_id, 'status'); + } + catch(Exception $e) + { + throw new ServerException(sprintf('Could not get job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e); + } + } + + /** + * Listens on a return channel for a returned job. + * + * @param string $return_channel + * @param int $timeout + * @return JobPacket + * @throws JobNotFoundException + * @throws ServerException + * @throws TimeoutException + */ + public function listenReturnChannel(string $return_channel, int $timeout=0): JobPacket + { + $time_start = time(); + + try + { + if($timeout < 0) + { + Log::debug('net.nosial.tamerlib', sprintf('Listening for job on return channel %s on %s:%s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort())); + $job_packet = $this->getClient()->lPop($return_channel); + } + else + { + Log::debug('net.nosial.tamerlib', sprintf('Listening for job on return channel %s on %s:%s with timeout %s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort(), $timeout)); + $job_packet = $this->getClient()->blPop($return_channel, $timeout); + } + } + catch(Exception $e) + { + throw new TimeoutException(sprintf('Could not get job from return channel %s on %s:%s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e); + } + + if($job_packet === null) + { + throw new TimeoutException(sprintf('Could not get job from return channel %s on %s:%s', $return_channel, $this->server_configuration->getHost(), $this->server_configuration->getPort())); + } + + if($job_packet[1] === 'test') + { + return $this->listenReturnChannel($return_channel, $timeout); + } + + Log::debug('net.nosial.tamerlib', sprintf('Received job %s from return channel %ss', $job_packet[1], $return_channel)); + return $this->getJob($job_packet[1]); + } + + /** + * Gets the return value of a job. + * + * @param JobPacket|string $job_id + * @return mixed + * @throws JobNotFoundException + * @throws ServerException + */ + public function getJobResult(JobPacket|string $job_id): mixed + { + if($job_id instanceof JobPacket) + { + /** @noinspection CallableParameterUseCaseInTypeContextInspection */ + $job_id = $job_id->getId(); + } + + try + { + if(!$this->getClient()->exists($job_id)) + { + throw new JobNotFoundException(sprintf('Job %s does not exist on %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort())); + } + + return $this->getClient()->hGet($job_id, 'return'); + } + catch(JobNotFoundException $e) + { + throw $e; + } + catch(Exception $e) + { + throw new ServerException(sprintf('Could not get job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e); + } + } + + /** + * Returns the exception to a job. + * + * @param JobPacket|string $job_id + * @return Throwable + * @throws ServerException + */ + public function getJobException(JobPacket|string $job_id): Throwable + { + if($job_id instanceof JobPacket) + { + /** @noinspection CallableParameterUseCaseInTypeContextInspection */ + $job_id = $job_id->getId(); + } + + try + { + if(!$this->getClient()->exists($job_id)) + { + throw new JobNotFoundException(sprintf('Job %s does not exist on %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort())); + } + + return unserialize($this->getClient()->hGet($job_id, 'exception'), ['allowed_classes'=>true]); + } + catch(Exception $e) + { + throw new ServerException(sprintf('Could not get job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e); + } + } + + /** + * Attempts to claim an existing JobPacket, returns True if the worker's claim was successful. + * Returns False if the JobPacket does not exist, or if the JobPacket is already claimed. + * + * @param JobPacket|string $job_id + * @param WorkerConfiguration|string $worker_id + * @return bool + * @throws ServerException + */ + public function claimJob(JobPacket|string $job_id, WorkerConfiguration|string $worker_id): bool + { + if($job_id instanceof JobPacket) + { + /** @noinspection CallableParameterUseCaseInTypeContextInspection */ + $job_id = $job_id->getId(); + } + + if($worker_id instanceof WorkerConfiguration) + { + /** @noinspection CallableParameterUseCaseInTypeContextInspection */ + $worker_id = $worker_id->getWorkerId(); + } + + try + { + // Check if the job exists + if(!$this->getClient()->exists($job_id)) + { + return false; + } + + // Attempt to claim the job + $this->getClient()->hSet($job_id, 'worker_id', $worker_id); + // Verify that the job was claimed + if($this->getClient()->hGet($job_id, 'worker_id') !== $worker_id) + { + return false; + } + + // Set the job status to processing + $this->getClient()->hSet($job_id, 'status', JobStatus::PROCESSING); + } + catch(Exception $e) + { + throw new ServerException(sprintf('Could not claim job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e); + } + + return true; + } + + /** + * Marks a job as finished, and sets the return value. + * + * @param JobPacket|string $job_id + * @param mixed|null $return_value + * @return void + * @throws JobNotFoundException + * @throws ServerException + */ + public function returnJob(JobPacket|string $job_id, mixed $return_value=null): void + { + if($job_id instanceof JobPacket) + { + /** @noinspection CallableParameterUseCaseInTypeContextInspection */ + $job_id = $job_id->getId(); + } + + try + { + if($this->getClient()->exists($job_id)) + { + $return_channel = $this->getClient()->hGet($job_id, 'return_channel'); + Log::debug('net.nosial.tamerlib', sprintf('Returning job %s (Return Channel: %s)', $job_id, $return_channel ?? 'n/a')); + + if($return_channel === null) + { + $this->getClient()->del($job_id); + return; + } + + $this->getClient()->hSet($job_id, 'return_value', serialize($return_value)); + $this->getClient()->hSet($job_id, 'status', JobStatus::FINISHED); + $this->getClient()->rPush($this->getClient()->hGet($job_id, 'return_channel'), $job_id); + return; + } + } + catch(Exception $e) + { + throw new ServerException(sprintf('Could not return job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e); + } + + throw new JobNotFoundException(sprintf('Job %s does not exist on %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort())); + } + + /** + * Rejects a job and pushes it back to the channel. + * + * @param JobPacket|string $job_id + * @return void + * @throws JobNotFoundException + * @throws ServerException + */ + public function rejectJob(JobPacket|string $job_id): void + { + $channel_id = null; + + if($job_id instanceof JobPacket) + { + // Providing a JobPacket allows us to get the channel_id and avoid an extra call to the server + $channel_id = $job_id->getChannel(); + /** @noinspection CallableParameterUseCaseInTypeContextInspection */ + $job_id = $job_id->getId(); + } + + try + { + if($this->getClient()->exists($job_id)) + { + Log::debug('net.nosial.tamerlib', sprintf('Rejecting job %s', $job_id)); + + // Mark as rejected, clear worker_id + $this->getClient()->hSet($job_id, 'worker_id', null); + // Push back to the channel + if($channel_id !== null) + { + $channel_id = $this->getClient()->hGet($job_id, 'channel'); + } + $this->getClient()->rPush(sprintf('ch%s', $channel_id), $job_id); + + return; + } + } + catch(Exception $e) + { + throw new ServerException(sprintf('Could not reject job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e); + } + + throw new JobNotFoundException(sprintf('Job %s does not exist on %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort())); + } + + /** + * Sets the job as failed, and sets the exception that was thrown. + * + * @param JobPacket|string $job_id + * @param Throwable $throwable + * @return void + * @throws ServerException + */ + public function returnException(JobPacket|string $job_id, Throwable $throwable): void + { + if($job_id instanceof JobPacket) + { + /** @noinspection CallableParameterUseCaseInTypeContextInspection */ + $job_id = $job_id->getId(); + } + + try + { + if($this->getClient()->exists($job_id)) + { + Log::debug('net.nosial.tamerlib', sprintf('Returning exception for job %s', $job_id)); + + if($this->getClient()->hGet($job_id, 'return_channel') === null) + { + $this->getClient()->del($job_id); + return; + } + + $this->getClient()->hSet($job_id, 'exception', serialize($throwable)); + $this->getClient()->hSet($job_id, 'status', JobStatus::FAILED); + $this->getClient()->rPush($this->getClient()->hGet($job_id, 'return_channel'), $job_id); + Log::debug('net.nosial.tamerlib', sprintf('Pushed job %s to return channel %s', $job_id, $this->getClient()->hGet($job_id, 'return_channel'))); + return; + } + } + catch(Exception $e) + { + throw new ServerException(sprintf('Could not return exception for job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e); + } + } + + /** + * Waits for a job to be available on the channel or multiple channels, attempts to claim it, and returns it. + * + * If multiple channels are selected, the function will iterate through each channel until a job is found. + * + * @param WorkerConfiguration|string $worker_id The worker that is waiting for the job. + * @param int|array $channel The channel (or channels) to wait for the job on. + * @param int $timeout The timeout in seconds to wait for the job. + * @return JobPacket The job that was claimed, or null if no job was available. + * @throws ServerException + * @throws TimeoutException + */ + public function listenForJob(WorkerConfiguration|string $worker_id, int|array $channel=0, int $timeout=0): JobPacket + { + if($worker_id instanceof WorkerConfiguration) + { + /** @noinspection CallableParameterUseCaseInTypeContextInspection */ + $worker_id = $worker_id->getWorkerId(); + } + + $channels = []; + if(is_array($channel)) + { + foreach($channel as $id) + { + $channels[] = sprintf('ch%s', $id); + } + } + else + { + $channels[] = sprintf('ch%s', $channel); + } + + try + { + while(true) + { + if($timeout < 0) + { + $job_id = $this->getClient()->blPop($channels, 1); + } + else + { + $job_id = $this->getClient()->blPop($channels, $timeout); + } + + if($job_id !== false && $this->claimJob($job_id[1], $worker_id)) + { + return new JobPacket($this->getClient()->hGetAll($job_id[1])); + } + + if($timeout < 0) + { + throw new TimeoutException(sprintf('Timeout exceeded while waiting for job on %s:%s', $this->server_configuration->getHost(), $this->server_configuration->getPort())); + } + + if($timeout === 0 && $job_id === null) + { + throw new TimeoutException(sprintf('Timeout exceeded while waiting for job on %s:%s', $this->server_configuration->getHost(), $this->server_configuration->getPort())); + } + } + + } + catch(Exception $e) + { + throw new ServerException(sprintf('Could not wait for job on channel %s from %s:%s', $channel, $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e); + } + } + + /** + * Drops a job from the server entirely. + * + * @param JobPacket|string $job_id + * @return void + * @throws JobNotFoundException + * @throws ServerException + */ + public function dropJob(JobPacket|string $job_id): void + { + if($job_id instanceof JobPacket) + { + /** @noinspection CallableParameterUseCaseInTypeContextInspection */ + $job_id = $job_id->getId(); + } + + try + { + if($this->getClient()->exists($job_id)) + { + $this->getClient()->del($job_id); + return; + } + } + catch(Exception $e) + { + throw new ServerException(sprintf('Could not drop job %s from %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort()), 0, $e); + } + + throw new JobNotFoundException(sprintf('Job %s does not exist on %s:%s', $job_id, $this->server_configuration->getHost(), $this->server_configuration->getPort())); + } + } \ No newline at end of file diff --git a/src/TamerLib/Classes/RedisServer.php b/src/TamerLib/Classes/RedisServer.php index cc6f2d4..941f1ba 100644 --- a/src/TamerLib/Classes/RedisServer.php +++ b/src/TamerLib/Classes/RedisServer.php @@ -8,8 +8,8 @@ use Redis; use RedisException; use Symfony\Component\Process\Process; - use TamerLib\Exceptions\NoAvailablePortException; - use TamerLib\Exceptions\RedisServerException; + use TamerLib\Exceptions\ServerException; + use TamerLib\Objects\ServerConfiguration; class RedisServer { @@ -19,14 +19,11 @@ private $cmd; /** - * @var string + * The server configuration + * + * @var ServerConfiguration */ - private $host; - - /** - * @var int|null - */ - private $port; + private $configuration; /** * @var Process|null @@ -36,33 +33,15 @@ /** * RedisServer constructor. * + * @param ServerConfiguration $configuration * @param string $cmd - * @param string $host - * @param int|null $port - * @throws NoAvailablePortException */ - public function __construct(string $cmd='redis-server', string $host='127.0.0.1', ?int $port=null) + public function __construct(ServerConfiguration $configuration, string $cmd='redis-server') { - if (is_null($port)) - { - $port = Utilities::getAvailablePort(); - Log::debug('net.nosial.tamerlib', 'Selected port ' . $port . '.'); - } - + $this->configuration = $configuration; $this->cmd = $cmd; - $this->host = $host; - $this->port = $port; } - /** - * Returns the port that the Redis server is listening on. - * - * @return int|null - */ - public function getPort(): ?int - { - return $this->port; - } /** * Determines if the Redis server is running. @@ -84,8 +63,7 @@ * * @param int $timeout * @return bool - * @throws RedisServerException - * @throws RedisException + * @throws ServerException */ public function start(int $timeout=60): bool { @@ -94,28 +72,34 @@ return true; } - Log::verbose('net.nosial.tamerlib', 'Starting Redis server on port ' . $this->port . '.'); - $this->server_process = new Process([$this->cmd, '--port', $this->port]); + Log::verbose('net.nosial.tamerlib', 'Starting server on port ' . $this->configuration->getPort() . '.'); + $this->server_process = new Process([$this->cmd, '--port', $this->configuration->getPort()]); $this->server_process->start(); // Use a redis client and ping the server until it responds. $redis_client = new Redis(); $timeout_counter = 0; - while(!$redis_client->isConnected()) + while(true) { if($timeout_counter >= $timeout) { - throw new RedisServerException('Redis server failed to start within ' . $timeout . ' seconds.'); + throw new ServerException('Redis server failed to start within ' . $timeout . ' seconds.'); } try { - $redis_client->connect($this->host, $this->port); + if($redis_client->isConnected()) + { + break; + } + + $redis_client->connect($this->configuration->getHost(), $this->configuration->getPort()); } catch (RedisException $e) { // Do nothing. + unset($e); } finally { @@ -124,7 +108,7 @@ } } - Log::verbose('net.nosial.tamerlib', 'Redis server started.'); + Log::verbose('net.nosial.tamerlib', sprintf('Server listening on %s:%s.', $this->configuration->getHost(), $this->configuration->getPort())); return true; } @@ -141,7 +125,7 @@ } $this->server_process->stop(); - Log::verbose('net.nosial.tamerlib', 'Redis server stopped.'); + Log::verbose('net.nosial.tamerlib', sprintf('Server stopped on %s:%s.', $this->configuration->getHost(), $this->configuration->getPort())); return true; } diff --git a/src/TamerLib/Classes/Utilities.php b/src/TamerLib/Classes/Utilities.php index a2128a7..f3cc29f 100644 --- a/src/TamerLib/Classes/Utilities.php +++ b/src/TamerLib/Classes/Utilities.php @@ -2,7 +2,13 @@ namespace TamerLib\Classes; + use Exception; + use Opis\Closure\SerializableClosure; use TamerLib\Exceptions\NoAvailablePortException; + use Throwable; + use function serialize; + use function unserialize; + use const PHP_MAJOR_VERSION; class Utilities { @@ -35,4 +41,78 @@ throw new NoAvailablePortException('No available port found in range ' . $start . ' to ' . $end . '.'); } + + /** + * Returns a randomly generated string of the given length. + * + * @param int $length + * @return string + * @throws Exception + */ + public static function generateRandomString(int $length=8): string + { + $characters = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ'; + $characters_length = strlen($characters); + $random_string = ''; + for ($i = 0; $i < $length; $i++) + { + $random_string .= $characters[random_int(0, $characters_length - 1)]; + } + return $random_string; + } + + /** + * Returns an array representation of a throwable exception. + * + * @param Throwable $throwable + * @param bool $recursive + * @return array + */ + public static function throwableToArray(Throwable $throwable, bool $recursive=true): array + { + $array = [ + 'message' => $throwable->getMessage(), + 'code' => $throwable->getCode(), + 'file' => $throwable->getFile(), + 'line' => $throwable->getLine(), + 'trace' => $throwable->getTrace(), + ]; + + if($recursive && $throwable->getPrevious() instanceof Throwable) + { + $array['previous'] = self::throwableToArray($throwable->getPrevious()); + } + + return $array; + } + + /** + * Serialize + * + * @param mixed $data + * @return string + */ + public static function serialize($data) + { + SerializableClosure::enterContext(); + SerializableClosure::wrapClosures($data); + $data = serialize($data); + SerializableClosure::exitContext(); + return $data; + } + /** + * Unserialize + * + * @param string $data + * @param array|null $options + * @return mixed + */ + public static function unserialize($data, array $options = null) + { + SerializableClosure::enterContext(); + $data = $options === null || PHP_MAJOR_VERSION < 7 ? unserialize($data) : unserialize($data, $options); + SerializableClosure::unwrapClosures($data); + SerializableClosure::exitContext(); + return $data; + } } \ No newline at end of file diff --git a/src/TamerLib/Classes/WorkerSupervisor.php b/src/TamerLib/Classes/WorkerSupervisor.php new file mode 100644 index 0000000..dd14b47 --- /dev/null +++ b/src/TamerLib/Classes/WorkerSupervisor.php @@ -0,0 +1,177 @@ +workers = []; + $this->configuration = $configuration; + } + + /** + * Generates a worker configuration object. + * + * @param int $channel + * @return WorkerConfiguration + */ + private function generateWorkerConfiguration(int $channel=0): WorkerConfiguration + { + $configuration = new WorkerConfiguration(); + $configuration->setHost($this->configuration->getHost()); + $configuration->setPort($this->configuration->getPort()); + $configuration->setPassword($this->configuration->getPassword()); + $configuration->setDatabase($this->configuration->getDatabase()); + $configuration->setChannel($channel); + + // TODO: Pass on database and password + return $configuration; + } + + /** + * @param string $path + * @param int $count + * @param int $channel + * @return void + */ + public function spawnWorker(string $path, int $count=8, int $channel=0, bool $check=true): void + { + if(!file_exists($path) || !is_file($path)) + { + throw new InvalidArgumentException(sprintf('Path %s does not exist', $path)); + } + + for($i = 0; $i < $count; $i++) + { + $worker_config = $this->generateWorkerConfiguration($channel); + Log::debug('net.nosial.tamerlib', sprintf('Spawning worker %s (%s)', $worker_config->getWorkerId(), $path)); + + $worker = new WorkerInstance($worker_config, $path); + $this->workers[$worker_config->getWorkerId()] = $worker; + $worker->start(); + } + + if($check) + { + // TODO: Check if workers are running + } + + $this->printUpdates(); + } + + /** + * Spawns a closure as a worker process. (built-in worker) + * + * @param int $count + * @param int $channel + * @return void + */ + public function spawnClosure(int $count=8, int $channel=0): void + { + if(!file_exists(__DIR__ . DIRECTORY_SEPARATOR . 'subproc')) + { + throw new RuntimeException(sprintf('subproc file does not exist, checked %s', __DIR__)); + } + + $this->spawnWorker(__DIR__ . DIRECTORY_SEPARATOR . 'subproc', $count, $channel); + } + + /** + * @return void + */ + public function printUpdates(): void + { + /** @var WorkerInstance $worker */ + foreach($this->workers as $worker) + { + print($worker->getOutput()); + } + } + + /** + * Monitors the worker processes and restarts them if they are not running. + * + * @param int $timeout + * @return void + */ + public function monitor(int $timeout=0): void + { + $start_time = time(); + + while(true) + { + /** @var WorkerInstance $worker */ + foreach($this->workers as $worker) + { + print($worker->getOutput()); + + + //if(!$worker->isRunning()) + //{ + // print($worker->getOutput()); + // Log::warning('net.nosial.tamerlib', sprintf('Worker %s is not running, killing', $worker->getConfiguration()->getWorkerId())); + // $worker->stop(); + // unset($this->workers[$worker->getConfiguration()->getWorkerId()]); + // $this->spawnWorker($worker->getPath(), 1, $worker->getConfiguration()->getChannel(), false); + //} + } + + if($timeout < 0) + { + return; + } + + if($timeout > 0 && time() - $start_time > $timeout) + { + return; + } + } + + } + + /** + * Stops all worker processes and removes them from the supervisor. + * + * @return void + */ + public function stopAll(): void + { + Log::debug('net.nosial.tamerlib', 'Stopping all workers'); + foreach($this->workers as $worker_id => $worker) + { + $worker->stop(); + unset($this->workers[$worker_id]); + } + } + + /** + * Public Destructor + */ + public function __destruct() + { + $this->stopAll(); + } + } \ No newline at end of file diff --git a/src/TamerLib/Classes/subproc b/src/TamerLib/Classes/subproc new file mode 100644 index 0000000..efdae81 --- /dev/null +++ b/src/TamerLib/Classes/subproc @@ -0,0 +1,28 @@ +getMessage(), $e); + } + } \ No newline at end of file diff --git a/src/TamerLib/Enums/EncodingType.php b/src/TamerLib/Enums/EncodingType.php new file mode 100644 index 0000000..0027963 --- /dev/null +++ b/src/TamerLib/Enums/EncodingType.php @@ -0,0 +1,11 @@ +id = $data['id'] ?? random_int(1000000000, 9999999999); + $this->channel = $data['channel'] ?? 0; + $this->job_type = $data['job_type'] ?? JobType::CLOSURE; + $this->status = $data['status'] ?? JobStatus::WAITING; + $this->worker_id = $data['worker_id'] ?? null; + $this->parameters = $data['parameters'] ?? null; + $this->payload = $data['payload'] ?? null; + $this->return_value = $data['return_value'] ?? null; + $this->exception = $data['exception'] ?? null; + $this->return_channel = $data['return_channel'] ?? null; + $this->created_at = $data['created_at'] ?? time(); + } + + /** + * Returns the JobPacket's ID + * + * @return int + */ + public function getId(): int + { + return (int)$this->id; + } + + /** + * Sets the ID of the JobPacket, if null is passed, a random ID will be generated + * + * @param int|null $id + * @throws Exception + */ + public function setId(?int $id=null): void + { + if($id !== null) + { + $this->id = random_int(1000000000, 9999999999); + return; + } + + $this->id = (int)$id; + } + + /** + * Returns the channel the JobPacket is assigned to + * + * @return int + */ + public function getChannel(): int + { + return (int)$this->channel; + } + + /** + * Sets the channel the JobPacket is assigned to, if null is passed, the channel will be set to 0 + * 0 is the default channel and is used for all jobs that are not assigned to a specific channel + * + * @param int|null $channel + * @noinspection PhpCastIsUnnecessaryInspection + * @noinspection UnnecessaryCastingInspection + */ + public function setChannel(?int $channel=null): void + { + if($channel === null) + { + $this->channel = 0; + return; + } + + $this->channel = (int)$channel; + } + + /** + * Returns the Job Type of the JobPacket + * + * @return int + */ + public function getJobType(): int + { + return (int)$this->job_type; + } + + /** + * @param int $job_type + */ + public function setJobType(int $job_type): void + { + $this->job_type = $job_type; + } + + /** + * @return int + */ + public function getStatus(): int + { + return (int)$this->status; + } + + /** + * @param int $status + */ + public function setStatus(int $status): void + { + $this->status = (int)$status; + } + + /** + * @return string|null + */ + public function getWorkerId(): ?string + { + return (string)$this->worker_id; + } + + /** + * @param string|null $worker_id + */ + public function setWorkerId(?string $worker_id): void + { + $this->worker_id = $worker_id; + } + + /** + * @return mixed|null + */ + public function getParameters(): mixed + { + return $this->parameters; + } + + /** + * @param mixed|null $parameters + */ + public function setParameters(mixed $parameters): void + { + $this->parameters = $parameters; + } + + /** + * @return mixed|null + */ + public function getPayload(): mixed + { + return $this->payload; + } + + /** + * @param mixed|null $payload + */ + public function setPayload(mixed $payload): void + { + $this->payload = $payload; + } + + /** + * @return mixed|null + */ + public function getReturnValue(): mixed + { + return $this->return_value; + } + + /** + * @param mixed|null $return_value + */ + public function setReturnValue(mixed $return_value): void + { + $this->return_value = $return_value; + } + + /** + * @return string|null + */ + public function getException(): ?string + { + return $this->exception; + } + + /** + * @param string|null $exception + */ + public function setException(?string $exception): void + { + $this->exception = $exception; + } + + /** + * @return string|null + */ + public function getReturnChannel(): ?string + { + return $this->return_channel; + } + + /** + * @param string|null $return_channel + */ + public function setReturnChannel(?string $return_channel): void + { + $this->return_channel = $return_channel; + } + + + /** + * @return int|mixed + */ + public function getCreatedAt(): mixed + { + return $this->created_at; + } + + /** + * @param int|mixed $created_at + */ + public function setCreatedAt(mixed $created_at): void + { + $this->created_at = $created_at; + } + + /** + * Returns an array representation of the JobPacket + * + * @return array + */ + public function toArray(): array + { + return [ + 'id' => $this->getId(), + 'channel' => $this->getChannel(), + 'job_type' => $this->getJobType(), + 'status' => $this->getStatus(), + 'worker_id' => $this->getWorkerId(), + 'parameters' => $this->getParameters(), + 'payload' => $this->getPayload(), + 'return_value' => $this->getReturnValue(), + 'exception' => $this->getException(), + 'return_channel' => $this->getReturnChannel(), + 'created_at' => $this->getCreatedAt() + ]; + } + } \ No newline at end of file diff --git a/src/TamerLib/Objects/ServerConfiguration.php b/src/TamerLib/Objects/ServerConfiguration.php new file mode 100644 index 0000000..1aa7abb --- /dev/null +++ b/src/TamerLib/Objects/ServerConfiguration.php @@ -0,0 +1,112 @@ +host = $host; + $this->port = $port; + $this->password = $password; + $this->database = $database; + + if(is_null($port)) + { + try + { + $port = Utilities::getAvailablePort($host); + } + catch(Exception $e) + { + Log::warning('net.nosial.tamerlib', 'No available port found. Using random port.'); + + try + { + $port = random_int(1024, 65535); + } + catch(Exception $e) + { + throw new RuntimeException('Could not generate random port.', 0, $e); + } + } + finally + { + Log::debug('net.nosial.tamerlib', 'Selected port ' . $port . '.'); + } + } + + if(!is_null($port)) + { + $this->port = $port; + } + } + + /** + * @return string + */ + public function getHost(): string + { + return $this->host; + } + + /** + * @return int + */ + public function getPort(): int + { + return $this->port; + } + + /** + * @return string|null + */ + public function getPassword(): ?string + { + return $this->password; + } + + /** + * @return int + */ + public function getDatabase(): int + { + return $this->database; + } + } \ No newline at end of file diff --git a/src/TamerLib/Objects/WorkerConfiguration.php b/src/TamerLib/Objects/WorkerConfiguration.php new file mode 100644 index 0000000..6834ad8 --- /dev/null +++ b/src/TamerLib/Objects/WorkerConfiguration.php @@ -0,0 +1,196 @@ +worker_id = $data['worker_id'] ?? Utilities::generateRandomString(); + $this->host = $data['host'] ?? '127.0.0.1'; + $this->port = $data['port'] ?? null; + $this->password = $data['password'] ?? null; + $this->database = $data['database'] ?? 0; + $this->channel = $data['channel'] ?? 0; + } + + /** + * Constructs object from environment variables. + * + * @return WorkerConfiguration + * @throws Exception + */ + public static function fromEnvironment(): WorkerConfiguration + { + if(getenv('TAMER_WORKER') !== 'true') + { + throw new RuntimeException('Process is not running as a worker.'); + } + + $data = [ + 'worker_id' => getenv('TAMER_WORKER_ID'), + 'host' => getenv('TAMER_WORKER_HOST'), + 'port' => getenv('TAMER_WORKER_PORT'), + 'password' => getenv('TAMER_WORKER_PASSWORD'), + 'database' => getenv('TAMER_WORKER_DATABASE'), + 'channel' => getenv('TAMER_WORKER_CHANNEL'), + ]; + + return new WorkerConfiguration($data); + } + + /** + * @return string + */ + public function getWorkerId(): string + { + return $this->worker_id; + } + + /** + * @param string $worker_id + */ + public function setWorkerId(string $worker_id): void + { + $this->worker_id = $worker_id; + } + + /** + * @return string + */ + public function getHost(): string + { + return $this->host; + } + + /** + * @param string $host + */ + public function setHost(string $host): void + { + $this->host = $host; + } + + /** + * @return int + */ + public function getPort(): int + { + return $this->port; + } + + /** + * @param int $port + */ + public function setPort(int $port): void + { + $this->port = $port; + } + + /** + * @return string|null + */ + public function getPassword(): ?string + { + return $this->password; + } + + public function setPassword(?string $password): void + { + $this->password = $password; + } + + /** + * @return int|null + */ + public function getDatabase(): ?int + { + return $this->database; + } + + /** + * @param int|null $database + * @return void + */ + public function setDatabase(?int $database): void + { + $this->database = $database; + } + + /** + * @return int + */ + public function getChannel(): int + { + return $this->channel; + } + + /** + * @param int $channel + * @return void + */ + public function setChannel(int $channel=0): void + { + $this->channel = $channel; + } + + /** + * Outputs environment variables for worker. + * + * @return array + */ + public function toEnvironment(): array + { + return [ + 'TAMER_WORKER' => 'true', + 'TAMER_WORKER_ID' => $this->getWorkerId(), + 'TAMER_WORKER_HOST' => $this->getHost(), + 'TAMER_WORKER_PORT' => $this->getPort(), + 'TAMER_WORKER_PASSWORD' => $this->getPassword(), + 'TAMER_WORKER_DATABASE' => $this->getDatabase(), + 'TAMER_WORKER_CHANNEL' => $this->getChannel(), + ]; + } + } \ No newline at end of file diff --git a/src/TamerLib/Objects/WorkerInstance.php b/src/TamerLib/Objects/WorkerInstance.php new file mode 100644 index 0000000..bb019c8 --- /dev/null +++ b/src/TamerLib/Objects/WorkerInstance.php @@ -0,0 +1,187 @@ +configuration = $configuration; + $this->path = $path; + } + + /** + * Determines if the worker is running. + * + * @return bool + */ + public function isRunning(): bool + { + if(is_null($this->process)) + { + return false; + } + + return $this->process->isRunning(); + } + + /** + * Terminates the worker process. + * + * @return void + */ + public function stop(): void + { + if(is_null($this->process)) + { + return; + } + + try + { + $this->process->stop(); + } + catch(Exception $e) + { + Log::warning('net.nosial.tamerlib', sprintf('Failed to stop worker %s', $this->configuration->getWorkerId()), $e); + } + finally + { + $this->process = null; + Log::debug('net.nosial.tamerlib', sprintf('Stopped worker %s', $this->configuration->getWorkerId())); + } + } + + /** + * Starts the process for the worker. + * + * @return void + */ + public function start(): void + { + if($this->isRunning()) + { + return; + } + + $php_bin = (new PhpExecutableFinder())->find(); + $process = new Process([$php_bin, $this->path]); + $process->setEnv($this->configuration->toEnvironment()); + + try + { + $process->start(); + } + catch(Exception $e) + { + Log::warning('net.nosial.tamerlib', sprintf('Failed to start worker %s', $this->configuration->getWorkerId()), $e); + return; + } + finally + { + $this->process = $process; + Log::debug('net.nosial.tamerlib', sprintf('Started worker %s', $this->configuration->getWorkerId())); + } + } + + /** + * Returns the last output from the worker. + * + * @return string|null + */ + public function getOutput(): ?string + { + if(is_null($this->process)) + { + return ''; + } + + $output = $this->process->getIncrementalOutput(); + return empty($output) ? null : $output; + } + + /** + * Monitors the worker for a given amount of time, or indefinitely if no timeout is given. + * Throws an exception if the worker is not running. + * Outputs the worker's output to the console. + * + * @param int $timeout + * @return void + */ + public function monitor(int $timeout=0): void + { + $time_start = time(); + while(true) + { + if(!$this->isRunning()) + { + throw new RuntimeException(sprintf('Worker %s is not running', $this->configuration->getWorkerId())); + } + + $output = $this->getOutput(); + if(!is_null($output)) + { + print($output); + } + + if($timeout > 0 && (time() - $time_start) > $timeout) + { + break; + } + } + } + + /** + * @return Process|null + */ + public function getProcess(): ?Process + { + return $this->process; + } + + /** + * @return WorkerConfiguration + */ + public function getConfiguration(): WorkerConfiguration + { + return $this->configuration; + } + + /** + * @return string|null + */ + public function getPath(): ?string + { + return $this->path; + } + } \ No newline at end of file diff --git a/src/TamerLib/Tamer.php b/src/TamerLib/Tamer.php deleted file mode 100644 index 11d82f3..0000000 --- a/src/TamerLib/Tamer.php +++ /dev/null @@ -1,11 +0,0 @@ -start(); + + // Register shutdown function to stop the server when the process exits + register_shutdown_function(static function() + { + self::$server?->stop(); + }); + } + catch(Exception $e) + { + throw new ServerException('Failed to initialize the server.', 0, $e); + } + + } + + if($mode === TamerMode::WORKER) + { + try + { + self::$worker_configuration = WorkerConfiguration::fromEnvironment(); + self::$server_configuration = new ServerConfiguration(self::$worker_configuration->getHost(), self::$worker_configuration->getPort(), self::$worker_configuration->getPassword()); + } + catch(Exception $e) + { + throw new RuntimeException('Failed to initialize worker configuration. (is the process running as a worker?)', 0, $e); + } + } + + if($mode === TamerMode::CLIENT) + { + self::$supervisor = new WorkerSupervisor(self::$server_configuration); + self::$return_channel = 'rch' . random_int(100000000, 999999999); + } + + self::$job_manager = new JobManager(self::$server_configuration); + } + + /** + * Shuts down all workers + * + * @return void + */ + public static function shutdown(): void + { + if(self::$mode === null) + { + return; + } + + if(self::$mode === TamerMode::CLIENT && self::$supervisor !== null) + { + self::$supervisor->stopAll(); + } + } + + /** + * CLIENT FUNCTIONS + */ + + /** + * Spawns a worker process by their count, if the path is null then a generic sub process will be spawned + * that will only be capable of executing closures. + * + * @param int $count + * @param string|null $path + * @param int $channel + * @return void + */ + public static function createWorker(int $count=8, ?string $path=null, int $channel=0): void + { + if(self::$mode !== TamerMode::CLIENT) + { + throw new RuntimeException(sprintf('Attempting to spawn a worker in \'%s\' mode, only clients can spawn workers.', self::$mode)); + } + + if($path === null) + { + self::$supervisor->spawnClosure($count, $channel); + } + else + { + self::$supervisor->spawnWorker($path, $count, $channel); + } + } + + /** + * Preforms a job in the background, returns the Job ID to keep track of the job status. + * + * @param callable $function + * @param array $arguments + * @param int $channel + * @return string + */ + public static function do(callable $function, array $arguments, int $channel=0): string + { + if(self::$mode !== TamerMode::CLIENT) + { + throw new RuntimeException(sprintf('Attempting to do() in \'%s\' mode, only clients can preform do().', self::$mode)); + } + + $job_packet = new JobPacket(); + $job_packet->setJobType(JobType::CLOSURE); + $job_packet->setParameters(serialize($arguments)); + $job_packet->setPayload(serialize(new SerializableClosure($function))); + $job_packet->setChannel($channel); + $job_packet->setReturnChannel(self::$return_channel); + + + try + { + self::$job_manager->pushJob($job_packet); + } + catcH(Exception $e) + { + throw new RuntimeException('do() failed, failed to push job to the server', 0, $e); + } + + self::addToWatchlist($job_packet->getId()); + return $job_packet->getId(); + } + + /** + * Preforms a function call against a worker in the background, returns the Job ID to keep track of the job status. + * + * @param string $function + * @param array $arguments + * @param int $channel + * @return mixed + */ + public static function call(string $function, array $arguments, int $channel=0): mixed + { + if(self::$mode !== TamerMode::CLIENT) + { + throw new RuntimeException(sprintf('Attempting to call() in \'%s\' mode, only clients can preform call().', self::$mode)); + } + + $job_packet = new JobPacket(); + $job_packet->setJobType(JobType::FUNCTION); + $job_packet->setParameters(serialize($arguments)); + $job_packet->setPayload($function); + $job_packet->setChannel($channel); + $job_packet->setReturnChannel(self::$return_channel); + + try + { + self::$job_manager->pushJob($job_packet); + } + catcH(Exception $e) + { + throw new RuntimeException('call() failed, failed to push job to the server', 0, $e); + } + + self::addToWatchlist($job_packet->getId()); + return $job_packet->getId(); + } + + /** + * Does a job in the background, but once the job is completed it will be forgotten and the result will not be + * returned, this also means that the job will not be added to the watchlist. + * + * @param callable $function + * @param array $arguments + * @param int $channel + * @return void + */ + public function dof(callable $function, array $arguments, int $channel=0): void + { + if(self::$mode !== TamerMode::CLIENT) + { + throw new RuntimeException(sprintf('Attempting to dof() in \'%s\' mode, only clients can preform dof().', self::$mode)); + } + + $job_packet = new JobPacket(); + $job_packet->setJobType(JobType::CLOSURE); + $job_packet->setParameters(serialize($arguments)); + $job_packet->setPayload(serialize(new SerializableClosure($function))); + $job_packet->setChannel($channel); + + try + { + self::$job_manager->pushJob($job_packet); + } + catcH(Exception $e) + { + throw new RuntimeException('do() failed, failed to push job to the server', 0, $e); + } + + self::addToWatchlist($job_packet->getId()); + } + + /** + * Sends a function call to a worker in the background, but once the job is completed it will be forgotten and + * the result will not be returned, this also means that the job will not be added to the watchlist. + * + * @param string $function + * @param array $arguments + * @param int $channel + * @return void + */ + public static function callf(string $function, array $arguments, int $channel=0): void + { + if(self::$mode !== TamerMode::CLIENT) + { + throw new RuntimeException(sprintf('Attempting to callf() in \'%s\' mode, only clients can preform callf().', self::$mode)); + } + + $job_packet = new JobPacket(); + $job_packet->setJobType(JobType::FUNCTION); + $job_packet->setParameters(serialize($arguments)); + $job_packet->setForget(true); + $job_packet->setPayload($function); + $job_packet->setChannel($channel); + + try + { + self::$job_manager->pushJob($job_packet); + } + catcH(Exception $e) + { + throw new RuntimeException('callf() failed, failed to push job to the server', 0, $e); + } + + self::addToWatchlist($job_packet->getId()); + } + + /** + * Waits for all the dispatched jobs to complete, this is a blocking function and will not return until all the + * jobs have completed. If a timeout is specified, the function will return after the timeout has been reached. + * + * @param callable $callback + * @param int $timeout + * @return void + * @throws ServerException + * @throws Throwable + * @throws TimeoutException + */ + public static function wait(callable $callback, int $timeout=0): void + { + if(self::$mode !== TamerMode::CLIENT) + { + throw new RuntimeException(sprintf('Attempting to wait() in \'%s\' mode, only clients can preform wait().', self::$mode)); + } + + $time_start = time(); + while(true) + { + if(count(self::$watching_jobs) === 0) + { + Log::debug('net.nosial.tamerlib', 'No jobs to wait for, returning'); + return; + } + + Log::debug('net.nosial.tamerlib', 'Waiting for jobs to complete'); + $job_packet = self::$job_manager->listenReturnChannel(self::$return_channel); + + if(in_array($job_packet->getId(), self::$watching_jobs)) + { + Log::debug('net.nosial.tamerlib', sprintf('Job \'%s\' has returned, removing from watchlist', $job_packet->getId())); + + self::removeFromWatchlist($job_packet->getId()); + self::$job_manager->dropJob($job_packet->getId()); + + if($job_packet->getStatus() === JobStatus::FINISHED) + { + $return_value = $job_packet->getReturnValue(); + + if($return_value === null) + { + $return_value = null; + } + else + { + $return_value = unserialize($return_value, ['allowed_classes' => true]); + + if($return_value === false) + { + Log::error('net.nosial.tamerlib', 'Failed to unserialize return value, return value was dropped'); + $return_value = null; + } + } + + $callback($job_packet->getId(), $return_value); + } + elseif($job_packet->getStatus() === JobStatus::FAILED) + { + try + { + $e = unserialize($job_packet->getException(), ['allowed_classes' => true]); + } + catch(Exception $e) + { + Log::error('net.nosial.tamerlib', 'Failed to unserialize exception, exception was dropped', $e); + } + finally + { + if(isset($e) && $e instanceof Throwable) + { + throw $e; + } + + throw new ServerException('wait() failed, job returned with an exception'); + } + } + else + { + Log::debug('net.nosial.tamerlib', sprintf('Job \'%s\' returned with an unexpected status of \'%s\'', $job_packet->getId(), $job_packet->getStatus())); + throw new ServerException('wait() failed, job returned with an unexpected status of \'' . $job_packet->getStatus() . '\''); + } + } + else + { + Log::debug('net.nosial.tamerlib', sprintf('Job \'%s\' has returned, but is not in the watchlist', $job_packet->getId())); + } + + if ($timeout < 0) + { + throw new TimeoutException('wait() timed out'); + } + + if($timeout > 0 && (time() - $time_start) >= $timeout) + { + throw new TimeoutException('wait() timed out'); + } + + usleep(10); + } + } + + /** + * Waits for a job to complete, returns the result of the job. + * + * @param JobPacket|int $job_id + * @param int $timeout + * @return mixed + * @throws JobNotFoundException + * @throws ServerException + * @throws TimeoutException + * @throws Throwable + */ + public static function waitFor(JobPacket|int $job_id, int $timeout=0): mixed + { + if(self::$mode !== TamerMode::CLIENT) + { + throw new RuntimeException(sprintf('Attempting to waitFor() in \'%s\' mode, only clients can preform waitFor().', self::$mode)); + } + + if($job_id instanceof JobPacket) + { + $job_id = $job_id->getId(); + } + + $time_start = time(); + + while(true) + { + self::$supervisor->monitor(-1); + + switch(self::$job_manager->getJobStatus($job_id)) + { + case JobStatus::FINISHED: + $return = self::$job_manager->getJobResult($job_id); + self::$job_manager->dropJob($job_id); + return $return; + + case JobStatus::FAILED: + $throwable = self::$job_manager->getJobException($job_id); + self::$job_manager->dropJob($job_id); + throw $throwable; + } + + if($timeout < 0) + { + throw new TimeoutException('waitFor() timed out'); + } + + if($timeout > 0 && (time() - $time_start) >= $timeout) + { + throw new TimeoutException(sprintf('waitFor() timed out after %d seconds', $timeout)); + } + + usleep(10); + } + } + + + /** + * Clears the watchlist, this will remove all jobs from the watchlist. + * + * @return void + */ + public static function clear(): void + { + if(self::$mode !== TamerMode::CLIENT) + { + throw new RuntimeException(sprintf('Attempting to clear() in \'%s\' mode, only clients can preform clear().', self::$mode)); + } + + self::$watching_jobs = []; + } + + /** + * Invokes the call() function, returns the Job ID. + * + * @param string $name + * @param array $arguments + * @return mixed + */ + public static function __callStatic(string $name, array $arguments) + { + return self::call($name, $arguments); + } + + /** + * WORKER FUNCTIONS + */ + + /** + * @param string $function + * @param callable $callback + * @return void + */ + public static function addFunction(string $function, callable $callback): void + { + if(self::$mode !== TamerMode::WORKER) + { + throw new RuntimeException(sprintf('Attempting to addFunction() in \'%s\' mode, only workers can preform addFunction().', self::$mode)); + } + + self::$function_pointers[$function] = $callback; + } + + /** + * @param string $function + * @return void + */ + public static function removeFunction(string $function): void + { + if(self::$mode !== TamerMode::WORKER) + { + throw new RuntimeException(sprintf('Attempting to removeFunction() in \'%s\' mode, only workers can preform removeFunction().', self::$mode)); + } + + unset(self::$function_pointers[$function]); + } + + /** + * @return array + */ + public static function getFunctions(): array + { + if(self::$mode !== TamerMode::WORKER) + { + throw new RuntimeException(sprintf('Attempting to getFunctions() in \'%s\' mode, only workers can preform getFunctions().', self::$mode)); + } + + return array_keys(self::$function_pointers); + } + + /** + * @param int|array $channel + * @param int $timeout + * @return void + * @throws JobNotFoundException + * @throws ServerException + */ + public static function run(int|array $channel=0, int $timeout=0): void + { + if(self::$mode !== TamerMode::WORKER) + { + throw new RuntimeException(sprintf('Attempting to run() in \'%s\' mode, only workers can preform run().', self::$mode)); + } + + try + { + $job_packet = self::$job_manager->listenForJob(self::$worker_configuration->getWorkerId(), $channel, $timeout); + } + catch(TimeoutException $e) + { + unset($e); + return; + } + + Log::debug('net.nosial.tamerlib', sprintf('Worker %s received job %s', self::$worker_configuration->getWorkerId(), $job_packet->getId())); + + switch($job_packet->getJobType()) + { + case JobType::FUNCTION: + if(!isset(self::$function_pointers[$job_packet->getPayload()])) + { + Log::warning('net.nosial.tamerlib', sprintf('Job %s requested function \'%s\' which does not exist, rejecting job.', $job_packet->getId(), $job_packet->getPayload())); + self::$job_manager->rejectJob($job_packet); + } + + try + { + $result = call_user_func_array(self::$function_pointers[$job_packet->getPayload()], unserialize($job_packet->getParameters(), ['allowed_classes'=>true])); + self::$job_manager->returnJob($job_packet, $result); + } + catch(Exception $e) + { + var_dump($e); + self::$job_manager->returnException($job_packet, $e); + } + break; + + case JobType::CLOSURE: + try + { + $result = unserialize($job_packet->getPayload(), ['allowed_classes'=>true])( + unserialize($job_packet->getParameters(), ['allowed_classes'=>true]) + ); + self::$job_manager->returnJob($job_packet, $result); + } + catch(Exception $e) + { + self::$job_manager->returnException($job_packet, $e); + } + break; + } + } + } \ No newline at end of file diff --git a/tests/adaptive_sleep.php b/tests/adaptive_sleep.php new file mode 100644 index 0000000..7f1c19d --- /dev/null +++ b/tests/adaptive_sleep.php @@ -0,0 +1,29 @@ + 3) + { + // Simulate traffic every 3 seconds + $sleep = $adaptive_sleep->sleep(random_int(0, 100) < 90); + + if(time() - $time > 5) + { + // Stop the simulation after 5 seconds + $time = time(); + } + } + else + { + // No traffic + $sleep = $adaptive_sleep->sleep(false); + } + } \ No newline at end of file diff --git a/tests/tamer_standalone.php b/tests/tamer_standalone.php new file mode 100644 index 0000000..9af66ee --- /dev/null +++ b/tests/tamer_standalone.php @@ -0,0 +1,40 @@ +