diff --git a/.idea/php.xml b/.idea/php.xml index 2c7445e..e01b5fd 100644 --- a/.idea/php.xml +++ b/.idea/php.xml @@ -9,6 +9,15 @@ + + + + + + + + + diff --git a/.idea/tamer.iml b/.idea/tamer.iml index c956989..4348106 100644 --- a/.idea/tamer.iml +++ b/.idea/tamer.iml @@ -1,7 +1,10 @@ - + + + + diff --git a/project.json b/project.json index f1c7607..e57e387 100644 --- a/project.json +++ b/project.json @@ -28,6 +28,20 @@ "build": { "source_path": "src", "default_configuration": "release", + "dependencies": [ + { + "name": "net.nosial.optslib", + "version": "latest", + "source_type": "remote", + "source": "nosial/libs.opts=latest@n64" + }, + { + "name": "net.nosial.loglib", + "version": "latest", + "source_type": "remote", + "source": "nosial/libs.log=latest@n64" + } + ], "configurations": [ { "name": "release", diff --git a/src/Tamer/Abstracts/ExitCodes/WorkerExitCodes.php b/src/Tamer/Abstracts/ExitCodes/WorkerExitCodes.php new file mode 100644 index 0000000..8e38fb1 --- /dev/null +++ b/src/Tamer/Abstracts/ExitCodes/WorkerExitCodes.php @@ -0,0 +1,16 @@ + true, + default => false, + }; + } + + /** + * Returns true if the input is a valid task priority. + * + * @param int $input + * @return bool + */ + public static function taskPriority(int $input): bool + { + return match ($input) + { + TaskPriority::Low, TaskPriority::Normal, TaskPriority::High => true, + default => false, + }; + } + } \ No newline at end of file diff --git a/src/Tamer/Client.php b/src/Tamer/Client.php new file mode 100644 index 0000000..c6c8268 --- /dev/null +++ b/src/Tamer/Client.php @@ -0,0 +1,43 @@ +setProtocol($protocol); + } + + /** + * @return string + */ + public function getProtocol(): string + { + return $this->protocol; + } + + /** + * @param string $protocol + */ + public function setProtocol(string $protocol): void + { + if(!Validate::protocolType($protocol)) + { + throw new InvalidArgumentException("Invalid protocol type: $protocol"); + } + + $this->protocol = $protocol; + } + } \ No newline at end of file diff --git a/src/Tamer/Exceptions/ServerException.php b/src/Tamer/Exceptions/ServerException.php new file mode 100644 index 0000000..93359a4 --- /dev/null +++ b/src/Tamer/Exceptions/ServerException.php @@ -0,0 +1,18 @@ +id = $id; + $this->name = $name; + $this->data = $data; + } + + /** + * @return string + */ + public function getId(): string + { + return $this->id; + } + + /** + * @return string + */ + public function getName(): string + { + return $this->name; + } + + /** + * @return string + */ + public function getData(): string + { + return $this->data; + } + } \ No newline at end of file diff --git a/src/Tamer/Objects/JobResults.php b/src/Tamer/Objects/JobResults.php new file mode 100644 index 0000000..09d8f59 --- /dev/null +++ b/src/Tamer/Objects/JobResults.php @@ -0,0 +1,59 @@ +task = $task; + $this->status = $status; + $this->result = $result; + } + + /** + * @return Task + */ + public function getTask(): Task + { + return $this->task; + } + + /** + * @return int + */ + public function getStatus(): int + { + return $this->status; + } + + /** + * @return string|null + */ + public function getResult(): ?string + { + return $this->result; + } + } \ No newline at end of file diff --git a/src/Tamer/Objects/Task.php b/src/Tamer/Objects/Task.php new file mode 100644 index 0000000..4cfcbf5 --- /dev/null +++ b/src/Tamer/Objects/Task.php @@ -0,0 +1,150 @@ +function_name = $function_name; + $this->data = $data; + $this->id = uniqid(); + $this->priority = TaskPriority::Normal; + $this->callback = $callback; + } + + /** + * Returns the function name for the task + * + * @return string + */ + public function getFunctionName(): string + { + return $this->function_name; + } + + /** + * Sets the function name for the task + * + * @param string $function_name + * @return Task + */ + public function setFunctionName(string $function_name): self + { + $this->function_name = $function_name; + return $this; + } + + /** + * Returns the arguments for the task + * + * @return string + */ + public function getData(): string + { + return $this->data; + } + + /** + * Sets the arguments for the task + * + * @param string $data + * @return Task + */ + public function setData(string $data): self + { + $this->data = $data; + return $this; + } + + /** + * Returns the Unique ID of the task + * + * @return string + */ + public function getId(): string + { + return $this->id; + } + + /** + * @return int + */ + public function getPriority(): int + { + return $this->priority; + } + + /** + * @param int $priority + * @return Task + */ + public function setPriority(int $priority): self + { + if(!Validate::taskPriority($priority)) + { + throw new InvalidArgumentException("Invalid priority value"); + } + + $this->priority = $priority; + return $this; + } + + /** + * @param callable|null $callback + */ + public function setCallback(?callable $callback): void + { + $this->callback = $callback; + } + + /** + * Executes the callback function + * + * @param JobResults $result + * @return void + */ + public function runCallback(JobResults $result): void + { + if($this->callback !== null) + { + call_user_func($this->callback, $result); + } + } + } \ No newline at end of file diff --git a/src/Tamer/Protocols/GearmanClient.php b/src/Tamer/Protocols/GearmanClient.php new file mode 100644 index 0000000..1abe10a --- /dev/null +++ b/src/Tamer/Protocols/GearmanClient.php @@ -0,0 +1,379 @@ +client = null; + $this->tasks = []; + $this->automatic_reconnect = false; + $this->next_reconnect = time() + 1800; + + try + { + $this->reconnect(); + } + catch(ServerException $e) + { + unset($e); + } + } + + /** + * Adds client options + * + * @link http://php.net/manual/en/gearmanclient.addoptions.php + * @param int[] $options (GEARMAN_CLIENT_NON_BLOCKING, GEARMAN_CLIENT_UNBUFFERED_RESULT, GEARMAN_CLIENT_FREE_TASKS) + * @return bool + */ + public function addOptions(array $options): bool + { + // Parse $options combination via bitwise OR operator + $options = array_reduce($options, function($carry, $item) + { + return $carry | $item; + }); + + return $this->client->addOptions($options); + } + + private function registerCallbacks(): void + { + $this->client->setCompleteCallback([$this, 'callbackHandler']); + $this->client->setFailCallback([$this, 'callbackHandler']); + $this->client->setDataCallback([$this, 'callbackHandler']); + $this->client->setStatusCallback([$this, 'callbackHandler']); + } + + + /** + * Adds a server to the list of servers to use + * + * @link http://php.net/manual/en/gearmanclient.addserver.php + * @param string $host (127.0.0.1) + * @param int $port (default: 4730) + * @return bool + * @throws ServerException + */ + public function addServer(string $host='127.0.0.1', int $port=4730): bool + { + if(!isset($this->server_cache[$host])) + { + $this->server_cache[$host] = []; + } + + if(in_array($port, $this->server_cache[$host])) + { + return true; + } + + $this->server_cache[$host][] = $port; + + try + { + return $this->client->addServer($host, $port); + } + catch(Exception $e) + { + throw new ServerException($e->getMessage(), $e->getCode(), $e); + } + } + + /** + * Adds a list of servers to the list of servers to use + * + * @link http://php.net/manual/en/gearmanclient.addservers.php + * @param array $servers (host:port, host:port, ...) + * @return bool + */ + public function addServers(array $servers): bool + { + return $this->client->addServers(implode(',', $servers)); + } + + /** + * Processes a task in the background + * + * @param Task $task + * @return bool + * @throws ServerException + */ + public function doBackground(Task $task): bool + { + if($this->automatic_reconnect && time() > $this->next_reconnect) + { + $this->reconnect(); + $this->next_reconnect = time() + 1800; + } + + $this->tasks[] = $task; + + switch($task->getPriority()) + { + case TaskPriority::High: + return $this->client->doHighBackground($task->getFunctionName(), $task->getData(), $task->getId()); + + case TaskPriority::Low: + return $this->client->doLowBackground($task->getFunctionName(), $task->getData(), $task->getId()); + + default: + case TaskPriority::Normal: + return $this->client->doBackground($task->getFunctionName(), $task->getData(), $task->getId()); + } + } + + /** + * Processes a task in the foreground + * + * @param Task $task + * @return JobResults + * @throws ServerException + */ + public function do(Task $task): JobResults + { + if($this->automatic_reconnect && time() > $this->next_reconnect) + { + $this->reconnect(); + $this->next_reconnect = time() + 1800; + } + + $this->tasks[] = $task; + + switch($task->getPriority()) + { + case TaskPriority::High: + return new JobResults($task, JobStatus::Success, $this->client->doHigh($task->getFunctionName(), $task->getData(), $task->getId())); + + case TaskPriority::Low: + return new JobResults($task, JobStatus::Success, $this->client->doLow($task->getFunctionName(), $task->getData(), $task->getId())); + + default: + case TaskPriority::Normal: + return new JobResults($task, JobStatus::Success, $this->client->doNormal($task->getFunctionName(), $task->getData(), $task->getId())); + } + } + + public function addTask(Task $task): ClientProtocolInterface + { + if($this->automatic_reconnect && time() > $this->next_reconnect) + { + $this->reconnect(); + $this->next_reconnect = time() + 1800; + } + + $this->tasks[] = $task; + + switch($task->getPriority()) + { + case TaskPriority::High: + $this->client->addTaskHigh($task->getFunctionName(), $task->getData(), $task->getId()); + break; + + case TaskPriority::Low: + $this->client->addTaskLow($task->getFunctionName(), $task->getData(), $task->getId()); + break; + + default: + case TaskPriority::Normal: + $this->client->addTask($task->getFunctionName(), $task->getData(), $task->getId()); + break; + } + + return $this; + } + + + public function addBackgroundTask(Task $task): ClientProtocolInterface + { + if($this->automatic_reconnect && time() > $this->next_reconnect) + { + $this->reconnect(); + $this->next_reconnect = time() + 1800; + } + + $this->tasks[] = $task; + + switch($task->getPriority()) + { + case TaskPriority::High: + $this->client->addTaskHighBackground($task->getFunctionName(), $task->getData(), $task->getId()); + break; + + case TaskPriority::Low: + $this->client->addTaskLowBackground($task->getFunctionName(), $task->getData(), $task->getId()); + break; + + default: + case TaskPriority::Normal: + $this->client->addTaskBackground($task->getFunctionName(), $task->getData(), $task->getId()); + break; + } + + return $this; + } + + /** + * @return bool + * @throws ServerException + */ + public function doTasks(): bool + { + if($this->automatic_reconnect && time() > $this->next_reconnect) + { + $this->reconnect(); + $this->next_reconnect = time() + 1800; + } + + if(!$this->client->runTasks()) + { + return false; + } + + return true; + } + + /** + * Processes a task callback in the foreground + * + * @param GearmanTask $task + * @return void + */ + public function callbackHandler(GearmanTask $task): void + { + $internal_task = $this->getTaskById($task->unique()); + $job_status = match ($task->returnCode()) + { + GEARMAN_WORK_EXCEPTION => JobStatus::Exception, + GEARMAN_WORK_FAIL => JobStatus::Failure, + default => JobStatus::Success, + }; + + $job_results = new JobResults($internal_task, $job_status, ($task->data() ?? null)); + + try + { + Log::debug('net.nosial.tamer', 'callback for task ' . $internal_task->getId() . ' with status ' . $job_status . ' and data size ' . strlen($task->data()) . ' bytes'); + $internal_task->runCallback($job_results); + } + catch(Exception $e) + { + Log::error('net.nosial.tamer', 'Callback for task ' . $internal_task->getId() . ' failed with error: ' . $e->getMessage(), $e); + } + finally + { + $this->removeTask($internal_task); + } + } + + /** + * @param string $id + * @return Task|null + */ + private function getTaskById(string $id): ?Task + { + var_dump($this->tasks); + var_dump($id); + foreach($this->tasks as $task) + { + if($task->getId() === $id) + { + return $task; + } + } + + return null; + } + + /** + * @throws ServerException + */ + private function reconnect() + { + $this->client = new \GearmanClient(); + + foreach($this->server_cache as $host => $ports) + { + foreach($ports as $port) + { + $this->addServer($host, $port); + } + } + + $this->registerCallbacks(); + } + + /** + * Removes a task from the list of tasks + * + * @param Task $task + * @return ClientProtocolInterface + */ + private function removeTask(Task $task): ClientProtocolInterface + { + $this->tasks = array_filter($this->tasks, function($item) use ($task) + { + return $item->getId() !== $task->getId(); + }); + + return $this; + } + + /** + * @return bool + */ + public function isAutomaticReconnect(): bool + { + return $this->automatic_reconnect; + } + + /** + * @param bool $automatic_reconnect + */ + public function setAutomaticReconnect(bool $automatic_reconnect): void + { + $this->automatic_reconnect = $automatic_reconnect; + } + } \ No newline at end of file diff --git a/src/Tamer/Protocols/GearmanWorker.php b/src/Tamer/Protocols/GearmanWorker.php new file mode 100644 index 0000000..2e3d359 --- /dev/null +++ b/src/Tamer/Protocols/GearmanWorker.php @@ -0,0 +1,219 @@ +worker = null; + $this->server_cache = []; + $this->automatic_reconnect = false; + $this->next_reconnect = time() + 1800; + $this->current_job = null; + + try + { + $this->reconnect(); + } + catch(Exception $e) + { + unset($e); + } + } + + /** + * Adds a server to the list of servers to use + * + * @link http://php.net/manual/en/gearmanworker.addserver.php + * @param string $host ( + * @param int $port (default: 4730) + * @return bool + * @throws ServerException + */ + public function addServer(string $host='127.0.0.1', int $port=4730): bool + { + if(!isset($this->server_cache[$host])) + { + $this->server_cache[$host] = []; + } + + if(in_array($port, $this->server_cache[$host])) + { + return true; + } + + $this->server_cache[$host][] = $port; + + try + { + return $this->worker->addServer($host, $port); + } + catch(Exception $e) + { + throw new ServerException($e->getMessage(), $e->getCode(), $e); + } + } + + /** + * Adds a list of servers to the list of servers to use + * + * @link http://php.net/manual/en/gearmanworker.addservers.php + * @param string[] $servers (host:port, host:port, ...) + * @return WorkerProtocolInterface + * @throws ServerException + */ + public function addServers(array $servers): self + { + foreach($servers as $server) + { + $server = explode(':', $server); + $this->addServer($server[0], $server[1]); + } + + return $this; + } + + + /** + * Adds a function to the list of functions to call + * + * @link http://php.net/manual/en/gearmanworker.addfunction.php + * @param string $function_name The name of the function to register with the job server + * @param callable $function The callback function to call when the job is received + * @param mixed|null $context (optional) The context to pass to the callback function + * @return WorkerProtocolInterface + */ + public function addFunction(string $function_name, callable $function, mixed $context=null): self + { + $this->worker->addFunction($function_name, function(GearmanJob $job) use ($function, $context) + { + $this->current_job = $job; + $job = new Job($job->unique(), $job->handle(), $job->workload()); + $function($job, $context); + $this->current_job = null; + }); + return $this; + } + + /** + * Removes a function from the list of functions to call + * + * @param string $function_name The name of the function to unregister + * @return WorkerProtocolInterface + */ + public function removeFunction(string $function_name): self + { + $this->worker->unregister($function_name); + return $this; + } + + /** + * @return bool + */ + public function isAutomaticReconnect(): bool + { + return $this->automatic_reconnect; + } + + /** + * @param bool $automatic_reconnect + * @return WorkerProtocolInterface + */ + public function setAutomaticReconnect(bool $automatic_reconnect): self + { + $this->automatic_reconnect = $automatic_reconnect; + return $this; + } + + /** + * @throws ServerException + */ + private function reconnect() + { + $this->worker = new \GearmanWorker(); + $this->worker->addOptions(GEARMAN_WORKER_GRAB_UNIQ); + + foreach($this->server_cache as $host => $ports) + { + foreach($ports as $port) + { + $this->addServer($host, $port); + } + } + } + + /** + * Waits for a job and calls the appropriate callback function + * + * @link http://php.net/manual/en/gearmanworker.work.php + * @param bool $blocking (default: true) Whether to block until a job is received + * @param int $timeout (default: 500) The timeout in milliseconds + * @param bool $throw_errors (default: false) Whether to throw exceptions on errors + * @return void Returns nothing + * @throws ServerException If the worker cannot connect to the server + * @throws WorkerException If the worker encounters an error while working if $throw_errors is true + */ + public function work(bool $blocking=true, int $timeout=500, bool $throw_errors=false): void + { + if($this->automatic_reconnect && (time() > $this->next_reconnect)) + { + $this->reconnect(); + $this->next_reconnect = time() + 1800; + } + + $this->worker->setTimeout($timeout); + + while(true) + { + @$this->worker->work(); + + if($this->worker->returnCode() == GEARMAN_COULD_NOT_CONNECT) + { + throw new ServerException('Could not connect to Gearman server'); + } + + if($this->worker->returnCode() == GEARMAN_TIMEOUT && !$blocking) + { + break; + } + + if($this->worker->returnCode() != GEARMAN_SUCCESS && $throw_errors) + { + throw new WorkerException('Gearman worker error: ' . $this->worker->error(), $this->worker->returnCode()); + } + } + } + } \ No newline at end of file diff --git a/src/Tamer/Protocols/RabbitMqClient.php b/src/Tamer/Protocols/RabbitMqClient.php new file mode 100644 index 0000000..4072ed0 --- /dev/null +++ b/src/Tamer/Protocols/RabbitMqClient.php @@ -0,0 +1,8 @@ +addServer(); + + $client->addTask(new Task('sleep', '5', function(Job $job) { + echo "Task {$job->getId()} completed with data: {$job->getData()} \n"; + })); + + + $client->doTasks(); \ No newline at end of file diff --git a/tests/gearman_worker.php b/tests/gearman_worker.php new file mode 100644 index 0000000..7783f89 --- /dev/null +++ b/tests/gearman_worker.php @@ -0,0 +1,23 @@ +addServer(); + + $worker->addFunction('sleep', function($task) { + var_dump(get_class($task)); + echo "Task {$task->getId()} started with data: {$task->getData()} \n"; + sleep($task->getData()); + echo "Task {$task->getId()} completed with data: {$task->getData()} \n"; + }); + + + + while(true) + { + echo "Waiting for job... \n"; + $worker->work(); + } \ No newline at end of file