From 0c23fdfac2c4315011b9c3de04528fd9bb0fe251 Mon Sep 17 00:00:00 2001 From: Netkas Date: Sat, 10 Jun 2023 01:46:34 -0400 Subject: [PATCH] Implemented Redis Server controller --- .idea/php.xml | 5 +- project.json | 6 - .../Abstracts/ExitCodes/WorkerExitCodes.php | 16 - src/TamerLib/Abstracts/JobStatus.php | 12 - src/TamerLib/Abstracts/Mode.php | 10 - src/TamerLib/Abstracts/ObjectType.php | 12 - src/TamerLib/Abstracts/ProtocolType.php | 10 - src/TamerLib/Abstracts/TaskPriority.php | 12 - src/TamerLib/Classes/Functions.php | 149 ------ src/TamerLib/Classes/RedisServer.php | 155 ++++++ src/TamerLib/Classes/Supervisor.php | 194 -------- src/TamerLib/Classes/Utilities.php | 38 ++ src/TamerLib/Classes/Validate.php | 81 --- .../Exceptions/ConnectionException.php | 19 - ...ption.php => NoAvailablePortException.php} | 2 +- .../Exceptions/RedisServerException.php | 13 + .../Interfaces/ClientProtocolInterface.php | 142 ------ .../Interfaces/WorkerProtocolInterface.php | 127 ----- src/TamerLib/Objects/Job.php | 127 ----- src/TamerLib/Objects/JobResults.php | 130 ----- src/TamerLib/Objects/Task.php | 192 ------- src/TamerLib/Objects/WorkerInstance.php | 186 ------- src/TamerLib/Objects/closure | 16 - src/TamerLib/Protocols/Gearman/Client.php | 468 ------------------ src/TamerLib/Protocols/Gearman/Worker.php | 371 -------------- src/TamerLib/Protocols/RabbitMq/Client.php | 467 ----------------- .../Protocols/RabbitMq/Connection.php | 216 -------- src/TamerLib/Protocols/RabbitMq/Worker.php | 399 --------------- src/TamerLib/Tamer.php | 436 ---------------- tests/no_tamer.php | 30 -- tests/redis_server.php | 18 + tests/tamer.php | 71 --- tests/tamer_client.php | 33 -- tests/tamer_worker.php | 18 - 34 files changed, 226 insertions(+), 3955 deletions(-) delete mode 100644 src/TamerLib/Abstracts/ExitCodes/WorkerExitCodes.php delete mode 100644 src/TamerLib/Abstracts/JobStatus.php delete mode 100644 src/TamerLib/Abstracts/Mode.php delete mode 100644 src/TamerLib/Abstracts/ObjectType.php delete mode 100644 src/TamerLib/Abstracts/ProtocolType.php delete mode 100644 src/TamerLib/Abstracts/TaskPriority.php delete mode 100644 src/TamerLib/Classes/Functions.php create mode 100644 src/TamerLib/Classes/RedisServer.php delete mode 100644 src/TamerLib/Classes/Supervisor.php create mode 100644 src/TamerLib/Classes/Utilities.php delete mode 100644 src/TamerLib/Classes/Validate.php delete mode 100644 src/TamerLib/Exceptions/ConnectionException.php rename src/TamerLib/Exceptions/{UnsupervisedWorkerException.php => NoAvailablePortException.php} (87%) create mode 100644 src/TamerLib/Exceptions/RedisServerException.php delete mode 100644 src/TamerLib/Interfaces/ClientProtocolInterface.php delete mode 100644 src/TamerLib/Interfaces/WorkerProtocolInterface.php delete mode 100644 src/TamerLib/Objects/Job.php delete mode 100644 src/TamerLib/Objects/JobResults.php delete mode 100644 src/TamerLib/Objects/Task.php delete mode 100644 src/TamerLib/Objects/WorkerInstance.php delete mode 100644 src/TamerLib/Objects/closure delete mode 100644 src/TamerLib/Protocols/Gearman/Client.php delete mode 100644 src/TamerLib/Protocols/Gearman/Worker.php delete mode 100644 src/TamerLib/Protocols/RabbitMq/Client.php delete mode 100644 src/TamerLib/Protocols/RabbitMq/Connection.php delete mode 100644 src/TamerLib/Protocols/RabbitMq/Worker.php delete mode 100644 tests/no_tamer.php create mode 100644 tests/redis_server.php delete mode 100644 tests/tamer.php delete mode 100644 tests/tamer_client.php delete mode 100644 tests/tamer_worker.php diff --git a/.idea/php.xml b/.idea/php.xml index 666a0dd..8ba8e7b 100644 --- a/.idea/php.xml +++ b/.idea/php.xml @@ -15,11 +15,8 @@ - - - - + diff --git a/project.json b/project.json index 37ae278..0c2bd23 100644 --- a/project.json +++ b/project.json @@ -47,12 +47,6 @@ "source_type": "remote", "source": "opis/closure=latest@composer" }, - { - "name": "com.php_amqplib.php_amqplib", - "version": "latest", - "source_type": "remote", - "source": "php-amqplib/php-amqplib=latest@composer" - }, { "name": "com.symfony.process", "version": "latest", diff --git a/src/TamerLib/Abstracts/ExitCodes/WorkerExitCodes.php b/src/TamerLib/Abstracts/ExitCodes/WorkerExitCodes.php deleted file mode 100644 index 4dd99f6..0000000 --- a/src/TamerLib/Abstracts/ExitCodes/WorkerExitCodes.php +++ /dev/null @@ -1,16 +0,0 @@ - new \TamerLib\Protocols\Gearman\Client($username, $password), - ProtocolType::RabbitMQ => throw new InvalidArgumentException('RabbitMQ is not fully implemented yet'), - default => throw new InvalidArgumentException('Invalid protocol type'), - }; - } - - /** - * @param string $protocol - * @param string|null $username - * @param string|null $password - * @return WorkerProtocolInterface - */ - public static function createWorker(string $protocol, ?string $username=null, ?string $password=null): WorkerProtocolInterface - { - /** @noinspection PhpFullyQualifiedNameUsageInspection */ - return match (strtolower($protocol)) - { - ProtocolType::Gearman => new \TamerLib\Protocols\Gearman\Worker($username, $password), - ProtocolType::RabbitMQ => throw new InvalidArgumentException('RabbitMQ is not fully implemented yet'), - default => throw new InvalidArgumentException('Invalid protocol type'), - }; - } - - /** - * Returns the worker variables from the environment variables - * - * @return array - */ - public static function getWorkerVariables(): array - { - if(self::$worker_variables == null) - { - self::$worker_variables = [ - 'TAMER_ENABLED' => getenv('TAMER_ENABLED') === 'true', - 'TAMER_PROTOCOL' => getenv('TAMER_PROTOCOL'), - 'TAMER_SERVERS' => getenv('TAMER_SERVERS'), - 'TAMER_USERNAME' => getenv('TAMER_USERNAME'), - 'TAMER_PASSWORD' => getenv('TAMER_PASSWORD'), - 'TAMER_INSTANCE_ID' => getenv('TAMER_INSTANCE_ID'), - ]; - - if(self::$worker_variables['TAMER_SERVERS'] !== false) - self::$worker_variables['TAMER_SERVERS'] = explode(',', self::$worker_variables['TAMER_SERVERS']); - } - - return self::$worker_variables; - } - - /** - * Returns the path to the php binary - * - * @return string - * @throws Exception - */ - public static function findPhpBin(): string - { - if(self::$php_bin !== null) - return self::$php_bin; - - $php_finder = new PhpExecutableFinder(); - $php_bin = $php_finder->find(); - if($php_bin === false) - throw new Exception('Unable to find the php binary'); - - self::$php_bin = $php_bin; - return $php_bin; - } - - /** - * Calculates the priority for a task based on the priority level - * - * @param int $priority - * @return int - */ - public static function calculatePriority(int $priority): int - { - if($priority < TaskPriority::Low) - return 0; - - if($priority > TaskPriority::High) - return 255; - - return (int) round(($priority / TaskPriority::High) * 255); - } - } \ No newline at end of file diff --git a/src/TamerLib/Classes/RedisServer.php b/src/TamerLib/Classes/RedisServer.php new file mode 100644 index 0000000..cc6f2d4 --- /dev/null +++ b/src/TamerLib/Classes/RedisServer.php @@ -0,0 +1,155 @@ +cmd = $cmd; + $this->host = $host; + $this->port = $port; + } + + /** + * Returns the port that the Redis server is listening on. + * + * @return int|null + */ + public function getPort(): ?int + { + return $this->port; + } + + /** + * Determines if the Redis server is running. + * + * @return bool + */ + public function isRunning(): bool + { + if(is_null($this->server_process)) + { + return false; + } + + return $this->server_process->isRunning(); + } + + /** + * Starts the Redis server. + * + * @param int $timeout + * @return bool + * @throws RedisServerException + * @throws RedisException + */ + public function start(int $timeout=60): bool + { + if($this->isRunning()) + { + return true; + } + + Log::verbose('net.nosial.tamerlib', 'Starting Redis server on port ' . $this->port . '.'); + $this->server_process = new Process([$this->cmd, '--port', $this->port]); + $this->server_process->start(); + + // Use a redis client and ping the server until it responds. + $redis_client = new Redis(); + $timeout_counter = 0; + + while(!$redis_client->isConnected()) + { + if($timeout_counter >= $timeout) + { + throw new RedisServerException('Redis server failed to start within ' . $timeout . ' seconds.'); + } + + try + { + $redis_client->connect($this->host, $this->port); + } + catch (RedisException $e) + { + // Do nothing. + } + finally + { + sleep(1); + $timeout_counter++; + } + } + + Log::verbose('net.nosial.tamerlib', 'Redis server started.'); + return true; + } + + /** + * Stops the Redis server. + * + * @return bool + */ + public function stop(): bool + { + if(!$this->isRunning()) + { + return true; + } + + $this->server_process->stop(); + Log::verbose('net.nosial.tamerlib', 'Redis server stopped.'); + return true; + } + + /** + * Terminates the Redis server. + */ + public function __destruct() + { + $this->stop(); + } + } \ No newline at end of file diff --git a/src/TamerLib/Classes/Supervisor.php b/src/TamerLib/Classes/Supervisor.php deleted file mode 100644 index 5c5dabc..0000000 --- a/src/TamerLib/Classes/Supervisor.php +++ /dev/null @@ -1,194 +0,0 @@ -workers = []; - $this->protocol = $protocol; - $this->servers = $servers; - $this->username = $username; - $this->password = $password; - } - - /** - * Adds a worker to the supervisor instance - * - * @param string $target - * @param int $instances - * @return void - * @throws Exception - */ - public function addWorker(string $target, int $instances): void - { - for ($i = 0; $i < $instances; $i++) - { - $this->workers[] = new WorkerInstance($target, $this->protocol, $this->servers, $this->username, $this->password); - } - } - - /** - * Starts all the workers - * - * @return void - * @throws Exception - */ - public function start(): void - { - /** @var WorkerInstance $worker */ - foreach ($this->workers as $worker) - { - $worker->start(); - } - - // Ensure that all the workers are running - foreach($this->workers as $worker) - { - if (!$worker->isRunning()) - { - throw new Exception("Worker {$worker->getId()} is not running"); - } - - while(true) - { - switch($worker->getProcess()->getStatus()) - { - case Process::STATUS_STARTED: - Log::debug('net.nosial.tamerlib', "worker {$worker->getId()} is running"); - break 2; - - case Process::STATUS_TERMINATED: - throw new Exception("Worker {$worker->getId()} has terminated"); - - default: - echo "Worker {$worker->getId()} is {$worker->getProcess()->getStatus()}" . PHP_EOL; - } - } - } - } - - /** - * Stops all the workers - * - * @return void - * @throws Exception - */ - public function stop(): void - { - /** @var WorkerInstance $worker */ - foreach ($this->workers as $worker) - { - $worker->stop(); - } - } - - /** - * Restarts all the workers - * - * @return void - * @throws Exception - */ - public function restart(): void - { - /** @var WorkerInstance $worker */ - foreach ($this->workers as $worker) - { - $worker->stop(); - $worker->start(); - } - } - - /** - * Monitors all the workers and restarts them if they are not running - * - * @param bool $blocking - * @param bool $auto_restart - * @return void - * @throws Exception - */ - public function monitor(bool $blocking=false, bool $auto_restart=true): void - { - while(true) - { - /** @var WorkerInstance $worker */ - foreach ($this->workers as $worker) - { - if (!$worker->isRunning()) - { - if ($auto_restart) - { - Log::warning('net.nosial.tamerlib', "worker {$worker->getId()} is not running, restarting"); - $worker->start(); - } - else - { - throw new Exception("Worker {$worker->getId()} is not running"); - } - } - } - - if (!$blocking) - { - break; - } - - sleep(1); - } - } - - /** - * @throws Exception - */ - public function __destruct() - { - $this->stop(); - } - - } \ No newline at end of file diff --git a/src/TamerLib/Classes/Utilities.php b/src/TamerLib/Classes/Utilities.php new file mode 100644 index 0000000..a2128a7 --- /dev/null +++ b/src/TamerLib/Classes/Utilities.php @@ -0,0 +1,38 @@ + true, - default => false, - }; - } - - /** - * @param string $input - * @return bool - */ - public static function mode(string $input): bool - { - return match (strtolower($input)) - { - Mode::Client, Mode::Worker => true, - default => false, - }; - } - - /** - * Returns true if the input is a valid task priority. - * - * @param int $input - * @return bool - */ - public static function taskPriority(int $input): bool - { - return match ($input) - { - TaskPriority::Low, TaskPriority::Normal, TaskPriority::High => true, - default => false, - }; - } - - - /** - * Determines the object type - * - * @param $input - * @return string - */ - public static function getObjectType($input): string - { - if(!is_array($input)) - { - return ObjectType::Unknown; - } - - if(!array_key_exists('type', $input)) - { - return ObjectType::Unknown; - } - - return match ($input['type']) - { - ObjectType::Job => ObjectType::Job, - ObjectType::JobResults => ObjectType::JobResults, - default => ObjectType::Unknown, - }; - } - } \ No newline at end of file diff --git a/src/TamerLib/Exceptions/ConnectionException.php b/src/TamerLib/Exceptions/ConnectionException.php deleted file mode 100644 index e8ab5dd..0000000 --- a/src/TamerLib/Exceptions/ConnectionException.php +++ /dev/null @@ -1,19 +0,0 @@ -id = $task->getId(); - $this->name = $task->getFunctionName(); - $this->data = $task->getData(); - $this->closure = $task->isClosure(); - } - - /** - * Returns the ID of the Job - * - * @return string - */ - public function getId(): string - { - return $this->id; - } - - /** - * Returns the function name of the Job - * - * @return string - */ - public function getName(): string - { - return $this->name; - } - - /** - * Returns the data of the Job - * - * @return string|Closure|null - */ - public function getData(): Closure|string|null - { - return $this->data; - } - - /** - * @return bool - */ - public function isClosure(): bool - { - return $this->closure; - } - - /** - * Returns an array representation of the Job - * - * @return array - */ - public function toArray(): array - { - return [ - 'type' => 'tamer_job', - 'id' => $this->id, - 'name' => $this->name, - 'data' => ($this->closure ? serialize(new SerializableClosure($this->data)) : $this->data), - 'closure' => $this->closure - ]; - } - - /** - * Constructs a Job from an array - * - * @param array $data - * @return Job - */ - public static function fromArray(array $data): Job - { - $job_data = $data['data']; - - if($data['closure'] === true) - { - /** @var SerializableClosure $job_data */ - $job_data = unserialize($data['data']); - $job_data = $job_data->getClosure(); - } - - $job = new Job(new Task($data['name'], $job_data)); - $job->id = $data['id']; - $job->closure = $data['closure']; - - return $job; - } - - } \ No newline at end of file diff --git a/src/TamerLib/Objects/JobResults.php b/src/TamerLib/Objects/JobResults.php deleted file mode 100644 index 83def50..0000000 --- a/src/TamerLib/Objects/JobResults.php +++ /dev/null @@ -1,130 +0,0 @@ -id = $job->getId(); - $this->data = $results; - $this->status = $status; - } - } - - /** - * Returns the ID of the Job - * - * @return string - */ - public function getId(): string - { - return $this->id; - } - - - /** - * Returns the data of the Job - * - * @return string - */ - public function getData(): string - { - return $this->data; - } - - /** - * @return int - * @noinspection PhpUnused - */ - public function getStatus(): int - { - return $this->status; - } - - /** - * Returns an array representation of the Job - * - * @return array - */ - public function toArray(): array - { - return [ - 'type' => 'tamer_job_results', - 'id' => $this->id, - 'data' => $this->data, - 'status' => $this->status - ]; - } - - /** - * Constructs a Job from an array - * - * @param array $data - * @return JobResults - */ - public static function fromArray(array $data): JobResults - { - $job = new JobResults(); - - $job->setId($data['id']); - $job->setData($data['data']); - $job->setStatus($data['status']); - - return $job; - } - - /** - * @param string $id - */ - protected function setId(string $id): void - { - $this->id = $id; - } - - /** - * @param string $data - */ - protected function setData(string $data): void - { - $this->data = $data; - } - - /** - * @param int|null $status - */ - protected function setStatus(?int $status): void - { - $this->status = $status; - } - - - } \ No newline at end of file diff --git a/src/TamerLib/Objects/Task.php b/src/TamerLib/Objects/Task.php deleted file mode 100644 index da55d34..0000000 --- a/src/TamerLib/Objects/Task.php +++ /dev/null @@ -1,192 +0,0 @@ -function_name = $function_name; - $this->data = $data; - $this->id = uniqid(); - $this->priority = TaskPriority::Normal; - $this->callback = $callback; - $this->closure = false; - } - - /** - * Static Constructor - * - * @param string $function_name - * @param string|Closure|null $data - * @param callable|null $callback - * @return static - */ - public static function create(string $function_name, string|Closure|null $data, callable $callback=null): self - { - return new self($function_name, $data, $callback); - } - - /** - * Returns the function name for the task - * - * @return string - */ - public function getFunctionName(): string - { - return $this->function_name; - } - - /** - * Sets the function name for the task - * - * @param string $function_name - * @return Task - */ - public function setFunctionName(string $function_name): self - { - $this->function_name = $function_name; - return $this; - } - - /** - * Returns the arguments for the task - * - * @return string|Closure|null - */ - public function getData(): string|null|Closure - { - return $this->data; - } - - /** - * Sets the arguments for the task - * - * @param string $data - * @return Task - */ - public function setData(string $data): self - { - $this->data = $data; - return $this; - } - - /** - * Returns the Unique ID of the task - * - * @return string - */ - public function getId(): string - { - return $this->id; - } - - /** - * @return int - */ - public function getPriority(): int - { - return $this->priority; - } - - /** - * @param int $priority - * @return Task - */ - public function setPriority(int $priority): self - { - if(!Validate::taskPriority($priority)) - { - throw new InvalidArgumentException("Invalid priority value"); - } - - $this->priority = $priority; - return $this; - } - - /** - * @param Closure|null $callback - * @return Task - */ - public function setCallback(?Closure $callback): self - { - $this->callback = $callback; - return $this; - } - - /** - * Executes the callback function - * - * @param string|JobResults|null $result - * @return void - */ - public function runCallback(string|JobResults|null $result): void - { - if($this->callback !== null) - { - call_user_func($this->callback, $result); - } - } - - /** - * @return bool - */ - public function isClosure(): bool - { - return $this->closure; - } - - /** - * @param bool $closure - * @return Task - */ - public function setClosure(bool $closure): self - { - $this->closure = $closure; - return $this; - } - } \ No newline at end of file diff --git a/src/TamerLib/Objects/WorkerInstance.php b/src/TamerLib/Objects/WorkerInstance.php deleted file mode 100644 index 52cb424..0000000 --- a/src/TamerLib/Objects/WorkerInstance.php +++ /dev/null @@ -1,186 +0,0 @@ -id = uniqid(); - $this->target = $target; - $this->protocol = $protocol; - $this->servers = $servers; - $this->username = $username; - $this->password = $password; - $this->process = null; - - if($target !== 'closure' && file_exists($target) === false) - { - throw new Exception('The target file does not exist'); - } - } - - /** - * Returns the worker instance id - * - * @return string - */ - public function getId(): string - { - return $this->id; - } - - /** - * Executes the worker instance in a separate process - * - * @return void - * @throws Exception - */ - public function start(): void - { - $target = $this->target; - if($target == 'closure') - { - $target = __DIR__ . DIRECTORY_SEPARATOR . 'closure'; - } - - $argv = $_SERVER['argv']; - array_shift($argv); - - $this->process = new Process(array_merge([Functions::findPhpBin(), $target], $argv)); - $this->process->setEnv([ - 'TAMER_ENABLED' => 'true', - 'TAMER_PROTOCOL' => $this->protocol, - 'TAMER_SERVERS' => implode(',', $this->servers), - 'TAMER_USERNAME' => $this->username, - 'TAMER_PASSWORD' => $this->password, - 'TAMER_INSTANCE_ID' => $this->id - ]); - - - Log::debug('net.nosial.tamerlib', sprintf('starting worker %s', $this->id)); - - // Callback for process output - $this->process->start(function ($type, $buffer) - { - // Add newline if it's missing - if(substr($buffer, -1) !== PHP_EOL) - { - $buffer .= PHP_EOL; - } - - print($buffer); - }); - } - - /** - * Stops the worker instance - * - * @return void - */ - public function stop(): void - { - if($this->process !== null) - { - Log::debug('net.nosial.tamerlib', sprintf('Stopping worker %s', $this->id)); - $this->process->stop(); - } - } - - /** - * Returns whether the worker instance is running - * - * @return bool - */ - public function isRunning(): bool - { - if($this->process !== null) - { - return $this->process->isRunning(); - } - - return false; - } - - /** - * @return Process|null - */ - public function getProcess(): ?Process - { - return $this->process; - } - - /** - * Destructor - */ - public function __destruct() - { - $this->stop(); - } - } \ No newline at end of file diff --git a/src/TamerLib/Objects/closure b/src/TamerLib/Objects/closure deleted file mode 100644 index 309bd82..0000000 --- a/src/TamerLib/Objects/closure +++ /dev/null @@ -1,16 +0,0 @@ -getMessage(), $e); - exit(1); - } diff --git a/src/TamerLib/Protocols/Gearman/Client.php b/src/TamerLib/Protocols/Gearman/Client.php deleted file mode 100644 index 7ae3a0d..0000000 --- a/src/TamerLib/Protocols/Gearman/Client.php +++ /dev/null @@ -1,468 +0,0 @@ -client = null; - $this->tasks = []; - $this->automatic_reconnect = false; - $this->next_reconnect = time() + 1800; - $this->defined_servers = []; - $this->options = []; - } - - - /** - * Adds a server to the list of servers to use - * - * @link http://php.net/manual/en/gearmanclient.addserver.php - * @param string $host (127.0.0.1) - * @param int $port (default: 4730) - * @return void - */ - public function addServer(string $host, int $port): void - { - if(!isset($this->defined_servers[$host])) - { - $this->defined_servers[$host] = []; - } - - if(in_array($port, $this->defined_servers[$host])) - { - return; - } - - $this->defined_servers[$host][] = $port; - } - - /** - * Adds a list of servers to the list of servers to use - * - * @link http://php.net/manual/en/gearmanclient.addservers.php - * @param array $servers (host:port, host:port, ...) - * @return void - */ - public function addServers(array $servers): void - { - foreach($servers as $server) - { - $server = explode(':', $server); - $this->addServer($server[0], (int)$server[1]); - } - } - - /** - * Connects to the server(s) - * - * @return void - * @throws ConnectionException - */ - public function connect(): void - { - if($this->isConnected()) - return; - - $this->client = new GearmanClient(); - - // Parse $options combination via bitwise OR operator - $options = array_reduce($this->options, function($carry, $item) - { - return $carry | $item; - }); - - $this->client->addOptions($options); - - foreach($this->defined_servers as $host => $ports) - { - foreach($ports as $port) - { - try - { - $this->client->addServer($host, $port); - Log::debug('net.nosial.tamerlib', 'connected to gearman server: ' . $host . ':' . $port); - } - catch(Exception $e) - { - throw new ConnectionException('Failed to connect to Gearman server: ' . $host . ':' . $port, 0, $e); - } - } - } - - $this->client->setCompleteCallback([$this, 'callbackHandler']); - $this->client->setFailCallback([$this, 'callbackHandler']); - $this->client->setDataCallback([$this, 'callbackHandler']); - $this->client->setStatusCallback([$this, 'callbackHandler']); - } - - /** - * Disconnects from the server(s) - * - * @return void - */ - public function disconnect(): void - { - if(!$this->isConnected()) - return; - - Log::debug('net.nosial.tamerlib', 'disconnecting from gearman server(s)'); - $this->client->clearCallbacks(); - unset($this->client); - $this->client = null; - } - - /** - * Reconnects to the server(s) - * - * @return void - * @throws ConnectionException - */ - public function reconnect(): void - { - Log::debug('net.nosial.tamerlib', 'reconnecting to gearman server(s)'); - - $this->disconnect(); - $this->connect(); - } - - /** - * Returns the current status of the client - * - * @inheritDoc - * @return bool - */ - public function isConnected(): bool - { - if($this->client === null) - { - return false; - } - - return true; - } - - /** - * The automatic reconnect process - * - * @return void - */ - private function preformAutoreconf(): void - { - if($this->automatic_reconnect && $this->next_reconnect < time()) - { - try - { - $this->reconnect(); - } - catch (Exception $e) - { - Log::error('net.nosial.tamerlib', 'Failed to reconnect to Gearman server: ' . $e->getMessage()); - } - finally - { - $this->next_reconnect = time() + 1800; - } - } - } - - /** - * Adds client options - * - * @link http://php.net/manual/en/gearmanclient.addoptions.php - * @param int[] $options (GEARMAN_CLIENT_NON_BLOCKING, GEARMAN_CLIENT_UNBUFFERED_RESULT, GEARMAN_CLIENT_FREE_TASKS) - * @return void - */ - public function setOptions(array $options): void - { - $this->options = $options; - } - - /** - * Returns the current client options - * - * @return array - */ - public function getOptions(): array - { - return $this->options; - } - - /** - * Clears the current client options - * - * @return void - */ - public function clearOptions(): void - { - $this->options = []; - } - - /** - * Executes a closure in the background - * - * @param Closure $closure - * @return void - */ - public function doClosure(Closure $closure): void - { - $closure_task = new Task('tamer_closure', $closure); - $closure_task->setClosure(true); - $this->do($closure_task); - } - - /** - * Processes a task in the background - * - * @param Task $task - * @return void - */ - public function do(Task $task): void - { - $this->preformAutoreconf(); - - $this->tasks[] = $task; - $job = new Job($task); - $job_data = msgpack_pack($job->toArray()); - - Log::debug('net.nosial.tamerlib', 'sending closure to gearman server: ' . strlen($job_data) . ' bytes'); - switch($task->getPriority()) - { - case TaskPriority::High: - $this->client->doHighBackground($task->getFunctionName(), $job_data); - break; - - case TaskPriority::Low: - $this->client->doLowBackground($task->getFunctionName(), $job_data); - break; - - default: - case TaskPriority::Normal: - $this->client->doBackground($task->getFunctionName(), $job_data); - break; - } - } - - /** - * Adds a task to the list of tasks to run - * - * @param Task $task - * @return void - */ - public function queue(Task $task): void - { - $this->preformAutoreconf(); - - $this->tasks[] = $task; - $job = new Job($task); - $job_data = msgpack_pack($job->toArray()); - - Log::debug('net.nosial.tamerlib', 'sending closure to gearman server: ' . strlen($job_data) . ' bytes'); - switch($task->getPriority()) - { - case TaskPriority::High: - $this->client->addTaskHigh($task->getFunctionName(), $job_data); - break; - - case TaskPriority::Low: - $this->client->addTaskLow($task->getFunctionName(), $job_data); - break; - - default: - case TaskPriority::Normal: - $this->client->addTask($task->getFunctionName(), $job_data); - break; - } - } - - /** - * Adds a closure task to the list of tasks to run - * - * @param Closure $closure - * @param Closure|null $callback - * @return void - */ - public function queueClosure(Closure $closure, ?Closure $callback=null): void - { - $closure_task = new Task('tamer_closure', $closure, $callback); - $closure_task->setClosure(true); - $this->queue($closure_task); - } - - /** - * @return bool - */ - public function run(): bool - { - if(!$this->isConnected()) - return false; - - $this->preformAutoreconf(); - - if(!$this->client->runTasks()) - return false; - - return true; - } - - /** - * Processes a task callback in the foreground - * - * @param GearmanTask $task - * @return void - */ - public function callbackHandler(GearmanTask $task): void - { - $job_result = JobResults::fromArray(msgpack_unpack($task->data())); - $internal_task = $this->getTaskById($job_result->getId()); - - Log::debug('net.nosial.tamerlib', 'callback for task ' . $internal_task->getId() . ' with status ' . $job_result->getStatus() . ' and data size ' . strlen($task->data()) . ' bytes'); - - try - { - if($internal_task->isClosure()) - { - // If the task is a closure, we need to run the callback with the closure's return value - // instead of the job result object - $internal_task->runCallback($job_result->getData()); - return; - } - - $internal_task->runCallback($job_result); - } - catch(Exception $e) - { - Log::error('net.nosial.tamerlib', 'Failed to run callback for task ' . $internal_task->getId() . ': ' . $e->getMessage(), $e); - } - finally - { - $this->removeTask($internal_task); - } - } - - /** - * @param string $id - * @return Task|null - */ - private function getTaskById(string $id): ?Task - { - foreach($this->tasks as $task) - { - if($task->getId() === $id) - { - return $task; - } - } - - return null; - } - - /** - * Removes a task from the list of tasks - * - * @param Task $task - * @return void - */ - private function removeTask(Task $task): void - { - $this->tasks = array_filter($this->tasks, function($item) use ($task) - { - return $item->getId() !== $task->getId(); - }); - - } - - /** - * @return bool - */ - public function automaticReconnectionEnabled(): bool - { - return $this->automatic_reconnect; - } - - /** - * @param bool $enable - */ - public function enableAutomaticReconnection(bool $enable): void - { - $this->automatic_reconnect = $enable; - } - - /** - * Executes all remaining tasks and closes the connection - */ - public function __destruct() - { - try - { - $this->disconnect(); - } - catch(Exception $e) - { - unset($e); - } - } - } \ No newline at end of file diff --git a/src/TamerLib/Protocols/Gearman/Worker.php b/src/TamerLib/Protocols/Gearman/Worker.php deleted file mode 100644 index 9c32810..0000000 --- a/src/TamerLib/Protocols/Gearman/Worker.php +++ /dev/null @@ -1,371 +0,0 @@ -worker = null; - $this->defined_servers = []; - $this->automatic_reconnect = false; - $this->next_reconnect = time() + 1800; - $this->options = []; - } - - /** - * Adds a server to the list of servers to use - * - * @link http://php.net/manual/en/gearmanworker.addserver.php - * @param string $host ( - * @param int $port (default: 4730) - * @return void - */ - public function addServer(string $host, int $port): void - { - if(!isset($this->defined_servers[$host])) - { - $this->defined_servers[$host] = []; - } - - if(in_array($port, $this->defined_servers[$host])) - { - return; - } - - $this->defined_servers[$host][] = $port; - } - - /** - * Adds a list of servers to the list of servers to use - * - * @link http://php.net/manual/en/gearmanworker.addservers.php - * @param string[] $servers (host:port, host:port, ...) - * @return void - */ - public function addServers(array $servers): void - { - foreach($servers as $server) - { - $server = explode(':', $server); - $this->addServer($server[0], (int)$server[1]); - } - } - - /** - * Connects to the server - * - * @return void - * @throws ConnectionException - */ - public function connect(): void - { - if($this->isConnected()) - return; - - $this->worker = new GearmanWorker(); - $this->worker->addOptions(GEARMAN_WORKER_GRAB_UNIQ); - - foreach($this->defined_servers as $host => $ports) - { - foreach($ports as $port) - { - try - { - $this->worker->addServer($host, $port); - Log::debug('net.nosial.tamerlib', 'connected to gearman server: ' . $host . ':' . $port); - } - catch(Exception $e) - { - throw new ConnectionException('Failed to connect to Gearman server: ' . $host . ':' . $port, 0, $e); - } - } - } - - $this->worker->addFunction('tamer_closure', function(GearmanJob $job) - { - $received_job = Job::fromArray(msgpack_unpack($job->workload())); - Log::debug('net.nosial.tamerlib', 'received closure: ' . $received_job->getId()); - - try - { - /** @var SerializableClosure $closure */ - $closure = $received_job->getData(); - $result = $closure($received_job); - } - catch(Exception $e) - { - $job->sendFail(); - unset($e); - return; - } - - $job_results = new JobResults($received_job, JobStatus::Success, $result); - $job->sendComplete(msgpack_pack($job_results->toArray())); - Log::debug('net.nosial.tamerlib', 'completed closure: ' . $received_job->getId()); - }); - } - - /** - * Disconnects from the server - * - * @return void - */ - public function disconnect(): void - { - if(!$this->isConnected()) - return; - - $this->worker->unregisterAll(); - unset($this->worker); - $this->worker = null; - } - - /** - * Reconnects to the server if the connection has been lost - * - * @return void - * @throws ConnectionException - */ - public function reconnect(): void - { - $this->disconnect(); - $this->connect(); - } - - /** - * Returns true if the worker is connected to the server - * - * @return bool - */ - public function isConnected(): bool - { - return $this->worker !== null; - } - - /** - * The automatic reconnect process - * - * @return void - */ - private function preformAutoreconf(): void - { - if($this->automatic_reconnect && $this->next_reconnect < time()) - { - try - { - $this->reconnect(); - } - catch (Exception $e) - { - Log::error('net.nosial.tamerlib', 'Failed to reconnect to Gearman server: ' . $e->getMessage()); - } - finally - { - $this->next_reconnect = time() + 1800; - } - } - } - - /** - * Sets the options to use when connecting to the server - * - * @param array $options - * @return bool - * @inheritDoc - */ - public function setOptions(array $options): void - { - $this->options = $options; - } - - /** - * Returns the options to use when connecting to the server - * - * @return array - */ - public function getOptions(): array - { - return $this->options; - } - - /** - * Clears the options to use when connecting to the server - * - * @return void - */ - public function clearOptions(): void - { - $this->options = []; - } - - /** - * @return bool - */ - public function automaticReconnectionEnabled(): bool - { - return $this->automatic_reconnect; - } - - /** - * @param bool $enable - * @return void - */ - public function enableAutomaticReconnection(bool $enable): void - { - $this->automatic_reconnect = $enable; - } - - /** - * Adds a function to the list of functions to call - * - * @link http://php.net/manual/en/gearmanworker.addfunction.php - * @param string $name The name of the function to register with the job server - * @param callable $callable The callback function to call when the job is received - * @return void - */ - public function addFunction(string $name, callable $callable): void - { - $this->worker->addFunction($name, function(GearmanJob $job) use ($callable) - { - $received_job = Job::fromArray(msgpack_unpack($job->workload())); - Log::debug('net.nosial.tamerlib', 'received job: ' . $received_job->getId()); - - try - { - $result = $callable($received_job); - } - catch(Exception $e) - { - $job->sendFail(); - unset($e); - return; - } - - $job_results = new JobResults($received_job, JobStatus::Success, $result); - $job->sendComplete(msgpack_pack($job_results->toArray())); - Log::debug('net.nosial.tamerlib', 'completed job: ' . $received_job->getId()); - }); - } - - /** - * Removes a function from the list of functions to call - * - * @param string $function_name The name of the function to unregister - * @return void - */ - public function removeFunction(string $function_name): void - { - $this->worker->unregister($function_name); - } - - /** - * Waits for a job and calls the appropriate callback function - * - * @link http://php.net/manual/en/gearmanworker.work.php - * @param bool $blocking (default: true) Whether to block until a job is received - * @param int $timeout (default: 500) The timeout in milliseconds (if $blocking is false) - * @param bool $throw_errors (default: false) Whether to throw exceptions on errors - * @return void Returns nothing - * @throws ConnectionException - */ - public function work(bool $blocking=true, int $timeout=500, bool $throw_errors=false): void - { - $this->worker->setTimeout($timeout); - - while(true) - { - @$this->preformAutoreconf(); - @$this->worker->work(); - - if($this->worker->returnCode() == GEARMAN_COULD_NOT_CONNECT) - { - throw new ConnectionException('Could not connect to Gearman server'); - } - - if($this->worker->returnCode() == GEARMAN_TIMEOUT && !$blocking) - { - break; - } - - if($this->worker->returnCode() != GEARMAN_SUCCESS && $throw_errors) - { - Log::error('net.nosial.tamerlib', 'Gearman worker error: ' . $this->worker->error()); - } - - if($blocking) - { - usleep($timeout); - } - } - } - - /** - * Executes all remaining tasks and closes the connection - */ - public function __destruct() - { - try - { - $this->disconnect(); - } - catch(Exception $e) - { - unset($e); - } - } - } \ No newline at end of file diff --git a/src/TamerLib/Protocols/RabbitMq/Client.php b/src/TamerLib/Protocols/RabbitMq/Client.php deleted file mode 100644 index 9fc3580..0000000 --- a/src/TamerLib/Protocols/RabbitMq/Client.php +++ /dev/null @@ -1,467 +0,0 @@ -tasks = []; - $this->automatic_reconnect = false; - $this->defined_servers = []; - $this->options = []; - $this->username = $username; - $this->password = $password; - $this->connections = []; - } - - /** - * Adds a server to the list of servers to use - * - * @param string $host - * @param int $port - * @return void - */ - public function addServer(string $host, int $port): void - { - if(!isset($this->defined_servers[$host])) - { - $this->defined_servers[$host] = []; - } - - if(in_array($port, $this->defined_servers[$host])) - { - return; - } - - $this->defined_servers[$host][] = $port; - } - - /** - * Adds a list of servers to the list of servers to use - * - * @param array $servers (host:port, host:port, ...) - * @return void - */ - public function addServers(array $servers): void - { - foreach($servers as $server) - { - $server = explode(':', $server); - $this->addServer($server[0], (int)$server[1]); - } - } - - /** - * Connects to the server(s) defined - * - * @return void - * @throws ConnectionException - */ - public function connect(): void - { - if($this->isConnected()) - return; - - if(count($this->defined_servers) === 0) - return; - - foreach($this->defined_servers as $host => $ports) - { - foreach($ports as $port) - { - $connection = new Connection($host, $port, $this->username, $this->password); - $connection->connect(); - - $this->connections[] = $connection; - } - } - } - - /** - * Disconnects from the server - * - * @return void - */ - public function disconnect(): void - { - if(!$this->isConnected()) - return; - - foreach($this->connections as $connection) - { - $connection->disconnect(); - } - - $this->connections = []; - } - - /** - * Reconnects to the server - * - * @return void - * @throws ConnectionException - */ - public function reconnect(): void - { - $this->disconnect(); - $this->connect(); - } - - /** - * Returns True if one or more connections are connected, False otherwise - * (Note, some connections may be disconnected, and this will still return True) - * - * @return bool - */ - public function isConnected(): bool - { - if(count($this->connections) === 0) - return false; - - foreach($this->connections as $connection) - { - if($connection->isConnected()) - return true; - } - - return false; - } - - /** - * Sets the options array - * - * @param array $options - * @return void - */ - public function setOptions(array $options): void - { - $this->options = $options; - } - - /** - * Returns the options array - * - * @return array - */ - public function getOptions(): array - { - return $this->options; - } - - /** - * Clears the options array - * - * @return void - */ - public function clearOptions(): void - { - $this->options = []; - } - - /** - * Returns True if the client is automatically reconnecting to the server - * - * @return bool - */ - public function automaticReconnectionEnabled(): bool - { - return $this->automatic_reconnect; - } - - /** - * Enables or disables automatic reconnecting to the server - * - * @param bool $enable - * @return void - */ - public function enableAutomaticReconnection(bool $enable): void - { - $this->automatic_reconnect = $enable; - } - - /** - * Runs a task in the background (Fire and Forget) - * - * @param Task $task - * @return void - */ - public function do(Task $task): void - { - if(!$this->isConnected()) - return; - - $job = new Job($task); - $message = new AMQPMessage(msgpack_pack($job->toArray()), [ - 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, - 'correlation_id' => $task->getId(), - 'priority' => Functions::calculatePriority($task->getPriority()), - ]); - - // Select random connection - $connection = $this->connections[array_rand($this->connections)]; - if($this->automatic_reconnect) - $connection->preformAutoreconf(); - $connection->getChannel()->basic_publish($message, '', 'tamer_queue'); - } - - /** - * Executes a closure in the background - * - * @param Closure $closure - * @return void - */ - public function doClosure(Closure $closure): void - { - $closure_task = new Task('tamer_closure', $closure); - $closure_task->setClosure(true); - $this->do($closure_task); - } - - /** - * Queues a task to be executed - * - * @param Task $task - * @return void - */ - public function queue(Task $task): void - { - $this->tasks[] = $task; - } - - /** - * Adds a closure task to the list of tasks to run - * - * @param Closure $closure - * @param Closure|null $callback - * @return void - */ - public function queueClosure(Closure $closure, ?Closure $callback=null): void - { - $closure_task = new Task('tamer_closure', $closure, $callback); - $closure_task->setClosure(true); - $this->queue($closure_task); - } - - /** - * Executes all the tasks that has been added - * - * @return bool - */ - public function run(): bool - { - if(count($this->tasks) === 0) - return false; - - if(!$this->isConnected()) - return false; - - $this->preformAutoreconf(); - $correlationIds = []; - $connection = $this->connections[array_rand($this->connections)]; - - if($this->automatic_reconnect) - $connection->preformAutoreconf(); - - /** @var Task $task */ - foreach($this->tasks as $task) - { - $correlationIds[] = $task->getId(); - $job = new Job($task); - - $message = new AMQPMessage(msgpack_pack($job->toArray()), [ - 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, - 'correlation_id' => $task->getId(), - 'reply_to' => 'tamer_queue', - 'priority' => Functions::calculatePriority($task->getPriority()), - ]); - - $connection->getChannel()->basic_publish($message, '', 'tamer_queue'); - } - - // Register callback for each task - $callback = function($msg) use (&$correlationIds, $connection) - { - var_dump(Validate::getObjectType(msgpack_unpack($msg->body))); - if(Validate::getObjectType(msgpack_unpack($msg->body)) !== ObjectType::JobResults) - { - $connection->getChannel()->basic_nack($msg->delivery_info['delivery_tag'], false, true); - return; - } - - $job_result = JobResults::fromArray(msgpack_unpack($msg->body)); - $task = $this->getTaskById($job_result->getId()); - - if($task == null) - { - $connection->getChannel()->basic_nack($msg->delivery_info['delivery_tag'], false, true); - return; - } - - try - { - if($task->isClosure()) - { - $task->runCallback($job_result->getData()); - } - else - { - $task->runCallback($job_result); - } - } - catch(Exception $e) - { - echo $e->getMessage(); - } - - // Remove the processed correlation_id - $index = array_search($msg->get('correlation_id'), $correlationIds); - - if ($index !== false) - { - unset($correlationIds[$index]); - $connection->getChannel()->basic_ack($msg->delivery_info['delivery_tag']); - } - else - { - $connection->getChannel()->basic_nack($msg->delivery_info['delivery_tag'], false, true); - } - - // Stop consuming when all tasks are processed - if(count($correlationIds) === 0) - { - $connection->getChannel()->basic_cancel($msg->delivery_info['consumer_tag']); - } - }; - - $connection->getChannel()->basic_consume('tamer_queue', '', false, false, false, false, $callback); - - // Start consuming messages - while(count($connection->getChannel()->callbacks)) - { - $connection->getChannel()->wait(); - } - - return true; - } - - /** - * Returns a task by its id - * - * @param string $id - * @return Task|null - */ - private function getTaskById(string $id): ?Task - { - foreach($this->tasks as $task) - { - if($task->getId() === $id) - { - return $task; - } - } - - return null; - } - - /** - * The automatic reconnect process - * - * @return void - */ - private function preformAutoreconf(): void - { - if($this->automatic_reconnect) - { - foreach($this->connections as $connection) - { - $connection->preformAutoreconf(); - } - } - } - - /** - * Disconnects from the server when the object is destroyed - */ - public function __destruct() - { - try - { - $this->disconnect(); - } - catch(Exception $e) - { - unset($e); - } - } - - } \ No newline at end of file diff --git a/src/TamerLib/Protocols/RabbitMq/Connection.php b/src/TamerLib/Protocols/RabbitMq/Connection.php deleted file mode 100644 index 0ed425e..0000000 --- a/src/TamerLib/Protocols/RabbitMq/Connection.php +++ /dev/null @@ -1,216 +0,0 @@ -id = uniqid(); - $this->host = $host; - $this->port = $port; - $this->username = $username; - $this->password = $password; - } - - /** - * @return string - */ - public function getId(): string - { - return $this->id; - } - - /** - * @return AMQPStreamConnection|null - */ - public function getConnection(): ?AMQPStreamConnection - { - return $this->connection; - } - - /** - * @return AMQPChannel|null - */ - public function getChannel(): ?AMQPChannel - { - return $this->channel; - } - - /** - * Returns True if the client is connected to the server - * - * @return bool - */ - public function isConnected(): bool - { - return $this->connection !== null; - } - - /** - * Establishes a connection to the server - * - * @return void - * @throws ConnectionException - */ - public function connect(): void - { - if($this->isConnected()) - { - return; - } - - try - { - $this->connection = new AMQPStreamConnection($this->host, $this->port, $this->username, $this->password); - $this->channel = $this->connection->channel(); - $this->channel->queue_declare('tamer_queue', false, true, false, false); - $this->next_reconnect = time() + 1800; - } - catch(Exception $e) - { - throw new ConnectionException(sprintf('Could not connect to RabbitMQ server: %s', $e->getMessage()), $e->getCode(), $e); - } - } - - /** - * Closes the connection to the server - * - * @return void - */ - public function disconnect(): void - { - if(!$this->isConnected()) - { - return; - } - - try - { - $this->channel?->close(); - } - catch(Exception $e) - { - unset($e); - } - - try - { - $this->connection?->close(); - } - catch(Exception $e) - { - unset($e); - } - - $this->channel = null; - $this->connection = null; - } - - /** - * Reconnects to the server - * - * @return void - * @throws ConnectionException - */ - public function reconnect(): void - { - $this->disconnect(); - $this->connect(); - } - - /** - * The automatic reconnect process - * - * @return void - */ - public function preformAutoreconf(): void - { - if($this->next_reconnect < time()) - { - try - { - $this->reconnect(); - } - catch (Exception $e) - { - Log::error('net.nosial.tamerlib', 'Could not reconnect to RabbitMQ server: %s', $e); - } - finally - { - $this->next_reconnect = time() + 1800; - } - } - } - - } \ No newline at end of file diff --git a/src/TamerLib/Protocols/RabbitMq/Worker.php b/src/TamerLib/Protocols/RabbitMq/Worker.php deleted file mode 100644 index 14e9da1..0000000 --- a/src/TamerLib/Protocols/RabbitMq/Worker.php +++ /dev/null @@ -1,399 +0,0 @@ -defined_servers = []; - $this->connections = []; - $this->functions = []; - $this->automatic_reconnect = true; - $this->username = $username; - $this->password = $password; - } - - /** - * Adds a server to the list of servers to use - * - * @param string $host - * @param int $port - * @return void - */ - public function addServer(string $host, int $port): void - { - if(!isset($this->defined_servers[$host])) - { - $this->defined_servers[$host] = []; - } - - if(in_array($port, $this->defined_servers[$host])) - { - return; - } - - $this->defined_servers[$host][] = $port; - } - - /** - * Adds an array of servers to the list of servers to use - * - * @param array $servers (eg; [host:port, host:port, ...]) - * @return void - */ - public function addServers(array $servers): void - { - foreach($servers as $server) - { - $server = explode(':', $server); - $this->addServer($server[0], (int)$server[1]); - } - } - - /** - * Establishes a connection to the server (or servers) - * - * @return void - * @noinspection DuplicatedCode - * @throws ConnectionException - */ - public function connect(): void - { - if($this->isConnected()) - return; - - if(count($this->defined_servers) === 0) - return; - - foreach($this->defined_servers as $host => $ports) - { - foreach($ports as $port) - { - $connection = new Connection($host, $port, $this->username, $this->password); - $connection->connect(); - - $this->connections[] = $connection; - } - } - } - - /** - * Disconnects from the server - * - * @return void - */ - public function disconnect(): void - { - if(!$this->isConnected()) - return; - - foreach($this->connections as $connection) - { - $connection->disconnect(); - } - - $this->connections = []; - } - - /** - * Reconnects to the server (or servers) - * - * @return void - * @throws ConnectionException - */ - public function reconnect(): void - { - $this->disconnect(); - $this->connect(); - } - - /** - * Returns True if one or more connections are connected, False otherwise - * (Note, some connections may be disconnected, and this will still return True) - * - * @return bool - */ - public function isConnected(): bool - { - if(count($this->connections) === 0) - return false; - - foreach($this->connections as $connection) - { - if($connection->isConnected()) - return true; - } - - return false; - } - - /** - * Sets the options to use for this client - * - * @param array $options - * @return void - */ - public function setOptions(array $options): void - { - $this->options = $options; - } - - /** - * Returns the current options for this client - * - * @return array - */ - public function getOptions(): array - { - return $this->options; - } - - /** - * Clears the current options for this client - * - * @return void - */ - public function clearOptions(): void - { - $this->options = []; - } - - /** - * Returns True if automatic reconnection is enabled, False otherwise - * - * @return bool - */ - public function automaticReconnectionEnabled(): bool - { - return $this->automatic_reconnect; - } - - /** - * Enables or disables automatic reconnection - * - * @param bool $enable - * @return void - */ - public function enableAutomaticReconnection(bool $enable): void - { - $this->automatic_reconnect = $enable; - } - - /** - * Registers a new function to the worker to handle - * - * @param string $name - * @param callable $callable - * @param mixed|null $context - * @return void - */ - public function addFunction(string $name, callable $callable, mixed $context = null): void - { - $this->functions[$name] = [ - 'function' => $callable, - 'context' => $context - ]; - } - - /** - * Removes an existing function from the worker - * - * @param string $function_name - * @return void - */ - public function removeFunction(string $function_name): void - { - unset($this->functions[$function_name]); - } - - /** - * Processes a job if there's one available - * - * @param bool $blocking - * @param int $timeout - * @param bool $throw_errors - * @return void - */ - public function work(bool $blocking = true, int $timeout = 500, bool $throw_errors = false): void - { - if(!$this->isConnected()) - return; - - // Select a random connection - $connection = $this->connections[array_rand($this->connections)]; - - $callback = function($message) use ($throw_errors, $connection) - { - var_dump(Validate::getObjectType(msgpack_unpack($message->body))); - if(Validate::getObjectType(msgpack_unpack($message->body)) !== ObjectType::Job) - { - $connection->getChannel()->basic_nack($message->delivery_info['delivery_tag']); - return; - } - - $received_job = Job::fromArray(msgpack_unpack($message->body)); - - if($received_job->isClosure()) - { - Log::debug('net.nosial.tamerlib', 'received closure: ' . $received_job->getId()); - - try - { - // TODO: Check back on this, looks weird. - $closure = $received_job->getData(); - $result = $closure($received_job); - } - catch(Exception $e) - { - unset($e); - - // Do not requeue the job, it's a closure - $connection->getChannel()->basic_nack($message->delivery_info['delivery_tag']); - return; - } - - $job_results = new JobResults($received_job, JobStatus::Success, $result); - $connection->getChannel->basic_publish( - new AMQPMessage(msgpack_pack($job_results->toArray()), ['correlation_id' => $received_job->getId()]) - ); - $connection->getChannel()->basic_ack($message->delivery_info['delivery_tag']); - return; - } - - if(!isset($this->functions[$received_job->getName()])) - { - Log::debug('net.nosial.tamerlib', 'received unknown function: ' . $received_job->getId()); - $connection->getChannel()->basic_nack($message->delivery_info['delivery_tag'], false, true); - return; - } - - Log::debug('net.nosial.tamerlib', 'received function: ' . $received_job->getId()); - $function = $this->functions[$received_job->getName()]; - $callback = $function['function']; - - try - { - $result = $callback($received_job->getData(), $function['context']); - } - catch(Exception $e) - { - unset($e); - - // Do not requeue the job, it's a closure - $connection->getChannel()->basic_nack($message->delivery_info['delivery_tag']); - return; - } - - $job_results = new JobResults($received_job, JobStatus::Success, $result); - $connection->getChannel->basic_publish( - new AMQPMessage(msgpack_pack($job_results->toArray()), ['correlation_id' => $received_job->getId()]) - ); - $connection->getChannel()->basic_ack($message->delivery_info['delivery_tag']); - }; - - $connection->getChannel()->basic_consume('tamer_queue', '', false, false, false, false, $callback); - - if ($blocking) - { - while(true) - { - $connection->getChannel()->wait(); - } - } - else - { - $start = microtime(true); - while (true) - { - if (microtime(true) - $start >= $timeout / 1000) - { - break; - } - - $connection->getChannel()->wait(); - } - } - } - - /** - * Disconnects from the server when the object is destroyed - */ - public function __destruct() - { - try - { - $this->disconnect(); - } - catch(Exception $e) - { - unset($e); - // Ignore - } - } - - } \ No newline at end of file diff --git a/src/TamerLib/Tamer.php b/src/TamerLib/Tamer.php index dbfd26b..11d82f3 100644 --- a/src/TamerLib/Tamer.php +++ b/src/TamerLib/Tamer.php @@ -4,444 +4,8 @@ namespace TamerLib; - use Closure; - use Exception; - use InvalidArgumentException; - use TamerLib\Abstracts\Mode; - use TamerLib\Classes\Functions; - use TamerLib\Classes\Supervisor; - use TamerLib\Classes\Validate; - use TamerLib\Exceptions\ConnectionException; - use TamerLib\Exceptions\UnsupervisedWorkerException; - use TamerLib\Interfaces\ClientProtocolInterface; - use TamerLib\Interfaces\WorkerProtocolInterface; - use TamerLib\Objects\Task; class Tamer { - /** - * The protocol to use when connecting to the server - * - * @var string - */ - private static $protocol; - /** - * The protocol to use when connecting to the server as a client - * - * @var ClientProtocolInterface|null - */ - private static $client; - - /** - * The protocol to use when connecting to the server as a worker - * - * @var WorkerProtocolInterface|null - */ - private static $worker; - - /** - * Indicates if Tamer is running as a client or worker - * - * @var string - * @see Mode - */ - private static $mode; - - /** - * Indicates if Tamer is connected to the server - * - * @var bool - */ - private static $connected; - - /** - * The supervisor that is supervising the workers - * - * @var Supervisor - */ - private static $supervisor; - - /** - * Initializes Tamer as a client and connects to the server - * - * @param string $protocol - * @param array $servers - * @param string|null $username - * @param string|null $password - * @return void - * @throws ConnectionException - */ - public static function init(string $protocol, array $servers, ?string $username=null, ?string $password=null): void - { - if(self::$connected) - { - throw new ConnectionException('Tamer is already connected to the server'); - } - - if (!Validate::protocolType($protocol)) - { - throw new InvalidArgumentException(sprintf('Invalid protocol type: %s', $protocol)); - } - - self::$protocol = $protocol; - self::$mode = Mode::Client; - self::$client = Functions::createClient($protocol, $username, $password); - self::$client->addServers($servers); - self::$client->connect(); - self::$supervisor = new Supervisor($protocol, $servers, $username, $password); - self::$connected = true; - } - - /** - * Initializes Tamer as a worker client and connects to the server - * - * @return void - * @throws ConnectionException - * @throws UnsupervisedWorkerException - */ - public static function initWorker(): void - { - if(self::$connected) - { - throw new ConnectionException('Tamer is already connected to the server'); - } - - if(!Functions::getWorkerVariables()['TAMER_ENABLED']) - { - throw new UnsupervisedWorkerException('Tamer is not enabled for this worker'); - } - - self::$protocol = Functions::getWorkerVariables()['TAMER_PROTOCOL']; - self::$mode = Mode::Worker; - self::$worker = Functions::createWorker(self::$protocol); - self::$worker->addServers(Functions::getWorkerVariables()['TAMER_SERVERS']); - self::$worker->connect(); - self::$connected = true; - } - - - /** - * Disconnects from the server - * - * @return void - * @throws ConnectionException - */ - public static function disconnect(): void - { - if (!self::$connected) - { - throw new ConnectionException('Tamer is not connected to the server'); - } - - if (self::$mode === Mode::Client) - { - self::$client->disconnect(); - } - else - { - self::$worker->disconnect(); - } - - self::$connected = false; - } - - /** - * Reconnects to the server - * - * @return void - * @throws ConnectionException - */ - public static function reconnect(): void - { - if (self::$mode === Mode::Client) - { - self::$client->reconnect(); - } - else - { - self::$worker->reconnect(); - } - } - - /** - * Adds a task to the queue to be executed by the worker - * - * @param Task $task - * @return void - */ - public static function do(Task $task): void - { - if (self::$mode === Mode::Client) - { - self::$client->do($task); - } - else - { - throw new InvalidArgumentException('Tamer is not running in client mode'); - } - } - - /** - * Executes a closure operation in the background (does not return a result) - * - * @param Closure $closure The closure operation to perform (remote) - * @return void - */ - public static function doClosure(Closure $closure): void - { - if (self::$mode === Mode::Client) - { - self::$client->doClosure($closure); - } - else - { - throw new InvalidArgumentException('Tamer is not running in client mode'); - } - } - - /** - * Queues a task to be processed in parallel (returns a result handled by a callback) - * - * @param Task $task - * @return void - */ - public static function queue(Task $task): void - { - if (self::$mode === Mode::Client) - { - self::$client->queue($task); - } - else - { - throw new InvalidArgumentException('Tamer is not running in client mode'); - } - } - - /** - * Queues a closure to be processed in parallel (returns a result handled by a callback) - * - * @param Closure $closure The closure operation to perform (remote) - * @param Closure|null $callback The closure to call when the operation is complete (local) - * @return void - */ - public static function queueClosure(Closure $closure, ?Closure $callback=null): void - { - if (self::$mode === Mode::Client) - { - self::$client->queueClosure($closure, $callback); - } - else - { - throw new InvalidArgumentException('Tamer is not running in client mode'); - } - } - - /** - * Executes all tasks in the queue and waits for them to complete - * - * @return bool - */ - public static function run(): bool - { - if (self::$mode === Mode::Client) - { - return self::$client->run(); - } - else - { - throw new InvalidArgumentException('Tamer is not running in client mode'); - } - } - - /** - * Registers a function to the worker - * - * @param string $name The name of the function to add - * @param callable $callable The function to add - * @return void - */ - public static function addFunction(string $name, callable $callable): void - { - if (self::$mode === Mode::Worker) - { - self::$worker->addFunction($name, $callable); - } - else - { - throw new InvalidArgumentException('Tamer is not running in worker mode'); - } - } - - /** - * Removes a function from the worker - * - * @param string $function_name The name of the function to remove - * @return void - */ - public static function removeFunction(string $function_name): void - { - if (self::$mode === Mode::Worker) - { - self::$worker->removeFunction($function_name); - } - else - { - throw new InvalidArgumentException('Tamer is not running in worker mode'); - } - } - - /** - * Works a job from the queue (blocking or non-blocking) - * - * @param bool $blocking (optional) Whether to block until a job is available - * @param int $timeout (optional) The timeout to use when blocking - * @param bool $throw_errors (optional) Whether to throw errors or not - * @return void - */ - public static function work(bool $blocking=true, int $timeout=500, bool $throw_errors=false): void - { - if (self::$mode === Mode::Worker) - { - self::$worker->work($blocking, $timeout, $throw_errors); - } - else - { - throw new InvalidArgumentException('Tamer is not running in worker mode'); - } - } - - /** - * Monitors the workers and restarts them if they die unexpectedly (monitor mode only) - * - * @param bool $blocking - * @param bool $auto_restart - * @return void - * @throws Exception - */ - public static function monitor(bool $blocking=false, bool $auto_restart=true): void - { - if (self::$mode === Mode::Client) - { - self::$supervisor->monitor($blocking, $auto_restart); - } - else - { - throw new InvalidArgumentException('Tamer is not running in client mode'); - } - } - - /** - * Adds a worker to the supervisor - * - * @param string $target - * @param int $instances - * @return void - * @throws Exception - */ - public static function addWorker(string $target, int $instances): void - { - if (self::$mode === Mode::Client) - { - self::$supervisor->addWorker($target, $instances); - } - else - { - throw new InvalidArgumentException('Tamer is not running in client mode'); - } - } - - /** - * Starts all workers - * - * @return void - * @throws Exception - */ - public static function startWorkers(): void - { - if (self::$mode === Mode::Client) - { - self::$supervisor->start(); - } - else - { - throw new InvalidArgumentException('Tamer is not running in client mode'); - } - } - - /** - * Stops all workers - * - * @return void - * @throws Exception - */ - public static function stopWorkers(): void - { - if (self::$mode === Mode::Client) - { - self::$supervisor->stop(); - } - else - { - throw new InvalidArgumentException('Tamer is not running in client mode'); - } - } - - /** - * Restarts all workers - * - * @return void - * @throws Exception - */ - public static function restartWorkers(): void - { - if (self::$mode === Mode::Client) - { - self::$supervisor->restart(); - } - else - { - throw new InvalidArgumentException('Tamer is not running in client mode'); - } - } - - /** - * @return string - */ - public static function getProtocol(): string - { - return self::$protocol; - } - - /** - * @return ClientProtocolInterface|null - */ - public static function getClient(): ?ClientProtocolInterface - { - return self::$client; - } - - /** - * @return WorkerProtocolInterface|null - */ - public static function getWorker(): ?WorkerProtocolInterface - { - return self::$worker; - } - - /** - * @return string - */ - public static function getMode(): string - { - return self::$mode; - } - - /** - * @return bool - */ - public static function isConnected(): bool - { - return self::$connected; - } } \ No newline at end of file diff --git a/tests/no_tamer.php b/tests/no_tamer.php deleted file mode 100644 index 5f120fc..0000000 --- a/tests/no_tamer.php +++ /dev/null @@ -1,30 +0,0 @@ -start(); + + $redis_client = new \Redis(); + $redis_client->connect('127.0.0.1', $redis_server->getPort()); + + $redis_client->set('foo', 'bar'); + $value = $redis_client->get('foo'); + + echo $value . PHP_EOL; + + $redis_client->close(); + $redis_server->stop(); \ No newline at end of file diff --git a/tests/tamer.php b/tests/tamer.php deleted file mode 100644 index 7388283..0000000 --- a/tests/tamer.php +++ /dev/null @@ -1,71 +0,0 @@ -getData()} seconds \n"; - })); - } - - echo "Waiting for jobs to finish \n"; - $a = microtime(true); - Tamer::run(); - $b = microtime(true); - echo "Took " . ($b - $a) . " seconds \n"; \ No newline at end of file diff --git a/tests/tamer_worker.php b/tests/tamer_worker.php deleted file mode 100644 index ddb5883..0000000 --- a/tests/tamer_worker.php +++ /dev/null @@ -1,18 +0,0 @@ -getData()); - return $job->getData(); - }); - - Tamer::work(); \ No newline at end of file