/**
 * Configures the Redis server used by this class.
 *
 * Must be called before anything triggers the lazily created default server
 * (see getRedisServer() / getRedisClient()); once a server instance exists,
 * further configuration attempts are rejected.
 *
 * @param string $cmd Command passed to RedisServer
 * @param string $host Host passed to RedisServer
 * @param int|null $port Port passed to RedisServer (null is forwarded as-is)
 * @return void
 * @throws NoAvailablePortException
 * @throws RuntimeException If a Redis server has already been configured
 */
public static function configureRedisServer(string $cmd='redis-server', string $host='127.0.0.1', ?int $port=null): void
{
    if (self::$redis_server instanceof RedisServer) {
        throw new RuntimeException('Redis server already configured.');
    }

    self::$redis_server = new RedisServer($cmd, $host, $port);
}

/**
 * Returns the Redis server, creating a default instance on first use.
 *
 * @return RedisServer|null
 */
private static function getRedisServer(): ?RedisServer
{
    self::$redis_server ??= new RedisServer();

    return self::$redis_server;
}

/**
 * Adds the job ID to the watch list.
 *
 * The ID is appended, so the watch list is keyed by insertion index and
 * NOT by job ID; consumers must remove entries by their index.
 *
 * @param int|string $job_id Accepts strings as well, since do() and waitFor() treat job IDs as strings
 * @return void
 */
private static function addToWatch(int|string $job_id): void
{
    self::$watching_jobs[] = $job_id;
}

/**
 * Cleans up the watch list by removing jobs whose key no longer exists in Redis.
 *
 * @return void
 * @throws RuntimeException If the Redis lookup fails
 */
public static function cleanWatchList(): void
{
    try {
        // Iterate with the index: entries are appended by addToWatch(), so the
        // array key is the insertion index, not the job ID.
        foreach (self::$watching_jobs as $index => $job_id) {
            if (!self::getRedisClient()->exists($job_id)) {
                unset(self::$watching_jobs[$index]);
            }
        }
    } catch (Exception $e) {
        throw new RuntimeException('Failed to clean watch list.', 0, $e);
    }
}

/**
 * Returns the Redis client, starting the server and connecting on demand.
 *
 * On every call the server is created if missing and started if it is not
 * running. The client connection is established once and then reused.
 *
 * @return Redis
 * @throws RedisException
 * @throws RuntimeException If connect() reports failure without throwing
 */
private static function getRedisClient(): Redis
{
    self::$redis_server ??= new RedisServer();

    if (!self::$redis_server->isRunning()) {
        self::$redis_server->start();
    }

    if (is_null(self::$redis_client)) {
        // Connect on a local instance first so that a failed connection
        // never leaves an unconnected client cached in the static property.
        $client = new Redis();

        if (!$client->connect(self::$redis_server->getHost(), self::$redis_server->getPort())) {
            throw new RuntimeException('Failed to connect to the Redis server.');
        }

        self::$redis_client = $client;
    }

    return self::$redis_client;
}

/**
 * Returns the worker supervisor.
*
 * @return WorkerSupervisor The supervisor instance, created on first use
 */
private static function getSupervisor(): WorkerSupervisor
{
    if (self::$supervisor !== null) {
        return self::$supervisor;
    }

    self::$supervisor = new WorkerSupervisor();

    return self::$supervisor;
}

/**
 * Asks the supervisor to spawn closure workers attached to the Redis server.
 *
 * @param int $count Number of workers requested
 * @return void
 */
public static function spawnClosure(int $count=8): void
{
    $supervisor = self::getSupervisor();
    $supervisor->spawnClosure(self::getRedisServer(), $count);
}

/**
 * Asks the supervisor to spawn workers from the given command.
 *
 * @param array $cmd Command forwarded to the supervisor
 * @param int $count Number of workers requested
 * @return void
 */
public static function spawn(array $cmd, int $count=8): void
{
    $supervisor = self::getSupervisor();
    $supervisor->spawnWorker($cmd, self::getRedisServer(), $count);
}

/**
 * Queues a closure for execution by a worker and returns its job ID.
 *
 * The closure is serialized into a job packet, the packet is stored as a
 * Redis hash with a 60 second expiry, and its ID is pushed onto the list
 * named after the packet's channel. The job is then added to the watch list.
 *
 * @param callable $closure
 * @return string The ID of the queued job
 * @throws RuntimeException If the closure cannot be serialized or the job cannot be pushed
 */
