From e88e18995e042722d5caf3da887675a22a24f779 Mon Sep 17 00:00:00 2001 From: netkas Date: Mon, 4 Nov 2024 00:56:21 -0500 Subject: [PATCH] Add forking capability for update processing --- src/TgBotLib/PollingBot.php | 122 +++++++++++++++++++++++++++++++----- 1 file changed, 108 insertions(+), 14 deletions(-) diff --git a/src/TgBotLib/PollingBot.php b/src/TgBotLib/PollingBot.php index afbb782..d883ed1 100644 --- a/src/TgBotLib/PollingBot.php +++ b/src/TgBotLib/PollingBot.php @@ -2,8 +2,11 @@ namespace TgBotLib; + use RuntimeException; use TgBotLib\Abstracts\UpdateEvent; + use TgBotLib\Classes\Logger; use TgBotLib\Enums\EventType; + use Throwable; /** * PollingBot class that extends Bot for handling updates using polling. @@ -14,6 +17,7 @@ private int $limit; private int $timeout; private array $allowedUpdates; + private bool $fork; /** * Constructor for the class, initializing with a Bot instance. @@ -28,8 +32,15 @@ $this->limit = 100; $this->timeout = 0; $this->allowedUpdates = []; + $this->fork = false; } + /** + * Sets the offset value. + * + * @param int $offset The offset value to be set. + * @return void + */ public function setOffset(int $offset): void { $this->offset = $offset; @@ -120,6 +131,27 @@ $this->allowedUpdates[] = $allowedUpdate; } + /** + * Sets whether updates should be processed in a forked process + * + * @param bool $fork + * @return void + */ + public function setFork(bool $fork): void + { + $this->fork = $fork; + } + + /** + * Gets the current fork setting + * + * @return bool + */ + public function getFork(): bool + { + return $this->fork; + } + /** * Handles incoming updates by fetching and processing them with appropriate event handlers. * @@ -127,32 +159,94 @@ */ public function handleUpdates(): void { - foreach($this->getUpdates(offset: ($this->offset ?: 0), limit: $this->limit, timeout: $this->timeout, allowed_updates: $this->retrieveAllowedUpdates()) as $update) - { - // Update the last offset if the current Update ID is greater than the offset ID - if($update->getUpdateId() > $this->offset) - { - $this->offset = $update->getUpdateId() + 1; - } + $updates = $this->getUpdates(offset: ($this->offset ?: 0), limit: $this->limit, timeout: $this->timeout, allowed_updates: $this->retrieveAllowedUpdates()); + if (empty($updates)) + { + return; + } + + // Update the offset based on the last update ID + $lastUpdate = end($updates); + if ($lastUpdate->getUpdateId() > ($this->offset ?? 0)) + { + $this->offset = $lastUpdate->getUpdateId() + 1; + } + + if ($this->fork) + { + $this->handleUpdatesInFork($updates); + } + else + { + $this->processUpdates($updates); + } + } + + /** + * Handles updates in a forked child process + * + * @param array $updates + * @return void + */ + private function handleUpdatesInFork(array $updates): void + { + $pid = pcntl_fork(); + + if ($pid == -1) + { + // Fork failed + throw new RuntimeException('Failed to fork process for update handling'); + } + elseif ($pid) + { + // Parent process + // Wait for child to finish to prevent zombie processes + pcntl_wait($status); + } + else + { + // Child process + try + { + $this->processUpdates($updates); + exit(0); + } + catch (Throwable $e) + { + Logger::getLogger()->error("Error in forked process: " . $e->getMessage()); + exit(1); + } + } + } + + /** + * Processes a batch of updates with appropriate event handlers + * + * @param array $updates + * @return void + */ + private function processUpdates(array $updates): void + { + foreach ($updates as $update) + { $updateByType = $this->getEventHandlersByType(EventType::determineEventType($update)); - if(count($updateByType) === 0) + if (count($updateByType) === 0) { // If no event handlers are found appropriate for the update type, use the generic update event handler - // So that we don't miss any updates - /** @var UpdateEvent $eventHandler */ - foreach($this->getEventHandlersByType(EventType::UPDATE_EVENT) as $eventHandler) + foreach ($this->getEventHandlersByType(EventType::UPDATE_EVENT) as $eventHandler) { + /** @var UpdateEvent $eventHandler */ (new $eventHandler($update))->handle($this); } } else { // Otherwise, use the appropriate event handler for the update type - /** @var UpdateEvent $eventHandler */ - foreach($this->getEventHandlersByType(EventType::determineEventType($update)) as $eventHandler) + foreach ($updateByType as $eventHandler) { + /** @var UpdateEvent $eventHandler */ (new $eventHandler($update))->handle($this); } } @@ -166,7 +260,7 @@ */ private function retrieveAllowedUpdates(): ?array { - if(count($this->allowedUpdates) === 0) + if (count($this->allowedUpdates) === 0) { return null; }