public static function do(callable $closure): string
{
    try {
        $serialized = serialize(new SerializableClosure($closure));
    } catch (Exception $error) {
        throw new RuntimeException('Failed to serialize closure.', 2000, $error);
    }

    $packet = new JobPacket();
    $packet->setJobType(JobType::CLOSURE);
    $packet->setEncodingType(EncodingType::SERIALIZED);
    $packet->setPayload($serialized);

    try {
        // Store the packet as a hash built from toArray(), then enqueue its ID.
        self::getRedisClient()->hMSet($packet->getId(), $packet->toArray());
        self::getRedisClient()->expire($packet->getId(), 60);
        self::getRedisClient()->rPush(sprintf('ch%s', $packet->getChannel()), $packet->getId());
    } catch (Exception $error) {
        throw new RuntimeException('Failed to push job to Redis.', 2000, $error);
    }

    self::addToWatch($packet->getId());

    return $packet->getId();
}

/**
 * Waits for all jobs to complete.
*
 * Polls Redis until the watch list is empty. A watched job is dropped when
 * its key no longer exists, or when its 'status' field is not one of
 * JobStatus::PROCESSING_STATES (in which case the key is also deleted).
 *
 * @param callable|null $callback Invoked with the 'return_value' field of each finished job
 * @return void
 * @throws RedisException
 */
public static function waitAll(?callable $callback=null): void
{
    while (count(self::$watching_jobs) > 0) {
        // Entries are appended to the watch list, so they must be removed by
        // their array index; unsetting by job ID would never shrink the list
        // and this loop would spin forever.
        foreach (self::$watching_jobs as $index => $job_id) {
            if (!self::getRedisClient()->exists($job_id)) {
                unset(self::$watching_jobs[$index]);
                continue;
            }

            // NOTE(review): waitFor() performs this check with strict comparison;
            // confirm which mode matches the type of JobStatus::PROCESSING_STATES.
            if (!in_array(self::getRedisClient()->hGet($job_id, 'status'), JobStatus::PROCESSING_STATES)) {
                if (!is_null($callback)) {
                    // NOTE(review): waitFor() reads the 'result' field instead of
                    // 'return_value'; confirm which field workers actually write.
                    $callback(self::getRedisClient()->hGet($job_id, 'return_value'));
                }

                self::getRedisClient()->del($job_id);
                unset(self::$watching_jobs[$index]);
            }
        }

        // Brief pause between polling passes (same interval as waitFor())
        // so pending jobs do not cause a busy loop against Redis.
        if (count(self::$watching_jobs) > 0) {
            usleep(100);
        }
    }
}

/**
 * Waits for a job to complete, returns the result.
 *
 * Polls the job's 'status' field until it is no longer one of
 * JobStatus::PROCESSING_STATES, then reads the 'result' field and deletes
 * the job key. Static like the rest of this API; calling it on an instance
 * still works.
 *
 * @param string $job_id
 * @param int $timeout Maximum number of seconds to wait; 0 waits indefinitely
 * @return mixed The value of the job's 'result' field
 * @throws RedisException
 * @throws RuntimeException If the timeout elapses before the job completes
 */
public static function waitFor(string $job_id, int $timeout=0)
{
    $started_at = time();

    while (in_array(self::getRedisClient()->hGet($job_id, 'status'), JobStatus::PROCESSING_STATES, true)) {
        if ($timeout > 0 && time() - $started_at > $timeout) {
            throw new RuntimeException(sprintf('waitFor %s timed out.', $job_id));
        }

        usleep(100);
    }

    // NOTE(review): waitAll() reads 'return_value' rather than 'result';
    // confirm which field workers actually write.
    $return = self::getRedisClient()->hGet($job_id, 'result');
    self::getRedisClient()->del($job_id);

    return $return;
}

/**
 * Gets a key from the shared memory.
 *
 * A key that is missing, or whose stored value is null, yields $default.
 *
 * @param string $key
 * @param mixed $default Value returned when the key is not set
 * @return mixed
 */
public static function getKey(string $key, $default=null): mixed
{
    return self::$shared_memory[$key] ?? $default;
}

/**
 * Sets a key in the shared memory.
 *
 * @param string $key
 * @param mixed $value
 * @return void
 */
public static function setKey(string $key, mixed $value): void
{
    self::$shared_memory[$key] = $value;
}